moved utils --> libs

This commit is contained in:
2024-01-11 19:35:04 -06:00
parent 7e9dd31d01
commit 1876ba3fd0
24 changed files with 17 additions and 17 deletions

3
src/libs/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
"""
Utils module
"""

42
src/libs/db.py Normal file
View File

@@ -0,0 +1,42 @@
# Python imports
from typing import Optional
from os import path
# Lib imports
from sqlmodel import Session, create_engine
# Application imports
from .models import SQLModel, User
class DB:
def __init__(self):
super(DB, self).__init__()
self.engine = None
self.create_engine()
def create_engine(self):
db_path = f"sqlite:///{settings_manager.get_home_config_path()}/database.db"
self.engine = create_engine(db_path)
SQLModel.metadata.create_all(self.engine)
def _add_entry(self, entry):
with Session(self.engine) as session:
session.add(entry)
session.commit()
def add_user_entry(self, name = None, password = None, email = None):
if not name or not password or not email: return
user = User()
user.name = name
user.password = password
user.email = email
self._add_entry(user)

52
src/libs/debugging.py Normal file
View File

@@ -0,0 +1,52 @@
# Python imports
# Lib imports
# Application imports
# Break into a Python console upon SIGUSR1 (Linux) or SIGBREAK (Windows:
# CTRL+Pause/Break). To be included in all production code, just in case.
def debug_signal_handler(signal, frame):
del signal
del frame
try:
import rpdb2
logger.debug("\n\nStarting embedded RPDB2 debugger. Password is 'foobar'\n\n")
rpdb2.start_embedded_debugger("foobar", True, True)
rpdb2.setbreak(depth=1)
return
except StandardError:
...
try:
from rfoo.utils import rconsole
logger.debug("\n\nStarting embedded rconsole debugger...\n\n")
rconsole.spawn_server()
return
except StandardError as ex:
...
try:
from pudb import set_trace
logger.debug("\n\nStarting PuDB debugger...\n\n")
set_trace(paused = True)
return
except StandardError as ex:
...
try:
import pdb
logger.debug("\n\nStarting embedded PDB debugger...\n\n")
pdb.Pdb(skip=['gi.*']).set_trace()
return
except StandardError as ex:
...
try:
import code
code.interact()
except StandardError as ex:
logger.debug(f"{ex}, returning to normal program flow...")

View File

@@ -0,0 +1,22 @@
# Python imports
# Lib imports
# Application imports
from .singleton import Singleton
class EndpointRegistry(Singleton):
def __init__(self):
self._endpoints = {}
def register(self, rule, **options):
def decorator(f):
self._endpoints[rule] = f
return f
return decorator
def get_endpoints(self):
return self._endpoints

73
src/libs/event_system.py Normal file
View File

@@ -0,0 +1,73 @@
# Python imports
from collections import defaultdict
# Lib imports
# Application imports
from .singleton import Singleton
class EventSystem(Singleton):
""" Create event system. """
def __init__(self):
self.subscribers = defaultdict(list)
self._is_paused = False
self._subscribe_to_events()
def _subscribe_to_events(self):
self.subscribe("pause_event_processing", self._pause_processing_events)
self.subscribe("resume_event_processing", self._resume_processing_events)
def _pause_processing_events(self):
self._is_paused = True
def _resume_processing_events(self):
self._is_paused = False
def subscribe(self, event_type, fn):
self.subscribers[event_type].append(fn)
def unsubscribe(self, event_type, fn):
self.subscribers[event_type].remove(fn)
def unsubscribe_all(self, event_type):
self.subscribers.pop(event_type, None)
def emit(self, event_type, data = None):
if self._is_paused and event_type != "resume_event_processing":
return
if event_type in self.subscribers:
for fn in self.subscribers[event_type]:
if data:
if hasattr(data, '__iter__') and not type(data) is str:
fn(*data)
else:
fn(data)
else:
fn()
def emit_and_await(self, event_type, data = None):
if self._is_paused and event_type != "resume_event_processing":
return
""" NOTE: Should be used when signal has only one listener and vis-a-vis """
if event_type in self.subscribers:
response = None
for fn in self.subscribers[event_type]:
if data:
if hasattr(data, '__iter__') and not type(data) is str:
response = fn(*data)
else:
response = fn(data)
else:
response = fn()
if not response in (None, ''):
break
return response

132
src/libs/ipc_server.py Normal file
View File

@@ -0,0 +1,132 @@
# Python imports
import os
import threading
import time
from multiprocessing.connection import Client
from multiprocessing.connection import Listener
# Lib imports
# Application imports
from .singleton import Singleton
class IPCServer(Singleton):
""" Create a listener so that other {app_name} instances send requests back to existing instance. """
def __init__(self, ipc_address: str = '127.0.0.1', conn_type: str = "socket"):
self.is_ipc_alive = False
self._ipc_port = 4848
self._ipc_address = ipc_address
self._conn_type = conn_type
self._ipc_authkey = b'' + bytes(f'{app_name}-ipc', 'utf-8')
self._ipc_timeout = 15.0
if conn_type == "socket":
self._ipc_address = f'/tmp/{app_name}-ipc.sock'
elif conn_type == "full_network":
self._ipc_address = '0.0.0.0'
elif conn_type == "full_network_unsecured":
self._ipc_authkey = None
self._ipc_address = '0.0.0.0'
elif conn_type == "local_network_unsecured":
self._ipc_authkey = None
self._subscribe_to_events()
def _subscribe_to_events(self):
event_system.subscribe("post_file_to_ipc", self.send_ipc_message)
def create_ipc_listener(self) -> None:
if self._conn_type == "socket":
if os.path.exists(self._ipc_address) and settings_manager.is_dirty_start():
os.unlink(self._ipc_address)
listener = Listener(address=self._ipc_address, family="AF_UNIX", authkey=self._ipc_authkey)
elif "unsecured" not in self._conn_type:
listener = Listener((self._ipc_address, self._ipc_port), authkey=self._ipc_authkey)
else:
listener = Listener((self._ipc_address, self._ipc_port))
self.is_ipc_alive = True
self._run_ipc_loop(listener)
@daemon_threaded
def _run_ipc_loop(self, listener) -> None:
# NOTE: Not thread safe if using with Gtk. Need to import GLib and use idle_add
while True:
try:
conn = listener.accept()
start_time = time.perf_counter()
self._handle_ipc_message(conn, start_time)
except Exception as e:
logger.debug( repr(e) )
listener.close()
def _handle_ipc_message(self, conn, start_time) -> None:
while True:
msg = conn.recv()
logger.debug(msg)
if "FILE|" in msg:
file = msg.split("FILE|")[1].strip()
if file:
event_system.emit("handle_file_from_ipc", file)
if "DIR|" in msg:
file = msg.split("DIR|")[1].strip()
if file:
event_system.emit("handle_dir_from_ipc", file)
conn.close()
break
if msg in ['close connection', 'close server']:
conn.close()
break
# NOTE: Not perfect but insures we don't lock up the connection for too long.
end_time = time.perf_counter()
if (end_time - start_time) > self._ipc_timeout:
conn.close()
break
def send_ipc_message(self, message: str = "Empty Data...") -> None:
try:
if self._conn_type == "socket":
conn = Client(address=self._ipc_address, family="AF_UNIX", authkey=self._ipc_authkey)
elif "unsecured" not in self._conn_type:
conn = Client((self._ipc_address, self._ipc_port), authkey=self._ipc_authkey)
else:
conn = Client((self._ipc_address, self._ipc_port))
conn.send(message)
conn.close()
except ConnectionRefusedError as e:
logger.error("Connection refused...")
except Exception as e:
logger.error( repr(e) )
def send_test_ipc_message(self, message: str = "Empty Data...") -> None:
try:
if self._conn_type == "socket":
conn = Client(address=self._ipc_address, family="AF_UNIX", authkey=self._ipc_authkey)
elif "unsecured" not in self._conn_type:
conn = Client((self._ipc_address, self._ipc_port), authkey=self._ipc_authkey)
else:
conn = Client((self._ipc_address, self._ipc_port))
conn.send(message)
conn.close()
except ConnectionRefusedError as e:
if self._conn_type == "socket":
logger.error("IPC Socket no longer valid.... Removing.")
os.unlink(self._ipc_address)
except Exception as e:
logger.error( repr(e) )

138
src/libs/keybindings.py Normal file
View File

@@ -0,0 +1,138 @@
# Python imports
import re
# Lib imports
import gi
gi.require_version('Gdk', '3.0')
from gi.repository import Gdk
# Application imports
from .singleton import Singleton
def logger(log = ""):
print(log)
class KeymapError(Exception):
""" Custom exception for errors in keybinding configurations """
MODIFIER = re.compile('<([^<]+)>')
class Keybindings(Singleton):
""" Class to handle loading and lookup of Terminator keybindings """
modifiers = {
'ctrl': Gdk.ModifierType.CONTROL_MASK,
'control': Gdk.ModifierType.CONTROL_MASK,
'primary': Gdk.ModifierType.CONTROL_MASK,
'shift': Gdk.ModifierType.SHIFT_MASK,
'alt': Gdk.ModifierType.MOD1_MASK,
'super': Gdk.ModifierType.SUPER_MASK,
'hyper': Gdk.ModifierType.HYPER_MASK,
'mod2': Gdk.ModifierType.MOD2_MASK
}
empty = {}
keys = None
_masks = None
_lookup = None
def __init__(self):
self.keymap = Gdk.Keymap.get_default()
self.configure({})
def print_keys(self):
print(self.keys)
def append_bindings(self, combos):
""" Accept new binding(s) and reload """
for item in combos:
method, keys = item.split(":")
self.keys[method] = keys
self.reload()
def configure(self, bindings):
""" Accept new bindings and reconfigure with them """
self.keys = bindings
self.reload()
def reload(self):
""" Parse bindings and mangle into an appropriate form """
self._lookup = {}
self._masks = 0
for action, bindings in list(self.keys.items()):
if isinstance(bindings, list):
bindings = (*bindings,)
elif not isinstance(bindings, tuple):
bindings = (bindings,)
for binding in bindings:
if not binding or binding == "None":
continue
try:
keyval, mask = self._parsebinding(binding)
# Does much the same, but with worse error handling.
# keyval, mask = Gtk.accelerator_parse(binding)
except KeymapError as e:
logger(f"Keybinding reload failed to parse binding '{binding}': {e}")
else:
if mask & Gdk.ModifierType.SHIFT_MASK:
if keyval == Gdk.KEY_Tab:
keyval = Gdk.KEY_ISO_Left_Tab
mask &= ~Gdk.ModifierType.SHIFT_MASK
else:
keyvals = Gdk.keyval_convert_case(keyval)
if keyvals[0] != keyvals[1]:
keyval = keyvals[1]
mask &= ~Gdk.ModifierType.SHIFT_MASK
else:
keyval = Gdk.keyval_to_lower(keyval)
self._lookup.setdefault(mask, {})
self._lookup[mask][keyval] = action
self._masks |= mask
def _parsebinding(self, binding):
""" Parse an individual binding using Gtk's binding function """
mask = 0
modifiers = re.findall(MODIFIER, binding)
if modifiers:
for modifier in modifiers:
mask |= self._lookup_modifier(modifier)
key = re.sub(MODIFIER, '', binding)
if key == '':
raise KeymapError('No key found!')
keyval = Gdk.keyval_from_name(key)
if keyval == 0:
raise KeymapError(f"Key '{key}' is unrecognised...")
return (keyval, mask)
def _lookup_modifier(self, modifier):
""" Map modifier names to gtk values """
try:
return self.modifiers[modifier.lower()]
except KeyError:
raise KeymapError(f"Unhandled modifier '<{modifier}>'")
def lookup(self, event):
""" Translate a keyboard event into a mapped key """
try:
_found, keyval, _egp, _lvl, consumed = self.keymap.translate_keyboard_state(
event.hardware_keycode,
Gdk.ModifierType(event.get_state() & ~Gdk.ModifierType.LOCK_MASK),
event.group)
except TypeError:
logger(f"Keybinding lookup failed to translate keyboard event: {dir(event)}")
return None
mask = (event.get_state() & ~consumed) & self._masks
return self._lookup.get(mask, self.empty).get(keyval, None)

61
src/libs/logger.py Normal file
View File

@@ -0,0 +1,61 @@
# Python imports
import os
import logging
# Lib imports
# Application imports
from .singleton import Singleton
class Logger(Singleton):
"""
Create a new logging object and return it.
:note:
NOSET # Don't know the actual log level of this... (defaulting or literally none?)
Log Levels (From least to most)
Type Value
CRITICAL 50
ERROR 40
WARNING 30
INFO 20
DEBUG 10
:param loggerName: Sets the name of the logger object. (Used in log lines)
:param createFile: Whether we create a log file or just pump to terminal
:return: the logging object we created
"""
def __init__(self, config_path: str, _ch_log_lvl = logging.CRITICAL, _fh_log_lvl = logging.INFO):
self._CONFIG_PATH = config_path
self.global_lvl = logging.DEBUG # Keep this at highest so that handlers can filter to their desired levels
self.ch_log_lvl = _ch_log_lvl # Prety much the only one we ever change
self.fh_log_lvl = _fh_log_lvl
def get_logger(self, loggerName: str = "NO_LOGGER_NAME_PASSED", createFile: bool = True) -> logging.Logger:
log = logging.getLogger(loggerName)
log.setLevel(self.global_lvl)
# Set our log output styles
fFormatter = logging.Formatter('[%(asctime)s] %(pathname)s:%(lineno)d %(levelname)s - %(message)s', '%m-%d %H:%M:%S')
cFormatter = logging.Formatter('%(pathname)s:%(lineno)d] %(levelname)s - %(message)s')
ch = logging.StreamHandler()
ch.setLevel(level=self.ch_log_lvl)
ch.setFormatter(cFormatter)
log.addHandler(ch)
if createFile:
folder = self._CONFIG_PATH
file = f"{folder}/application.log"
if not os.path.exists(folder):
os.mkdir(folder)
fh = logging.FileHandler(file)
fh.setLevel(level=self.fh_log_lvl)
fh.setFormatter(fFormatter)
log.addHandler(fh)
return log

View File

@@ -0,0 +1,3 @@
"""
Utils/Mixins module
"""

View File

@@ -0,0 +1,70 @@
# Python imports
# Lib imports
import gi
gi.require_version('Gtk', '3.0')
gi.require_version('Gdk', '3.0')
from gi.repository import Gtk
from gi.repository import Gdk
from gi.repository import Gio
# Application imports
class DnDMixin:
def _setup_dnd(self):
flags = Gtk.DestDefaults.ALL
PLAIN_TEXT_TARGET_TYPE = 70
URI_TARGET_TYPE = 80
text_target = Gtk.TargetEntry.new('text/plain', Gtk.TargetFlags(0), PLAIN_TEXT_TARGET_TYPE)
uri_target = Gtk.TargetEntry.new('text/uri-list', Gtk.TargetFlags(0), URI_TARGET_TYPE)
# targets = [ text_target, uri_target ]
targets = [ uri_target ]
action = Gdk.DragAction.COPY
self.drag_dest_set_target_list(targets)
# self.drag_dest_set(flags, targets, action)
self._setup_dnd_signals()
def _setup_dnd_signals(self):
self.connect("drag-motion", self._on_drag_motion)
self.connect('drag-drop', self._on_drag_set)
self.connect("drag-data-received", self._on_drag_data_received)
def _on_drag_motion(self, widget, drag_context, x, y, time):
Gdk.drag_status(drag_context, drag_context.get_actions(), time)
return False
def _on_drag_set(self, widget, drag_context, data, info, time):
self.drag_get_data(drag_context, drag_context.list_targets()[-1], time)
return False
def _on_drag_data_received(self, widget, drag_context, x, y, data, info, time):
if info == 70: return
if info == 80:
uris = data.get_uris()
files = []
if len(uris) == 0:
uris = data.get_text().split("\n")
for uri in uris:
gfile = None
try:
gfile = Gio.File.new_for_uri(uri)
except Exception as e:
gfile = Gio.File.new_for_path(uri)
files.append(gfile)
event_system.emit('set_pre_drop_dnd', (files,))

15
src/libs/models.py Normal file
View File

@@ -0,0 +1,15 @@
# Python imports
from typing import Optional
# Lib imports
from sqlmodel import SQLModel, Field
# Application imports
class User(SQLModel, table = True):
id: Optional[int] = Field(default = None, primary_key = True)
name: str
password: str
email: Optional[str] = None

View File

@@ -0,0 +1,4 @@
"""
Settings module
"""
from .manager import SettingsManager

View File

@@ -0,0 +1,193 @@
# Python imports
import signal
import io
import json
import inspect
import zipfile
from os import path
from os import mkdir
# Lib imports
# Application imports
from ..singleton import Singleton
from .start_check_mixin import StartCheckMixin
from .markdown_template_mixin import MarkdownTemplateMixin
from .options.settings import Settings
class MissingConfigError(Exception):
pass
class SettingsManager(StartCheckMixin, MarkdownTemplateMixin, Singleton):
def __init__(self):
self._SCRIPT_PTH = path.dirname(path.realpath(__file__))
self._USER_HOME = path.expanduser('~')
self._HOME_CONFIG_PATH = f"{self._USER_HOME}/.config/{app_name.lower()}"
self._USR_PATH = f"/usr/share/{app_name.lower()}"
self._USR_CONFIG_FILE = f"{self._USR_PATH}/settings.json"
self._CONTEXT_PATH = f"{self._HOME_CONFIG_PATH}/context_path"
self._PLUGINS_PATH = f"{self._HOME_CONFIG_PATH}/plugins"
self._DEFAULT_ICONS = f"{self._HOME_CONFIG_PATH}/icons"
self._CONFIG_FILE = f"{self._HOME_CONFIG_PATH}/settings.json"
self._GLADE_FILE = f"{self._HOME_CONFIG_PATH}/Main_Window.glade"
self._CSS_FILE = f"{self._HOME_CONFIG_PATH}/stylesheet.css"
self._KEY_BINDINGS_FILE = f"{self._HOME_CONFIG_PATH}/key-bindings.json"
self._PID_FILE = f"{self._HOME_CONFIG_PATH}/{app_name.lower()}.pid"
self._UI_WIDEGTS_PATH = f"{self._HOME_CONFIG_PATH}/ui_widgets"
self._CONTEXT_MENU = f"{self._HOME_CONFIG_PATH}/contexct_menu.json"
self._WINDOW_ICON = f"{self._DEFAULT_ICONS}/{app_name.lower()}.png"
# self._USR_CONFIG_FILE = f"{self._USR_PATH}/settings.json"
# self._PLUGINS_PATH = f"plugins"
# self._CONFIG_FILE = f"settings.json"
# self._GLADE_FILE = f"Main_Window.glade"
# self._CSS_FILE = f"stylesheet.css"
# self._KEY_BINDINGS_FILE = f"key-bindings.json"
# self._PID_FILE = f"{app_name.lower()}.pid"
# self._WINDOW_ICON = f"{app_name.lower()}.png"
# self._UI_WIDEGTS_PATH = f"ui_widgets"
# self._CONTEXT_MENU = f"contexct_menu.json"
# self._DEFAULT_ICONS = f"icons"
# with zipfile.ZipFile("files.zip", mode="r", allowZip64=True) as zf:
# with io.TextIOWrapper(zf.open("text1.txt"), encoding="utf-8") as f:
if not path.exists(self._HOME_CONFIG_PATH):
mkdir(self._HOME_CONFIG_PATH)
if not path.exists(self._PLUGINS_PATH):
mkdir(self._PLUGINS_PATH)
if not path.exists(self._DEFAULT_ICONS):
self._DEFAULT_ICONS = f"{self._USR_PATH}/icons"
if not path.exists(self._DEFAULT_ICONS):
raise MissingConfigError("Unable to find the application icons directory.")
if not path.exists(self._GLADE_FILE):
self._GLADE_FILE = f"{self._USR_PATH}/Main_Window.glade"
if not path.exists(self._GLADE_FILE):
raise MissingConfigError("Unable to find the application Glade file.")
if not path.exists(self._KEY_BINDINGS_FILE):
self._KEY_BINDINGS_FILE = f"{self._USR_PATH}/key-bindings.json"
if not path.exists(self._KEY_BINDINGS_FILE):
raise MissingConfigError("Unable to find the application Keybindings file.")
if not path.exists(self._CSS_FILE):
self._CSS_FILE = f"{self._USR_PATH}/stylesheet.css"
if not path.exists(self._CSS_FILE):
raise MissingConfigError("Unable to find the application Stylesheet file.")
if not path.exists(self._WINDOW_ICON):
self._WINDOW_ICON = f"{self._USR_PATH}/icons/{app_name.lower()}.png"
if not path.exists(self._WINDOW_ICON):
raise MissingConfigError("Unable to find the application icon.")
if not path.exists(self._UI_WIDEGTS_PATH):
self._UI_WIDEGTS_PATH = f"{self._USR_PATH}/ui_widgets"
if not path.exists(self._CONTEXT_MENU):
self._CONTEXT_MENU = f"{self._USR_PATH}/contexct_menu.json"
try:
with open(self._KEY_BINDINGS_FILE) as file:
bindings = json.load(file)["keybindings"]
keybindings.configure(bindings)
except Exception as e:
print( f"Settings Manager: {self._KEY_BINDINGS_FILE}\n\t\t{repr(e)}" )
try:
with open(self._CONTEXT_MENU) as file:
self._context_menu_data = json.load(file)
except Exception as e:
print( f"Settings Manager: {self._CONTEXT_MENU}\n\t\t{repr(e)}" )
self.settings: Settings = None
self._main_window = None
self._builder = None
self.PAINT_BG_COLOR = (0, 0, 0, 0.0)
self._trace_debug = False
self._debug = False
self._dirty_start = False
def register_signals_to_builder(self, classes=None):
handlers = {}
for c in classes:
methods = None
try:
methods = inspect.getmembers(c, predicate=inspect.ismethod)
handlers.update(methods)
except Exception as e:
...
self._builder.connect_signals(handlers)
def set_main_window(self, window): self._main_window = window
def set_builder(self, builder) -> any: self._builder = builder
def get_monitor_data(self) -> list:
screen = self._main_window.get_screen()
monitors = []
for m in range(screen.get_n_monitors()):
monitors.append(screen.get_monitor_geometry(m))
print("{}x{}+{}+{}".format(monitor.width, monitor.height, monitor.x, monitor.y))
return monitors
def get_main_window(self) -> any: return self._main_window
def get_builder(self) -> any: return self._builder
def get_paint_bg_color(self) -> any: return self.PAINT_BG_COLOR
def get_glade_file(self) -> str: return self._GLADE_FILE
def get_ui_widgets_path(self) -> str: return self._UI_WIDEGTS_PATH
def get_context_menu_data(self) -> str: return self._context_menu_data
def get_context_path(self) -> str: return self._CONTEXT_PATH
def get_plugins_path(self) -> str: return self._PLUGINS_PATH
def get_icon_theme(self) -> str: return self._ICON_THEME
def get_css_file(self) -> str: return self._CSS_FILE
def get_home_config_path(self) -> str: return self._HOME_CONFIG_PATH
def get_window_icon(self) -> str: return self._WINDOW_ICON
def get_home_path(self) -> str: return self._USER_HOME
def is_trace_debug(self) -> str: return self._trace_debug
def is_debug(self) -> str: return self._debug
def call_method(self, target_class = None, _method_name = None, data = None):
method_name = str(_method_name)
method = getattr(target_class, method_name, lambda data: f"No valid key passed...\nkey={method_name}\nargs={data}")
return method(data) if data else method()
def set_main_window_x(self, x = 0): self.settings.config.main_window_x = x
def set_main_window_y(self, y = 0): self.settings.config.main_window_y = y
def set_main_window_width(self, width = 800): self.settings.config.main_window_width = width
def set_main_window_height(self, height = 600): self.settings.config.main_window_height = height
def set_main_window_min_width(self, width = 720): self.settings.config.main_window_min_width = width
def set_main_window_min_height(self, height = 480): self.settings.config.main_window_min_height = height
def set_trace_debug(self, trace_debug):
self._trace_debug = trace_debug
def set_debug(self, debug):
self._debug = debug
def load_settings(self):
if not path.exists(self._CONFIG_FILE):
self.settings = Settings()
return
with open(self._CONFIG_FILE) as file:
data = json.load(file)
data["load_defaults"] = False
self.settings = Settings(**data)
def save_settings(self):
with open(self._CONFIG_FILE, 'w') as outfile:
json.dump(self.settings.as_dict(), outfile, separators=(',', ':'), indent=4)

View File

@@ -0,0 +1,60 @@
# Python imports
# Lib imports
# Application imports
class MarkdownTemplateMixin:
def wrap_html_to_body(self, html):
return f"""\
<!DOCTYPE html>
<html lang="en" dir="ltr">
<head>
<meta charset="utf-8">
<title>Markdown View</title>
<style media="screen">
html, body {{
display: block;
background-color: #32383e00;
color: #ffffff;
text-wrap: wrap;
}}
img {{
width: 100%;
height: auto;
}}
code {{
border: 1px solid #32383e;
background-color: #32383e;
padding: 4px;
}}
</style>
</head>
<body>
{html}
<span>Button in WebView
<button onclick="say_hello()">Button in WebView</button>
</span>
<!-- For internal scripts... -->
<script src="js/libs/jquery-3.7.1.min.js"></script>
<!-- For Bootstrap... -->
<script src="resources/js/libs/bootstrap5/bootstrap.bundle.min.js"></script>
<!-- For Application... -->
<script src="resources/js/ui-logic.js"></script>
<script src="resources/js/post-ajax.js"></script>
<script src="resources/js/ajax.js"></script>
<script src="resources/js/events.js"></script>
</body>
</html>
"""

View File

@@ -0,0 +1,8 @@
"""
Options module
"""
from .settings import Settings
from .config import Config
from .filters import Filters
from .theming import Theming
from .debugging import Debugging

View File

@@ -0,0 +1,39 @@
# Python imports
from dataclasses import dataclass, field
# Lib imports
# Application imports
@dataclass
class Config:
base_of_home: str = ""
hide_hidden_files: str = "true"
thumbnailer_path: str = "ffmpegthumbnailer"
blender_thumbnailer_path: str = ""
go_past_home: str = "true"
lock_folder: str = "false"
locked_folders: list = field(default_factory=lambda: [ "venv", "flasks" ])
mplayer_options: str = "-quiet -really-quiet -xy 1600 -geometry 50%:50%",
music_app: str = "/opt/deadbeef/bin/deadbeef"
media_app: str = "mpv"
image_app: str = "mirage"
office_app: str = "libreoffice"
pdf_app: str = "evince"
code_app: str = "atom"
text_app: str = "leafpad"
file_manager_app: str = "solarfm"
terminal_app: str = "terminator"
remux_folder_max_disk_usage: str = "8589934592"
make_transparent: int = 0
main_window_x: int = 721
main_window_y: int = 465
main_window_min_width: int = 720
main_window_min_height: int = 480
main_window_width: int = 800
main_window_height: int = 600
application_dirs: list = field(default_factory=lambda: [
"/usr/share/applications",
f"{settings_manager.get_home_path()}/.local/share/applications"
])

View File

@@ -0,0 +1,12 @@
# Python imports
from dataclasses import dataclass
# Lib imports
# Application imports
@dataclass
class Debugging:
ch_log_lvl: int = 10
fh_log_lvl: int = 20

View File

@@ -0,0 +1,90 @@
# Python imports
from dataclasses import dataclass, field
# Lib imports
# Application imports
@dataclass
class Filters:
meshs: list = field(default_factory=lambda: [
".blend",
".dae",
".fbx",
".gltf",
".obj",
".stl"
])
code: list = field(default_factory=lambda: [
".cpp",
".css",
".c",
".go",
".html",
".htm",
".java",
".js",
".json",
".lua",
".md",
".py",
".rs",
".toml",
".xml",
".pom"
])
videos: list = field(default_factory=lambda:[
".mkv",
".mp4",
".webm",
".avi",
".mov",
".m4v",
".mpg",
".mpeg",
".wmv",
".flv"
])
office: list = field(default_factory=lambda: [
".doc",
".docx",
".xls",
".xlsx",
".xlt",
".xltx",
".xlm",
".ppt",
".pptx",
".pps",
".ppsx",
".odt",
".rtf"
])
images: list = field(default_factory=lambda: [
".png",
".jpg",
".jpeg",
".gif",
".ico",
".tga",
".webp"
])
text: list = field(default_factory=lambda: [
".txt",
".text",
".sh",
".cfg",
".conf",
".log"
])
music: list = field(default_factory=lambda: [
".psf",
".mp3",
".ogg",
".flac",
".m4a"
])
pdf: list = field(default_factory=lambda: [
".pdf"
])

View File

@@ -0,0 +1,31 @@
# Python imports
from dataclasses import dataclass, field
from dataclasses import asdict
# Gtk imports
# Application imports
from .config import Config
from .filters import Filters
from .theming import Theming
from .debugging import Debugging
@dataclass
class Settings:
load_defaults: bool = True
config: Config = field(default_factory=lambda: Config())
filters: Filters = field(default_factory=lambda: Filters())
theming: Theming = field(default_factory=lambda: Theming())
debugging: Debugging = field(default_factory=lambda: Debugging())
def __post_init__(self):
if not self.load_defaults:
self.load_defaults = False
self.config = Config(**self.config)
self.filters = Filters(**self.filters)
self.theming = Theming(**self.theming)
self.debugging = Debugging(**self.debugging)
def as_dict(self):
return asdict(self)

View File

@@ -0,0 +1,14 @@
# Python imports
from dataclasses import dataclass
# Lib imports
# Application imports
@dataclass
class Theming:
transparency: int = 64
success_color: str = "#88cc27"
warning_color: str = "#ffa800"
error_color: str = "#ff0000"

View File

@@ -0,0 +1,63 @@
# Python imports
import os
import json
import inspect
# Lib imports
# Application imports
class StartCheckMixin:
def is_dirty_start(self) -> bool:
return self._dirty_start
def clear_pid(self):
if not self.is_trace_debug():
self._clean_pid()
def do_dirty_start_check(self):
if self.is_trace_debug():
pid = os.getpid()
self._print_pid(pid)
return
if os.path.exists(self._PID_FILE):
with open(self._PID_FILE, "r") as f:
pid = f.readline().strip()
if pid not in ("", None):
if self.is_pid_alive( int(pid) ):
print("PID file exists and PID is alive... Letting downstream errors (sans debug args) handle app closure propigation.")
return
self._write_new_pid()
""" Check For the existence of a unix pid. """
def is_pid_alive(self, pid):
print(f"PID Found: {pid}")
try:
os.kill(pid, 0)
except OSError:
print(f"{app_name} PID file exists but PID is irrelevant; starting dirty...")
self._dirty_start = True
return False
return True
def _write_new_pid(self):
pid = os.getpid()
self._write_pid(pid)
self._print_pid(pid)
def _print_pid(self, pid):
print(f"{app_name} PID: {pid}")
def _clean_pid(self):
os.unlink(self._PID_FILE)
def _write_pid(self, pid):
with open(self._PID_FILE, "w") as _pid:
_pid.write(f"{pid}")

24
src/libs/singleton.py Normal file
View File

@@ -0,0 +1,24 @@
# Python imports
# Lib imports
# Application imports
class SingletonError(Exception):
pass
class Singleton:
ccount = 0
def __new__(cls, *args, **kwargs):
obj = super(Singleton, cls).__new__(cls)
cls.ccount += 1
if cls.ccount == 2:
raise SingletonError(f"Exceeded {cls.__name__} instantiation limit...")
return obj