Refactored to align more with gtk template project

This commit is contained in:
itdominator 2024-02-12 21:36:53 -06:00
parent 389b2ee828
commit 351800d61b
36 changed files with 1037 additions and 299 deletions

View File

@ -1,15 +1,16 @@
# Python imports # Python imports
import builtins import builtins
import threading import threading
import sys
# Lib imports # Lib imports
# Application imports # Application imports
from utils.event_system import EventSystem from libs.event_system import EventSystem
from utils.endpoint_registry import EndpointRegistry from libs.endpoint_registry import EndpointRegistry
# from utils.keybindings import Keybindings # from libs.keybindings import Keybindings
from utils.logger import Logger from libs.logger import Logger
from utils.settings import Settings from libs.settings_manager.manager import SettingsManager
@ -30,14 +31,30 @@ def daemon_threaded_wrapper(fn):
# NOTE: Just reminding myself we can add to builtins two different ways... # NOTE: Just reminding myself we can add to builtins two different ways...
# __builtins__.update({"event_system": Builtins()}) # __builtins__.update({"event_system": Builtins()})
builtins.app_name = "<change_me>" builtins.app_name = "<change_me>"
# builtins.keybindings = Keybindings() # builtins.keybindings = Keybindings()
builtins.event_system = EventSystem() builtins.event_system = EventSystem()
builtins.endpoint_registry = EndpointRegistry() builtins.endpoint_registry = EndpointRegistry()
builtins.settings = Settings() builtins.settings_manager = SettingsManager()
builtins.logger = Logger(settings.get_home_config_path(), \ # builtins.db = DB()
_ch_log_lvl=settings.get_ch_log_lvl(), \
_fh_log_lvl=settings.get_fh_log_lvl()).get_logger() settings_manager.load_settings()
builtins.settings = settings_manager.settings
builtins.logger = Logger(settings_manager.get_home_config_path(), \
_ch_log_lvl=settings.debugging.ch_log_lvl, \
_fh_log_lvl=settings.debugging.fh_log_lvl).get_logger()
builtins.threaded = threaded_wrapper builtins.threaded = threaded_wrapper
builtins.daemon_threaded = daemon_threaded_wrapper builtins.daemon_threaded = daemon_threaded_wrapper
builtins.event_sleep_time = 0.05
def custom_except_hook(exc_type, exc_value, exc_traceback):
if issubclass(exc_type, KeyboardInterrupt):
sys.__excepthook__(exc_type, exc_value, exc_traceback)
return
logger.error("Uncaught exception", exc_info=(exc_type, exc_value, exc_traceback))
sys.excepthook = custom_except_hook

View File

@ -17,31 +17,38 @@ from app import Application
def main(args, unknownargs):
setproctitle(f'{app_name}')
if args.debug == "true":
settings_manager.set_debug(True)
if args.trace_debug == "true":
settings_manager.set_trace_debug(True)
settings_manager.do_dirty_start_check()
Application(args, unknownargs)
if __name__ == "__main__": if __name__ == "__main__":
''' Set process title, get arguments, and create GTK main thread. ''' ''' Set process title, get arguments, and create GTK main thread. '''
parser = argparse.ArgumentParser()
# Add long and short arguments
parser.add_argument("--debug", "-d", default="false", help="Do extra console messaging.")
parser.add_argument("--trace-debug", "-td", default="false", help="Disable saves, ignore IPC lock, do extra console messaging.")
parser.add_argument("--no-plugins", "-np", default="false", help="Do not load plugins.")
parser.add_argument("--new-tab", "-nt", default="false", help="Opens a 'New Tab' if a handler is set for it.")
parser.add_argument("--file", "-f", default="default", help="JUST SOME FILE ARG.")
# Read arguments (If any...)
args, unknownargs = parser.parse_known_args()
try: try:
setproctitle(f'{app_name}')
# faulthandler.enable() # For better debug info # faulthandler.enable() # For better debug info
main(args, unknownargs)
parser = argparse.ArgumentParser()
# Add long and short arguments
parser.add_argument("--debug", "-d", default="false", help="Do extra console messaging.")
parser.add_argument("--trace-debug", "-td", default="false", help="Disable saves, ignore IPC lock, do extra console messaging.")
parser.add_argument("--file", "-f", default="default", help="JUST SOME FILE ARG.")
# Read arguments (If any...)
args, unknownargs = parser.parse_known_args()
if args.debug == "true":
settings.set_debug(True)
if args.trace_debug == "true":
settings.set_trace_debug(True)
settings.do_dirty_start_check()
Application(args, unknownargs)
except Exception as e: except Exception as e:
traceback.print_exc() traceback.print_exc()
quit() quit()

View File

@ -1,35 +1,64 @@
# Python imports # Python imports
import signal
import os import os
# Lib imports # Lib imports
# Application imports # Application imports
from utils.ipc_server import IPCServer from libs.debugging import debug_signal_handler
from libs.ipc_server import IPCServer
from core.window import Window from core.window import Window
class AppLaunchException(Exception): class AppLaunchException(Exception):
... ...
class Application(IPCServer): class Application:
''' Create Settings and Controller classes. Bind signal to Builder. Inherit from Builtins to bind global methods and classes.''' """ docstring for Application. """
def __init__(self, args, unknownargs): def __init__(self, args, unknownargs):
super(Application, self).__init__() super(Application, self).__init__()
if not settings.is_trace_debug():
try:
self.create_ipc_listener()
except Exception:
...
if not self.is_ipc_alive: if not settings_manager.is_trace_debug():
for arg in unknownargs + [args.new_tab,]: self.load_ipc(args, unknownargs)
if os.path.isdir(arg):
message = f"FILE|{arg}"
self.send_ipc_message(message)
raise AppLaunchException(f"{app_name} IPC Server Exists: Will send path(s) to it and close...")
self.setup_debug_hook()
Window(args, unknownargs).run() Window(args, unknownargs).run()
def load_ipc(self, args, unknownargs):
ipc_server = IPCServer()
self.ipc_realization_check(ipc_server)
if not ipc_server.is_ipc_alive:
for arg in unknownargs + [args.new_tab,]:
if os.path.isfile(arg):
message = f"FILE|{arg}"
ipc_server.send_ipc_message(message)
raise AppLaunchException(f"{app_name} IPC Server Exists: Have sent path(s) to it and closing...")
def ipc_realization_check(self, ipc_server):
try:
ipc_server.create_ipc_listener()
except Exception:
ipc_server.send_test_ipc_message()
try:
ipc_server.create_ipc_listener()
except Exception as e:
...
def setup_debug_hook(self):
try:
# kill -SIGUSR2 <pid> from Linux/Unix or SIGBREAK signal from Windows
signal.signal(
vars(signal).get("SIGBREAK") or vars(signal).get("SIGUSR1"),
debug_signal_handler
)
except ValueError:
# Typically: ValueError: signal only works in main thread
...

View File

@ -14,7 +14,7 @@ class ControllerData:
''' ControllerData contains most of the state of the app at ay given time. It also has some support methods. ''' ''' ControllerData contains most of the state of the app at ay given time. It also has some support methods. '''
def setup_controller_data(self) -> None: def setup_controller_data(self) -> None:
self.window = settings.get_main_window() self.window = settings_manager.get_main_window()
self.builder = None self.builder = None
self.core_widget = None self.core_widget = None
self.plugins = PluginsController() self.plugins = PluginsController()
@ -60,4 +60,4 @@ class ControllerData:
proc = subprocess.Popen(['xclip','-selection','clipboard'], stdin=subprocess.PIPE) proc = subprocess.Popen(['xclip','-selection','clipboard'], stdin=subprocess.PIPE)
proc.stdin.write(data.encode(encoding)) proc.stdin.write(data.encode(encoding))
proc.stdin.close() proc.stdin.close()
retcode = proc.wait() retcode = proc.wait()

View File

@ -29,6 +29,7 @@ class Window(App):
def __init__(self, args, unknownargs): def __init__(self, args, unknownargs):
super(Window, self).__init__() super(Window, self).__init__()
settings_manager.set_main_window(self)
self._controller = None self._controller = None
@ -36,15 +37,14 @@ class Window(App):
self._setup_signals() self._setup_signals()
self._subscribe_to_events() self._subscribe_to_events()
settings.set_main_window(self)
self._load_widgets(args, unknownargs) self._load_widgets(args, unknownargs)
def _setup_styling(self): def _setup_styling(self):
self.title = app_name self.title = app_name
self.icon = settings.get_window_icon() self.icon = settings_manager.get_window_icon()
self.size = (settings.get_main_window_width(), self.size = (settings.config.main_window_width,
settings.get_main_window_height()) settings.config.main_window_height)
def _setup_signals(self): def _setup_signals(self):
... ...
@ -72,9 +72,8 @@ class Window(App):
self.rect.size = instance.size self.rect.size = instance.size
def _tear_down(self, widget=None, eve=None): def _tear_down(self, widget=None, eve=None):
settings.clear_pid() settings_manager.clear_pid()
time.sleep(event_sleep_time)
quit() quit()
def on_stop(self): def on_stop(self):
self._tear_down() self._tear_down()

View File

@ -0,0 +1,5 @@
"""
Dasta Class module
"""
from .event import Event

View File

@ -0,0 +1,14 @@
# Python imports
from dataclasses import dataclass, field
# Lib imports
# Application imports
@dataclass
class Event:
topic: str
content: str
raw_content: str

42
src/libs/db.py Normal file
View File

@ -0,0 +1,42 @@
# Python imports
from typing import Optional
from os import path
# Lib imports
from sqlmodel import Session, create_engine
# Application imports
from .models import SQLModel, User
class DB:
def __init__(self):
super(DB, self).__init__()
self.engine = None
self.create_engine()
def create_engine(self):
db_path = f"sqlite:///{settings_manager.get_home_config_path()}/database.db"
self.engine = create_engine(db_path)
SQLModel.metadata.create_all(self.engine)
def _add_entry(self, entry):
with Session(self.engine) as session:
session.add(entry)
session.commit()
def add_user_entry(self, name = None, password = None, email = None):
if not name or not password or not email: return
user = User()
user.name = name
user.password = password
user.email = email
self._add_entry(user)

52
src/libs/debugging.py Normal file
View File

@ -0,0 +1,52 @@
# Python imports
# Lib imports
# Application imports
# Break into a Python console upon SIGUSR1 (Linux) or SIGBREAK (Windows:
# CTRL+Pause/Break). To be included in all production code, just in case.
def debug_signal_handler(signal, frame):
del signal
del frame
try:
import rpdb2
logger.debug("\n\nStarting embedded RPDB2 debugger. Password is 'foobar'\n\n")
rpdb2.start_embedded_debugger("foobar", True, True)
rpdb2.setbreak(depth=1)
return
except StandardError:
...
try:
from rfoo.utils import rconsole
logger.debug("\n\nStarting embedded rconsole debugger...\n\n")
rconsole.spawn_server()
return
except StandardError as ex:
...
try:
from pudb import set_trace
logger.debug("\n\nStarting PuDB debugger...\n\n")
set_trace(paused = True)
return
except StandardError as ex:
...
try:
import pdb
logger.debug("\n\nStarting embedded PDB debugger...\n\n")
pdb.Pdb(skip=['gi.*']).set_trace()
return
except StandardError as ex:
...
try:
import code
code.interact()
except StandardError as ex:
logger.debug(f"{ex}, returning to normal program flow...")

View File

@ -3,11 +3,11 @@
# Lib imports # Lib imports
# Application imports # Application imports
from .singleton import Singleton
class EndpointRegistry(Singleton):
class EndpointRegistry():
def __init__(self): def __init__(self):
self._endpoints = {} self._endpoints = {}

View File

@ -4,16 +4,29 @@ from collections import defaultdict
# Lib imports # Lib imports
# Application imports # Application imports
from .singleton import Singleton
class EventSystem(Singleton):
class EventSystem:
""" Create event system. """ """ Create event system. """
def __init__(self): def __init__(self):
self.subscribers = defaultdict(list) self.subscribers = defaultdict(list)
self._is_paused = False
self._subscribe_to_events()
def _subscribe_to_events(self):
self.subscribe("pause_event_processing", self._pause_processing_events)
self.subscribe("resume_event_processing", self._resume_processing_events)
def _pause_processing_events(self):
self._is_paused = True
def _resume_processing_events(self):
self._is_paused = False
def subscribe(self, event_type, fn): def subscribe(self, event_type, fn):
self.subscribers[event_type].append(fn) self.subscribers[event_type].append(fn)
@ -25,6 +38,9 @@ class EventSystem:
self.subscribers.pop(event_type, None) self.subscribers.pop(event_type, None)
def emit(self, event_type, data = None): def emit(self, event_type, data = None):
if self._is_paused and event_type != "resume_event_processing":
return
if event_type in self.subscribers: if event_type in self.subscribers:
for fn in self.subscribers[event_type]: for fn in self.subscribers[event_type]:
if data: if data:
@ -36,6 +52,9 @@ class EventSystem:
fn() fn()
def emit_and_await(self, event_type, data = None): def emit_and_await(self, event_type, data = None):
if self._is_paused and event_type != "resume_event_processing":
return
""" NOTE: Should be used when signal has only one listener and vis-a-vis """ """ NOTE: Should be used when signal has only one listener and vis-a-vis """
if event_type in self.subscribers: if event_type in self.subscribers:
response = None response = None

View File

@ -8,11 +8,11 @@ from multiprocessing.connection import Listener
# Lib imports # Lib imports
# Application imports # Application imports
from .singleton import Singleton
class IPCServer(Singleton):
class IPCServer:
""" Create a listener so that other {app_name} instances send requests back to existing instance. """ """ Create a listener so that other {app_name} instances send requests back to existing instance. """
def __init__(self, ipc_address: str = '127.0.0.1', conn_type: str = "socket"): def __init__(self, ipc_address: str = '127.0.0.1', conn_type: str = "socket"):
self.is_ipc_alive = False self.is_ipc_alive = False
@ -40,7 +40,7 @@ class IPCServer:
def create_ipc_listener(self) -> None: def create_ipc_listener(self) -> None:
if self._conn_type == "socket": if self._conn_type == "socket":
if os.path.exists(self._ipc_address) and settings.is_dirty_start(): if os.path.exists(self._ipc_address) and settings_manager.is_dirty_start():
os.unlink(self._ipc_address) os.unlink(self._ipc_address)
listener = Listener(address=self._ipc_address, family="AF_UNIX", authkey=self._ipc_authkey) listener = Listener(address=self._ipc_address, family="AF_UNIX", authkey=self._ipc_authkey)
@ -55,24 +55,32 @@ class IPCServer:
@daemon_threaded @daemon_threaded
def _run_ipc_loop(self, listener) -> None: def _run_ipc_loop(self, listener) -> None:
# NOTE: Not thread safe if using with Gtk. Need to import GLib and use idle_add
while True: while True:
conn = listener.accept() try:
start_time = time.perf_counter() conn = listener.accept()
self._handle_ipc_message(conn, start_time) start_time = time.perf_counter()
self._handle_ipc_message(conn, start_time)
except Exception as e:
logger.debug( repr(e) )
listener.close() listener.close()
def _handle_ipc_message(self, conn, start_time) -> None: def _handle_ipc_message(self, conn, start_time) -> None:
while True: while True:
msg = conn.recv() msg = conn.recv()
if settings.is_debug(): logger.debug(msg)
print(msg)
if "FILE|" in msg: if "FILE|" in msg:
file = msg.split("FILE|")[1].strip() file = msg.split("FILE|")[1].strip()
if file: if file:
event_system.emit("handle_file_from_ipc", file) event_system.emit("handle_file_from_ipc", file)
if "DIR|" in msg:
file = msg.split("DIR|")[1].strip()
if file:
event_system.emit("handle_dir_from_ipc", file)
conn.close() conn.close()
break break
@ -100,6 +108,25 @@ class IPCServer:
conn.send(message) conn.send(message)
conn.close() conn.close()
except ConnectionRefusedError as e: except ConnectionRefusedError as e:
print("Connection refused...") logger.error("Connection refused...")
except Exception as e: except Exception as e:
print(repr(e)) logger.error( repr(e) )
def send_test_ipc_message(self, message: str = "Empty Data...") -> None:
try:
if self._conn_type == "socket":
conn = Client(address=self._ipc_address, family="AF_UNIX", authkey=self._ipc_authkey)
elif "unsecured" not in self._conn_type:
conn = Client((self._ipc_address, self._ipc_port), authkey=self._ipc_authkey)
else:
conn = Client((self._ipc_address, self._ipc_port))
conn.send(message)
conn.close()
except ConnectionRefusedError as e:
if self._conn_type == "socket":
logger.error("IPC Socket no longer valid.... Removing.")
os.unlink(self._ipc_address)
except Exception as e:
logger.error( repr(e) )

View File

@ -7,21 +7,20 @@ gi.require_version('Gdk', '3.0')
from gi.repository import Gdk from gi.repository import Gdk
# Application imports # Application imports
from .singleton import Singleton
def logger(log = ""):
def err(log = ""):
"""Print an error message"""
print(log) print(log)
class KeymapError(Exception): class KeymapError(Exception):
"""Custom exception for errors in keybinding configurations""" """ Custom exception for errors in keybinding configurations """
MODIFIER = re.compile('<([^<]+)>') MODIFIER = re.compile('<([^<]+)>')
class Keybindings: class Keybindings(Singleton):
"""Class to handle loading and lookup of Terminator keybindings""" """ Class to handle loading and lookup of Terminator keybindings """
modifiers = { modifiers = {
'ctrl': Gdk.ModifierType.CONTROL_MASK, 'ctrl': Gdk.ModifierType.CONTROL_MASK,
@ -43,13 +42,24 @@ class Keybindings:
self.keymap = Gdk.Keymap.get_default() self.keymap = Gdk.Keymap.get_default()
self.configure({}) self.configure({})
def print_keys(self):
print(self.keys)
def append_bindings(self, combos):
""" Accept new binding(s) and reload """
for item in combos:
method, keys = item.split(":")
self.keys[method] = keys
self.reload()
def configure(self, bindings): def configure(self, bindings):
"""Accept new bindings and reconfigure with them""" """ Accept new bindings and reconfigure with them """
self.keys = bindings self.keys = bindings
self.reload() self.reload()
def reload(self): def reload(self):
"""Parse bindings and mangle into an appropriate form""" """ Parse bindings and mangle into an appropriate form """
self._lookup = {} self._lookup = {}
self._masks = 0 self._masks = 0
@ -66,10 +76,10 @@ class Keybindings:
try: try:
keyval, mask = self._parsebinding(binding) keyval, mask = self._parsebinding(binding)
# Does much the same, but with poorer error handling. # Does much the same, but with worse error handling.
#keyval, mask = Gtk.accelerator_parse(binding) # keyval, mask = Gtk.accelerator_parse(binding)
except KeymapError as e: except KeymapError as e:
err ("keybinding reload failed to parse binding '%s': %s" % (binding, e)) logger(f"Keybinding reload failed to parse binding '{binding}': {e}")
else: else:
if mask & Gdk.ModifierType.SHIFT_MASK: if mask & Gdk.ModifierType.SHIFT_MASK:
if keyval == Gdk.KEY_Tab: if keyval == Gdk.KEY_Tab:
@ -88,7 +98,7 @@ class Keybindings:
self._masks |= mask self._masks |= mask
def _parsebinding(self, binding): def _parsebinding(self, binding):
"""Parse an individual binding using Gtk's binding function""" """ Parse an individual binding using Gtk's binding function """
mask = 0 mask = 0
modifiers = re.findall(MODIFIER, binding) modifiers = re.findall(MODIFIER, binding)
@ -103,25 +113,25 @@ class Keybindings:
keyval = Gdk.keyval_from_name(key) keyval = Gdk.keyval_from_name(key)
if keyval == 0: if keyval == 0:
raise KeymapError("Key '%s' is unrecognised..." % key) raise KeymapError(f"Key '{key}' is unrecognised...")
return (keyval, mask) return (keyval, mask)
def _lookup_modifier(self, modifier): def _lookup_modifier(self, modifier):
"""Map modifier names to gtk values""" """ Map modifier names to gtk values """
try: try:
return self.modifiers[modifier.lower()] return self.modifiers[modifier.lower()]
except KeyError: except KeyError:
raise KeymapError("Unhandled modifier '<%s>'" % modifier) raise KeymapError(f"Unhandled modifier '<{modifier}>'")
def lookup(self, event): def lookup(self, event):
"""Translate a keyboard event into a mapped key""" """ Translate a keyboard event into a mapped key """
try: try:
_found, keyval, _egp, _lvl, consumed = self.keymap.translate_keyboard_state( _found, keyval, _egp, _lvl, consumed = self.keymap.translate_keyboard_state(
event.hardware_keycode, event.hardware_keycode,
Gdk.ModifierType(event.get_state() & ~Gdk.ModifierType.LOCK_MASK), Gdk.ModifierType(event.get_state() & ~Gdk.ModifierType.LOCK_MASK),
event.group) event.group)
except TypeError: except TypeError:
err ("Keybinding lookup failed to translate keyboard event: %s" % dir(event)) logger(f"Keybinding lookup failed to translate keyboard event: {dir(event)}")
return None return None
mask = (event.get_state() & ~consumed) & self._masks mask = (event.get_state() & ~consumed) & self._masks

View File

@ -5,11 +5,11 @@ import logging
# Lib imports # Lib imports
# Application imports # Application imports
from .singleton import Singleton
class Logger(Singleton):
class Logger:
""" """
Create a new logging object and return it. Create a new logging object and return it.
:note: :note:

View File

@ -0,0 +1,3 @@
"""
Utils/Mixins module
"""

View File

@ -0,0 +1,70 @@
# Python imports
# Lib imports
import gi
gi.require_version('Gtk', '3.0')
gi.require_version('Gdk', '3.0')
from gi.repository import Gtk
from gi.repository import Gdk
from gi.repository import Gio
# Application imports
class DnDMixin:
def _setup_dnd(self):
flags = Gtk.DestDefaults.ALL
PLAIN_TEXT_TARGET_TYPE = 70
URI_TARGET_TYPE = 80
text_target = Gtk.TargetEntry.new('text/plain', Gtk.TargetFlags(0), PLAIN_TEXT_TARGET_TYPE)
uri_target = Gtk.TargetEntry.new('text/uri-list', Gtk.TargetFlags(0), URI_TARGET_TYPE)
# targets = [ text_target, uri_target ]
targets = [ uri_target ]
action = Gdk.DragAction.COPY
# self.drag_dest_set_target_list(targets)
self.drag_dest_set(flags, targets, action)
self._setup_dnd_signals()
def _setup_dnd_signals(self):
# self.connect("drag-motion", self._on_drag_motion)
# self.connect('drag-drop', self._on_drag_set)
self.connect("drag-data-received", self._on_drag_data_received)
def _on_drag_motion(self, widget, drag_context, x, y, time):
Gdk.drag_status(drag_context, drag_context.get_actions(), time)
return False
def _on_drag_set(self, widget, drag_context, data, info, time):
self.drag_get_data(drag_context, drag_context.list_targets()[-1], time)
return False
def _on_drag_data_received(self, widget, drag_context, x, y, data, info, time):
if info == 70: return
if info == 80:
uris = data.get_uris()
files = []
if len(uris) == 0:
uris = data.get_text().split("\n")
for uri in uris:
gfile = None
try:
gfile = Gio.File.new_for_uri(uri)
except Exception as e:
gfile = Gio.File.new_for_path(uri)
files.append(gfile)
event_system.emit('set_pre_drop_dnd', (files,))

View File

@ -0,0 +1,20 @@
# Python imports
# Lib imports
# Application imports
class IPCSignalsMixin:
""" IPCSignalsMixin handle messages from another starting solarfm process. """
def print_to_console(self, message=None):
logger.debug(message)
def handle_file_from_ipc(self, path: str) -> None:
logger.debug(f"File From IPC: {path}")
def handle_dir_from_ipc(self, path: str) -> None:
logger.debug(f"Dir From IPC: {path}")

View File

@ -0,0 +1,98 @@
# Python imports
import re
# Lib imports
import gi
gi.require_version('Gtk', '3.0')
gi.require_version('Gdk', '3.0')
from gi.repository import Gtk
from gi.repository import Gdk
# Application imports
valid_keyvalue_pat = re.compile(r"[a-z0-9A-Z-_\[\]\(\)\| ]")
class KeyboardSignalsMixin:
""" KeyboardSignalsMixin keyboard hooks controller. """
# TODO: Need to set methods that use this to somehow check the keybindings state instead.
def unset_keys_and_data(self, widget = None, eve = None):
self.ctrl_down = False
self.shift_down = False
self.alt_down = False
def unmap_special_keys(self, keyname):
if "control" in keyname:
self.ctrl_down = False
if "shift" in keyname:
self.shift_down = False
if "alt" in keyname:
self.alt_down = False
def on_global_key_press_controller(self, eve, user_data):
keyname = Gdk.keyval_name(user_data.keyval).lower()
modifiers = Gdk.ModifierType(user_data.get_state() & ~Gdk.ModifierType.LOCK_MASK)
self.was_midified_key = True if modifiers != 0 else False
if keyname.replace("_l", "").replace("_r", "") in ["control", "alt", "shift"]:
if "control" in keyname:
self.ctrl_down = True
if "shift" in keyname:
self.shift_down = True
if "alt" in keyname:
self.alt_down = True
def on_global_key_release_controller(self, widget, event):
""" Handler for keyboard events """
keyname = Gdk.keyval_name(event.keyval).lower()
modifiers = Gdk.ModifierType(event.get_state() & ~Gdk.ModifierType.LOCK_MASK)
if keyname.replace("_l", "").replace("_r", "") in ["control", "alt", "shift"]:
should_return = self.was_midified_key and (self.ctrl_down or self.shift_down or self.alt_down)
self.unmap_special_keys(keyname)
if should_return:
self.was_midified_key = False
return
mapping = keybindings.lookup(event)
logger.debug(f"on_global_key_release_controller > key > {keyname}")
logger.debug(f"on_global_key_release_controller > keyval > {event.keyval}")
logger.debug(f"on_global_key_release_controller > mapping > {mapping}")
if mapping:
self.handle_mapped_key_event(mapping)
else:
self.handle_as_key_event_scope(mapping)
def handle_mapped_key_event(self, mapping):
try:
self.handle_as_controller_scope()
except Exception:
self.handle_as_plugin_scope(mapping)
def handle_as_controller_scope(self, mapping):
getattr(self, mapping)()
def handle_as_plugin_scope(self, mapping):
if "||" in mapping:
sender, eve_type = mapping.split("||")
else:
sender = ""
eve_type = mapping
self.handle_as_key_event_system(sender, eve_type)
def handle_as_key_event_scope(self, mapping):
logger.debug(f"on_global_key_release_controller > key > {keyname}")
if self.ctrl_down and not keyname in ["1", "kp_1", "2", "kp_2", "3", "kp_3", "4", "kp_4"]:
self.handle_key_event_system(None, mapping)
def handle_key_event_system(self, sender, eve_type):
event_system.emit(eve_type)

15
src/libs/models.py Normal file
View File

@ -0,0 +1,15 @@
# Python imports
from typing import Optional
# Lib imports
from sqlmodel import SQLModel, Field
# Application imports
class User(SQLModel, table = True):
id: Optional[int] = Field(default = None, primary_key = True)
name: str
password: str
email: Optional[str] = None

View File

@ -0,0 +1,4 @@
"""
Settings module
"""
from .manager import SettingsManager

View File

@ -0,0 +1,191 @@
# Python imports
import inspect
import json
import zipfile
from os import path
from os import mkdir
# Lib imports
# Application imports
from ..singleton import Singleton
from .start_check_mixin import StartCheckMixin
from .options.settings import Settings
class MissingConfigError(Exception):
pass
class SettingsManager(StartCheckMixin, Singleton):
def __init__(self):
self._SCRIPT_PTH = path.dirname(path.realpath(__file__))
self._USER_HOME = path.expanduser('~')
self._HOME_CONFIG_PATH = f"{self._USER_HOME}/.config/{app_name.lower()}"
self._USR_PATH = f"/usr/share/{app_name.lower()}"
self._USR_CONFIG_FILE = f"{self._USR_PATH}/settings.json"
self._CONTEXT_PATH = f"{self._HOME_CONFIG_PATH}/context_path"
self._PLUGINS_PATH = f"{self._HOME_CONFIG_PATH}/plugins"
self._DEFAULT_ICONS = f"{self._HOME_CONFIG_PATH}/icons"
self._CONFIG_FILE = f"{self._HOME_CONFIG_PATH}/settings.json"
self._GLADE_FILE = f"{self._HOME_CONFIG_PATH}/Main_Window.glade"
self._CSS_FILE = f"{self._HOME_CONFIG_PATH}/stylesheet.css"
self._KEY_BINDINGS_FILE = f"{self._HOME_CONFIG_PATH}/key-bindings.json"
self._PID_FILE = f"{self._HOME_CONFIG_PATH}/{app_name.lower()}.pid"
self._UI_WIDEGTS_PATH = f"{self._HOME_CONFIG_PATH}/ui_widgets"
self._CONTEXT_MENU = f"{self._HOME_CONFIG_PATH}/contexct_menu.json"
self._WINDOW_ICON = f"{self._DEFAULT_ICONS}/{app_name.lower()}.png"
# self._USR_CONFIG_FILE = f"{self._USR_PATH}/settings.json"
# self._PLUGINS_PATH = f"plugins"
# self._CONFIG_FILE = f"settings.json"
# self._GLADE_FILE = f"Main_Window.glade"
# self._CSS_FILE = f"stylesheet.css"
# self._KEY_BINDINGS_FILE = f"key-bindings.json"
# self._PID_FILE = f"{app_name.lower()}.pid"
# self._WINDOW_ICON = f"{app_name.lower()}.png"
# self._UI_WIDEGTS_PATH = f"ui_widgets"
# self._CONTEXT_MENU = f"contexct_menu.json"
# self._DEFAULT_ICONS = f"icons"
# with zipfile.ZipFile("files.zip", mode="r", allowZip64=True) as zf:
# with io.TextIOWrapper(zf.open("text1.txt"), encoding="utf-8") as f:
if not path.exists(self._HOME_CONFIG_PATH):
mkdir(self._HOME_CONFIG_PATH)
if not path.exists(self._PLUGINS_PATH):
mkdir(self._PLUGINS_PATH)
if not path.exists(self._DEFAULT_ICONS):
self._DEFAULT_ICONS = f"{self._USR_PATH}/icons"
if not path.exists(self._DEFAULT_ICONS):
raise MissingConfigError("Unable to find the application icons directory.")
if not path.exists(self._GLADE_FILE):
self._GLADE_FILE = f"{self._USR_PATH}/Main_Window.glade"
if not path.exists(self._GLADE_FILE):
raise MissingConfigError("Unable to find the application Glade file.")
if not path.exists(self._KEY_BINDINGS_FILE):
self._KEY_BINDINGS_FILE = f"{self._USR_PATH}/key-bindings.json"
if not path.exists(self._KEY_BINDINGS_FILE):
raise MissingConfigError("Unable to find the application Keybindings file.")
if not path.exists(self._CSS_FILE):
self._CSS_FILE = f"{self._USR_PATH}/stylesheet.css"
if not path.exists(self._CSS_FILE):
raise MissingConfigError("Unable to find the application Stylesheet file.")
if not path.exists(self._WINDOW_ICON):
self._WINDOW_ICON = f"{self._USR_PATH}/icons/{app_name.lower()}.png"
if not path.exists(self._WINDOW_ICON):
raise MissingConfigError("Unable to find the application icon.")
if not path.exists(self._UI_WIDEGTS_PATH):
self._UI_WIDEGTS_PATH = f"{self._USR_PATH}/ui_widgets"
if not path.exists(self._CONTEXT_MENU):
self._CONTEXT_MENU = f"{self._USR_PATH}/contexct_menu.json"
try:
with open(self._KEY_BINDINGS_FILE) as file:
bindings = json.load(file)["keybindings"]
keybindings.configure(bindings)
except Exception as e:
print( f"Settings Manager: {self._KEY_BINDINGS_FILE}\n\t\t{repr(e)}" )
try:
with open(self._CONTEXT_MENU) as file:
self._context_menu_data = json.load(file)
except Exception as e:
print( f"Settings Manager: {self._CONTEXT_MENU}\n\t\t{repr(e)}" )
self.settings: Settings = None
self._main_window = None
self._builder = None
self.PAINT_BG_COLOR = (0, 0, 0, 0.0)
self._trace_debug = False
self._debug = False
self._dirty_start = False
def register_signals_to_builder(self, classes=None):
handlers = {}
for c in classes:
methods = None
try:
methods = inspect.getmembers(c, predicate=inspect.ismethod)
handlers.update(methods)
except Exception as e:
...
self._builder.connect_signals(handlers)
def set_main_window(self, window): self._main_window = window
def set_builder(self, builder) -> any: self._builder = builder
def get_monitor_data(self) -> list:
screen = self._main_window.get_screen()
monitors = []
for m in range(screen.get_n_monitors()):
monitors.append(screen.get_monitor_geometry(m))
print("{}x{}+{}+{}".format(monitor.width, monitor.height, monitor.x, monitor.y))
return monitors
def get_main_window(self) -> any: return self._main_window
def get_builder(self) -> any: return self._builder
def get_paint_bg_color(self) -> any: return self.PAINT_BG_COLOR
def get_glade_file(self) -> str: return self._GLADE_FILE
def get_ui_widgets_path(self) -> str: return self._UI_WIDEGTS_PATH
def get_context_menu_data(self) -> str: return self._context_menu_data
def get_context_path(self) -> str: return self._CONTEXT_PATH
def get_plugins_path(self) -> str: return self._PLUGINS_PATH
def get_icon_theme(self) -> str: return self._ICON_THEME
def get_css_file(self) -> str: return self._CSS_FILE
def get_window_icon(self) -> str: return self._WINDOW_ICON
def get_home_config_path(self) -> str: return self._HOME_CONFIG_PATH
def get_window_icon(self) -> str: return self._WINDOW_ICON
def get_home_path(self) -> str: return self._USER_HOME
def is_trace_debug(self) -> str: return self._trace_debug
def is_debug(self) -> str: return self._debug
def call_method(self, target_class = None, _method_name = None, data = None):
method_name = str(_method_name)
method = getattr(target_class, method_name, lambda data: f"No valid key passed...\nkey={method_name}\nargs={data}")
return method(data) if data else method()
def set_main_window_x(self, x = 0): self.settings.config.main_window_x = x
def set_main_window_y(self, y = 0): self.settings.config.main_window_y = y
def set_main_window_width(self, width = 800): self.settings.config.main_window_width = width
def set_main_window_height(self, height = 600): self.settings.config.main_window_height = height
def set_main_window_min_width(self, width = 720): self.settings.config.main_window_min_width = width
def set_main_window_min_height(self, height = 480): self.settings.config.main_window_min_height = height
def set_trace_debug(self, trace_debug):
self._trace_debug = trace_debug
def set_debug(self, debug):
self._debug = debug
def load_settings(self):
if not path.exists(self._CONFIG_FILE):
self.settings = Settings()
return
with open(self._CONFIG_FILE) as file:
data = json.load(file)
data["load_defaults"] = False
self.settings = Settings(**data)
def save_settings(self):
with open(self._CONFIG_FILE, 'w') as outfile:
json.dump(self.settings.as_dict(), outfile, separators=(',', ':'), indent=4)

View File

@ -0,0 +1,8 @@
"""
Options module
"""
from .settings import Settings
from .config import Config
from .filters import Filters
from .theming import Theming
from .debugging import Debugging

View File

@ -0,0 +1,39 @@
# Python imports
from dataclasses import dataclass, field
# Lib imports
# Application imports
@dataclass
class Config:
base_of_home: str = ""
hide_hidden_files: str = "true"
thumbnailer_path: str = "ffmpegthumbnailer"
blender_thumbnailer_path: str = ""
go_past_home: str = "true"
lock_folder: str = "false"
locked_folders: list = field(default_factory=lambda: [ "venv", "flasks" ])
mplayer_options: str = "-quiet -really-quiet -xy 1600 -geometry 50%:50%",
music_app: str = "/opt/deadbeef/bin/deadbeef"
media_app: str = "mpv"
image_app: str = "mirage"
office_app: str = "libreoffice"
pdf_app: str = "evince"
code_app: str = "atom"
text_app: str = "leafpad"
file_manager_app: str = "solarfm"
terminal_app: str = "terminator"
remux_folder_max_disk_usage: str = "8589934592"
make_transparent: int = 0
main_window_x: int = 721
main_window_y: int = 465
main_window_min_width: int = 720
main_window_min_height: int = 480
main_window_width: int = 800
main_window_height: int = 600
application_dirs: list = field(default_factory=lambda: [
"/usr/share/applications",
f"{settings_manager.get_home_path()}/.local/share/applications"
])

View File

@ -0,0 +1,12 @@
# Python imports
from dataclasses import dataclass
# Lib imports
# Application imports
@dataclass
class Debugging:
ch_log_lvl: int = 10
fh_log_lvl: int = 20

View File

@ -0,0 +1,90 @@
# Python imports
from dataclasses import dataclass, field
# Lib imports
# Application imports
@dataclass
class Filters:
meshs: list = field(default_factory=lambda: [
".blend",
".dae",
".fbx",
".gltf",
".obj",
".stl"
])
code: list = field(default_factory=lambda: [
".cpp",
".css",
".c",
".go",
".html",
".htm",
".java",
".js",
".json",
".lua",
".md",
".py",
".rs",
".toml",
".xml",
".pom"
])
videos: list = field(default_factory=lambda:[
".mkv",
".mp4",
".webm",
".avi",
".mov",
".m4v",
".mpg",
".mpeg",
".wmv",
".flv"
])
office: list = field(default_factory=lambda: [
".doc",
".docx",
".xls",
".xlsx",
".xlt",
".xltx",
".xlm",
".ppt",
".pptx",
".pps",
".ppsx",
".odt",
".rtf"
])
images: list = field(default_factory=lambda: [
".png",
".jpg",
".jpeg",
".gif",
".ico",
".tga",
".webp"
])
text: list = field(default_factory=lambda: [
".txt",
".text",
".sh",
".cfg",
".conf",
".log"
])
music: list = field(default_factory=lambda: [
".psf",
".mp3",
".ogg",
".flac",
".m4a"
])
pdf: list = field(default_factory=lambda: [
".pdf"
])

View File

@ -0,0 +1,31 @@
# Python imports
from dataclasses import dataclass, field
from dataclasses import asdict
# Gtk imports
# Application imports
from .config import Config
from .filters import Filters
from .theming import Theming
from .debugging import Debugging
@dataclass
class Settings:
load_defaults: bool = True
config: Config = field(default_factory=lambda: Config())
filters: Filters = field(default_factory=lambda: Filters())
theming: Theming = field(default_factory=lambda: Theming())
debugging: Debugging = field(default_factory=lambda: Debugging())
def __post_init__(self):
if not self.load_defaults:
self.load_defaults = False
self.config = Config(**self.config)
self.filters = Filters(**self.filters)
self.theming = Theming(**self.theming)
self.debugging = Debugging(**self.debugging)
def as_dict(self):
return asdict(self)

View File

@ -0,0 +1,14 @@
# Python imports
from dataclasses import dataclass
# Lib imports
# Application imports
@dataclass
class Theming:
transparency: int = 64
success_color: str = "#88cc27"
warning_color: str = "#ffa800"
error_color: str = "#ff0000"

View File

@ -0,0 +1,3 @@
"""
Settings Other module
"""

View File

@ -0,0 +1,42 @@
# Python imports
# Lib imports
import gi
gi.require_version('WebKit2', '4.0')
from gi.repository import WebKit2
# Application imports
class WebkitUISettings(WebKit2.Settings):
def __init__(self):
super(WebkitUISettings, self).__init__()
self._set_default_settings()
# Note: Highly insecure setup but most "app" like setup I could think of.
# Audit heavily any scripts/links ran/clicked under this setup!
def _set_default_settings(self):
self.set_enable_xss_auditor(True)
self.set_enable_hyperlink_auditing(True)
# self.set_enable_xss_auditor(False)
# self.set_enable_hyperlink_auditing(False)
self.set_allow_file_access_from_file_urls(True)
self.set_allow_universal_access_from_file_urls(True)
self.set_enable_page_cache(False)
self.set_enable_offline_web_application_cache(False)
self.set_enable_html5_local_storage(False)
self.set_enable_html5_database(False)
self.set_enable_fullscreen(False)
self.set_print_backgrounds(False)
self.set_enable_tabs_to_links(False)
self.set_enable_developer_extras(True)
self.set_enable_webrtc(True)
self.set_enable_webaudio(True)
self.set_enable_accelerated_2d_canvas(True)
self.set_user_agent(f"{app_name}")

View File

@ -0,0 +1,63 @@
# Python imports
import os
import json
import inspect
# Lib imports
# Application imports
class StartCheckMixin:
def is_dirty_start(self) -> bool:
return self._dirty_start
def clear_pid(self):
if not self.is_trace_debug():
self._clean_pid()
def do_dirty_start_check(self):
if self.is_trace_debug():
pid = os.getpid()
self._print_pid(pid)
return
if os.path.exists(self._PID_FILE):
with open(self._PID_FILE, "r") as f:
pid = f.readline().strip()
if pid not in ("", None):
if self.is_pid_alive( int(pid) ):
print("PID file exists and PID is alive... Letting downstream errors (sans debug args) handle app closure propigation.")
return
self._write_new_pid()
""" Check For the existence of a unix pid. """
def is_pid_alive(self, pid):
print(f"PID Found: {pid}")
try:
os.kill(pid, 0)
except OSError:
print(f"{app_name} PID file exists but PID is irrelevant; starting dirty...")
self._dirty_start = True
return False
return True
def _write_new_pid(self):
pid = os.getpid()
self._write_pid(pid)
self._print_pid(pid)
def _print_pid(self, pid):
print(f"{app_name} PID: {pid}")
def _clean_pid(self):
os.unlink(self._PID_FILE)
def _write_pid(self, pid):
with open(self._PID_FILE, "w") as _pid:
_pid.write(f"{pid}")

24
src/libs/singleton.py Normal file
View File

@ -0,0 +1,24 @@
# Python imports
# Lib imports
# Application imports
class SingletonError(Exception):
pass
class Singleton:
ccount = 0
def __new__(cls, *args, **kwargs):
obj = super(Singleton, cls).__new__(cls)
cls.ccount += 1
if cls.ccount == 2:
raise SingletonError(f"Exceeded {cls.__name__} instantiation limit...")
return obj

View File

@ -26,7 +26,7 @@ class PluginsController:
path = os.path.dirname(os.path.realpath(__file__)) path = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, path) # NOTE: I think I'm not using this correctly... sys.path.insert(0, path) # NOTE: I think I'm not using this correctly...
self._plugins_path = settings.get_plugins_path() self._plugins_path = settings_manager.get_plugins_path()
self._plugins_dir_watcher = None self._plugins_dir_watcher = None
self._plugin_collection = [] self._plugin_collection = []
@ -112,4 +112,4 @@ class PluginsController:
self._plugin_collection.append(plugin) self._plugin_collection.append(plugin)
def reload_plugins(self, file: str = None) -> None: def reload_plugins(self, file: str = None) -> None:
print(f"Reloading plugins... stub.") print(f"Reloading plugins... stub.")

View File

@ -1,4 +0,0 @@
"""
Settings module
"""
from .settings import Settings

View File

@ -1,156 +0,0 @@
# Python imports
import os
import json
import inspect
# Lib imports
# Application imports
from .start_check_mixin import StartCheckMixin
class MissingConfigError(Exception):
pass
class Settings(StartCheckMixin):
def __init__(self):
self._SCRIPT_PTH = os.path.dirname(os.path.realpath(__file__))
self._USER_HOME = os.path.expanduser('~')
self._USR_PATH = f"/usr/share/{app_name.lower()}"
self._USR_CONFIG_FILE = f"{self._USR_PATH}/settings.json"
self._HOME_CONFIG_PATH = f"{self._USER_HOME}/.config/{app_name.lower()}"
self._PLUGINS_PATH = f"{self._HOME_CONFIG_PATH}/plugins"
self._DEFAULT_ICONS = f"{self._HOME_CONFIG_PATH}/icons"
self._CONFIG_FILE = f"{self._HOME_CONFIG_PATH}/settings.json"
self._GLADE_FILE = f"{self._HOME_CONFIG_PATH}/Main_Window.glade"
self._CSS_FILE = f"{self._HOME_CONFIG_PATH}/stylesheet.css"
self._KEY_BINDINGS_FILE = f"{self._HOME_CONFIG_PATH}/key-bindings.json"
self._PID_FILE = f"{self._HOME_CONFIG_PATH}/{app_name.lower()}.pid"
self._WINDOW_ICON = f"{self._DEFAULT_ICONS}/icons/{app_name.lower()}.png"
if not os.path.exists(self._HOME_CONFIG_PATH):
os.mkdir(self._HOME_CONFIG_PATH)
if not os.path.exists(self._PLUGINS_PATH):
os.mkdir(self._PLUGINS_PATH)
if not os.path.exists(self._CONFIG_FILE):
import shutil
try:
shutil.copyfile(self._USR_CONFIG_FILE, self._CONFIG_FILE)
except Exception as e:
raise
if not os.path.exists(self._DEFAULT_ICONS):
self._DEFAULT_ICONS = f"{self._USR_PATH}/icons"
if not os.path.exists(self._DEFAULT_ICONS):
raise MissingConfigError("Unable to find the application icons directory.")
if not os.path.exists(self._GLADE_FILE):
self._GLADE_FILE = f"{self._USR_PATH}/Main_Window.glade"
if not os.path.exists(self._GLADE_FILE):
raise MissingConfigError("Unable to find the application Glade file.")
if not os.path.exists(self._KEY_BINDINGS_FILE):
self._KEY_BINDINGS_FILE = f"{self._USR_PATH}/key-bindings.json"
if not os.path.exists(self._KEY_BINDINGS_FILE):
raise MissingConfigError("Unable to find the application Keybindings file.")
if not os.path.exists(self._CSS_FILE):
self._CSS_FILE = f"{self._USR_PATH}/stylesheet.css"
if not os.path.exists(self._CSS_FILE):
raise MissingConfigError("Unable to find the application Stylesheet file.")
if not os.path.exists(self._WINDOW_ICON):
self._WINDOW_ICON = f"{self._USR_PATH}/icons/{app_name.lower()}.png"
if not os.path.exists(self._WINDOW_ICON):
raise MissingConfigError("Unable to find the application icon.")
# with open(self._KEY_BINDINGS_FILE) as file:
# bindings = json.load(file)["keybindings"]
# keybindings.configure(bindings)
self._main_window = None
self._main_window_w = 800
self._main_window_h = 600
self._builder = None
self._trace_debug = False
self._debug = False
self._dirty_start = False
self.load_settings()
def register_signals_to_builder(self, classes=None):
handlers = {}
for c in classes:
methods = None
try:
methods = inspect.getmembers(c, predicate=inspect.ismethod)
handlers.update(methods)
except Exception as e:
...
self._builder.connect_signals(handlers)
def set_main_window(self, window): self._main_window = window
def set_builder(self, builder) -> any: self._builder = builder
def get_monitor_data(self) -> list:
screen = self._main_window.get_screen()
monitors = []
for m in range(screen.get_n_monitors()):
monitors.append(screen.get_monitor_geometry(m))
print("{}x{}+{}+{}".format(monitor.width, monitor.height, monitor.x, monitor.y))
return monitors
def get_main_window(self) -> any: return self._main_window
def get_main_window_width(self) -> any: return self._main_window_w
def get_main_window_height(self) -> any: return self._main_window_h
def get_builder(self) -> any: return self._builder
def get_glade_file(self) -> str: return self._GLADE_FILE
def get_plugins_path(self) -> str: return self._PLUGINS_PATH
def get_icon_theme(self) -> str: return self._ICON_THEME
def get_css_file(self) -> str: return self._CSS_FILE
def get_home_config_path(self) -> str: return self._HOME_CONFIG_PATH
def get_window_icon(self) -> str: return self._WINDOW_ICON
def get_home_path(self) -> str: return self._USER_HOME
# Filter returns
def get_office_filter(self) -> tuple: return tuple(self._settings["filters"]["office"])
def get_vids_filter(self) -> tuple: return tuple(self._settings["filters"]["videos"])
def get_text_filter(self) -> tuple: return tuple(self._settings["filters"]["text"])
def get_music_filter(self) -> tuple: return tuple(self._settings["filters"]["music"])
def get_images_filter(self) -> tuple: return tuple(self._settings["filters"]["images"])
def get_pdf_filter(self) -> tuple: return tuple(self._settings["filters"]["pdf"])
def get_success_color(self) -> str: return self._theming["success_color"]
def get_warning_color(self) -> str: return self._theming["warning_color"]
def get_error_color(self) -> str: return self._theming["error_color"]
def is_trace_debug(self) -> str: return self._trace_debug
def is_debug(self) -> str: return self._debug
def get_ch_log_lvl(self) -> str: return self._settings["debugging"]["ch_log_lvl"]
def get_fh_log_lvl(self) -> str: return self._settings["debugging"]["fh_log_lvl"]
def set_trace_debug(self, trace_debug):
self._trace_debug = trace_debug
def set_debug(self, debug):
self._debug = debug
def load_settings(self):
with open(self._CONFIG_FILE) as f:
self._settings = json.load(f)
self._config = self._settings["config"]
self._theming = self._settings["theming"]
def save_settings(self):
with open(self._CONFIG_FILE, 'w') as outfile:
json.dump(self._settings, outfile, separators=(',', ':'), indent=4)

View File

@ -1,50 +0,0 @@
# Python imports
import os
import json
import inspect
# Lib imports
# Application imports
class StartCheckMixin:
def is_dirty_start(self) -> bool: return self._dirty_start
def clear_pid(self): self._clean_pid()
def do_dirty_start_check(self):
if not os.path.exists(self._PID_FILE):
self._write_new_pid()
else:
with open(self._PID_FILE, "r") as _pid:
pid = _pid.readline().strip()
if pid not in ("", None):
self._check_alive_status(int(pid))
else:
self._write_new_pid()
""" Check For the existence of a unix pid. """
def _check_alive_status(self, pid):
print(f"PID Found: {pid}")
try:
os.kill(pid, 0)
except OSError:
print(f"{app_name} is starting dirty...")
self._dirty_start = True
self._write_new_pid()
return
print("PID is alive... Let downstream errors (sans debug args) handle app closure propigation.")
def _write_new_pid(self):
pid = os.getpid()
self._write_pid(pid)
def _clean_pid(self):
os.unlink(self._PID_FILE)
def _write_pid(self, pid):
with open(self._PID_FILE, "w") as _pid:
_pid.write(f"{pid}")