initial push...
This commit is contained in:
42
src/__builtins__.py
Normal file
42
src/__builtins__.py
Normal file
@@ -0,0 +1,42 @@
|
||||
import builtins, threading
|
||||
|
||||
# Python imports
|
||||
import builtins
|
||||
|
||||
# Lib imports
|
||||
|
||||
# Application imports
|
||||
from utils.event_system import EventSystem
|
||||
from utils.endpoint_registry import EndpointRegistry
|
||||
from utils.keybindings import Keybindings
|
||||
from utils.settings import Settings
|
||||
|
||||
|
||||
|
||||
# NOTE: Threads WILL NOT die with parent's destruction.
|
||||
def threaded_wrapper(fn):
|
||||
def wrapper(*args, **kwargs):
|
||||
threading.Thread(target=fn, args=args, kwargs=kwargs, daemon=False).start()
|
||||
return wrapper
|
||||
|
||||
# NOTE: Threads WILL die with parent's destruction.
|
||||
def daemon_threaded_wrapper(fn):
|
||||
def wrapper(*args, **kwargs):
|
||||
threading.Thread(target=fn, args=args, kwargs=kwargs, daemon=True).start()
|
||||
return wrapper
|
||||
|
||||
|
||||
|
||||
|
||||
# NOTE: Just reminding myself we can add to builtins two different ways...
|
||||
# __builtins__.update({"event_system": Builtins()})
|
||||
builtins.app_name = "UTop"
|
||||
builtins.keybindings = Keybindings()
|
||||
builtins.event_system = EventSystem()
|
||||
builtins.endpoint_registry = EndpointRegistry()
|
||||
builtins.settings = Settings()
|
||||
builtins.logger = settings.get_logger()
|
||||
|
||||
builtins.threaded = threaded_wrapper
|
||||
builtins.daemon_threaded = daemon_threaded_wrapper
|
||||
builtins.event_sleep_time = 0.05
|
3
src/__init__.py
Normal file
3
src/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
||||
"""
|
||||
Start of package.
|
||||
"""
|
52
src/__main__.py
Normal file
52
src/__main__.py
Normal file
@@ -0,0 +1,52 @@
|
||||
#!/usr/bin/python3
|
||||
|
||||
# Python imports
|
||||
import argparse
|
||||
import faulthandler
|
||||
import traceback
|
||||
from setproctitle import setproctitle
|
||||
|
||||
import tracemalloc
|
||||
tracemalloc.start()
|
||||
|
||||
|
||||
# Lib imports
|
||||
import gi
|
||||
gi.require_version('Gtk', '3.0')
|
||||
from gi.repository import Gtk
|
||||
|
||||
# Application imports
|
||||
from __builtins__ import *
|
||||
from app import Application
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
''' Set process title, get arguments, and create GTK main thread. '''
|
||||
|
||||
try:
|
||||
setproctitle(f'{app_name}')
|
||||
faulthandler.enable() # For better debug info
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
# Add long and short arguments
|
||||
parser.add_argument("--debug", "-d", default="false", help="Do extra console messaging.")
|
||||
parser.add_argument("--trace-debug", "-td", default="false", help="Disable saves, ignore IPC lock, do extra console messaging.")
|
||||
parser.add_argument("--file", "-f", default="default", help="JUST SOME FILE ARG.")
|
||||
parser.add_argument("--new-tab", "-t", default="", help="Open a file into new tab.")
|
||||
parser.add_argument("--new-window", "-w", default="", help="Open a file into a new window.")
|
||||
|
||||
# Read arguments (If any...)
|
||||
args, unknownargs = parser.parse_known_args()
|
||||
|
||||
if args.debug == "true":
|
||||
settings.set_debug(True)
|
||||
|
||||
if args.trace_debug == "true":
|
||||
settings.set_trace_debug(True)
|
||||
|
||||
settings.do_dirty_start_check()
|
||||
Application(args, unknownargs)
|
||||
Gtk.main()
|
||||
except Exception as e:
|
||||
traceback.print_exc()
|
||||
quit()
|
39
src/app.py
Normal file
39
src/app.py
Normal file
@@ -0,0 +1,39 @@
|
||||
# Python imports
|
||||
import os
|
||||
import time
|
||||
|
||||
# Lib imports
|
||||
|
||||
# Application imports
|
||||
from utils.ipc_server import IPCServer
|
||||
from core.window import Window
|
||||
|
||||
|
||||
|
||||
class AppLaunchException(Exception):
|
||||
...
|
||||
|
||||
class ControllerStartExceptiom(Exception):
|
||||
...
|
||||
|
||||
|
||||
class Application(IPCServer):
|
||||
''' Create Settings and Controller classes. Bind signal to Builder. Inherit from Builtins to bind global methods and classes.'''
|
||||
|
||||
def __init__(self, args, unknownargs):
|
||||
super(Application, self).__init__()
|
||||
if not settings.is_trace_debug():
|
||||
try:
|
||||
self.create_ipc_listener()
|
||||
except Exception:
|
||||
...
|
||||
|
||||
if not self.is_ipc_alive:
|
||||
for arg in unknownargs + [args.new_tab,]:
|
||||
if os.path.isdir(arg):
|
||||
message = f"FILE|{arg}"
|
||||
self.send_ipc_message(message)
|
||||
|
||||
raise AppLaunchException(f"{app_name} IPC Server Exists: Will send path(s) to it and close...")
|
||||
|
||||
Window(args, unknownargs)
|
3
src/core/__init__.py
Normal file
3
src/core/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
||||
"""
|
||||
Gtk Bound Signal Module
|
||||
"""
|
146
src/core/controller.py
Normal file
146
src/core/controller.py
Normal file
@@ -0,0 +1,146 @@
|
||||
# Python imports
|
||||
import os, subprocess
|
||||
|
||||
# Gtk imports
|
||||
import gi
|
||||
|
||||
gi.require_version('Gtk', '3.0')
|
||||
gi.require_version('Gdk', '3.0')
|
||||
gi.require_version('GdkPixbuf', '2.0')
|
||||
|
||||
from gi.repository import Gtk
|
||||
from gi.repository import Gdk
|
||||
from gi.repository import Gio
|
||||
from gi.repository import GLib
|
||||
from gi.repository import GdkPixbuf
|
||||
|
||||
# Application imports
|
||||
from .controller_data import ControllerData
|
||||
from .core_widget import CoreWidget
|
||||
|
||||
|
||||
|
||||
class Controller(ControllerData):
|
||||
def __init__(self, args, unknownargs):
|
||||
self._setup_styling()
|
||||
self._setup_signals()
|
||||
self._subscribe_to_events()
|
||||
|
||||
self.setup_controller_data()
|
||||
|
||||
|
||||
def _setup_styling(self):
|
||||
...
|
||||
|
||||
def _setup_signals(self):
|
||||
...
|
||||
|
||||
|
||||
|
||||
# NOTE: To be filled out after app data is actually working...
|
||||
def search_for_entry(self, widget, data=None):
|
||||
...
|
||||
|
||||
def set_list_group(self, widget):
|
||||
group = widget.get_label().strip()
|
||||
group_items = self.core_widget.get_group(group)
|
||||
grid = self.builder.get_object("programListBttns")
|
||||
|
||||
children = grid.get_children()
|
||||
for child in children:
|
||||
grid.remove(child)
|
||||
|
||||
for item in group_items:
|
||||
title = item["title"]
|
||||
if not item["exec"] in ("", None):
|
||||
exec = item["exec"]
|
||||
else:
|
||||
exec = item["tryExec"]
|
||||
|
||||
button = Gtk.Button(label=title)
|
||||
button.connect("clicked", self.test_exec, exec)
|
||||
if self.show_image:
|
||||
if os.path.exists(item["icon"]):
|
||||
pixbuf = GdkPixbuf.PixbufAnimation.new_from_file(item["icon"]) \
|
||||
.get_static_image() \
|
||||
.scale_simple(64, 64, \
|
||||
GdkPixbuf.InterpType.BILINEAR)
|
||||
|
||||
icon = Gtk.Image.new_from_pixbuf(pixbuf)
|
||||
else:
|
||||
gio_icon = Gio.Icon.new_for_string(item["icon"])
|
||||
icon = Gtk.Image.new_from_gicon(gio_icon, 64)
|
||||
|
||||
button.set_image(icon)
|
||||
button.set_always_show_image(True)
|
||||
|
||||
button.show_all()
|
||||
grid.add(button)
|
||||
|
||||
def test_exec(self, widget, _command):
|
||||
command = _command.split("%")[0]
|
||||
DEVNULL = open(os.devnull, 'w')
|
||||
subprocess.Popen(command.split(), start_new_session=True, stdout=DEVNULL, stderr=DEVNULL)
|
||||
|
||||
|
||||
|
||||
def _subscribe_to_events(self):
|
||||
event_system.subscribe("handle_file_from_ipc", self.handle_file_from_ipc)
|
||||
|
||||
def handle_file_from_ipc(self, path: str) -> None:
|
||||
print(f"Path From IPC: {path}")
|
||||
|
||||
def load_glade_file(self):
|
||||
self.builder = Gtk.Builder()
|
||||
self.builder.add_from_file(settings.get_glade_file())
|
||||
settings.set_builder(self.builder)
|
||||
self.core_widget = CoreWidget()
|
||||
|
||||
settings.register_signals_to_builder([self, self.core_widget])
|
||||
|
||||
def get_core_widget(self):
|
||||
self.setup_toggle_event()
|
||||
return self.core_widget
|
||||
|
||||
def on_hide_window(self, data=None):
|
||||
"""Handle a request to hide/show the window"""
|
||||
if not self.window.get_property('visible'):
|
||||
self.window.show()
|
||||
self.window.grab_focus()
|
||||
# NOTE: Need here to enforce sticky after hide and reshow.
|
||||
self.window.stick()
|
||||
else:
|
||||
self.hidefunc()
|
||||
|
||||
def on_global_key_release_controller(self, widget: type, event: type) -> None:
|
||||
"""Handler for keyboard events"""
|
||||
keyname = Gdk.keyval_name(event.keyval).lower()
|
||||
if keyname.replace("_l", "").replace("_r", "") in ["control", "alt", "shift"]:
|
||||
if "control" in keyname:
|
||||
self.ctrl_down = False
|
||||
if "shift" in keyname:
|
||||
self.shift_down = False
|
||||
if "alt" in keyname:
|
||||
self.alt_down = False
|
||||
|
||||
|
||||
mapping = keybindings.lookup(event)
|
||||
if mapping:
|
||||
getattr(self, mapping)()
|
||||
return True
|
||||
else:
|
||||
print(f"on_global_key_release_controller > key > {keyname}")
|
||||
print(f"Add logic or remove this from: {self.__class__}")
|
||||
|
||||
|
||||
def get_clipboard_data(self) -> str:
|
||||
proc = subprocess.Popen(['xclip','-selection', 'clipboard', '-o'], stdout=subprocess.PIPE)
|
||||
retcode = proc.wait()
|
||||
data = proc.stdout.read()
|
||||
return data.decode("utf-8").strip()
|
||||
|
||||
def set_clipboard_data(self, data: type) -> None:
|
||||
proc = subprocess.Popen(['xclip','-selection','clipboard'], stdin=subprocess.PIPE)
|
||||
proc.stdin.write(data)
|
||||
proc.stdin.close()
|
||||
retcode = proc.wait()
|
99
src/core/controller_data.py
Normal file
99
src/core/controller_data.py
Normal file
@@ -0,0 +1,99 @@
|
||||
# Python imports
|
||||
import os
|
||||
|
||||
# Lib imports
|
||||
import gi
|
||||
from gi.repository import GObject
|
||||
|
||||
# Application imports
|
||||
from plugins.plugins_controller import PluginsController
|
||||
|
||||
|
||||
|
||||
|
||||
try:
|
||||
from gi.repository import GdkX11
|
||||
except ImportError:
|
||||
logger.debug("Could not import X11 gir module...")
|
||||
|
||||
def display_manager():
|
||||
""" Try to detect which display manager we are running under... """
|
||||
if os.environ.get('WAYLAND_DISPLAY'):
|
||||
return 'WAYLAND'
|
||||
return 'X11' # Fallback assumption of X11
|
||||
|
||||
|
||||
if display_manager() == 'X11':
|
||||
try:
|
||||
gi.require_version('Keybinder', '3.0')
|
||||
from gi.repository import Keybinder
|
||||
Keybinder.init()
|
||||
Keybinder.set_use_cooked_accelerators(False)
|
||||
except (ImportError, ValueError):
|
||||
logger.debug('Unable to load Keybinder module. This means the hide_window shortcut will be unavailable')
|
||||
|
||||
|
||||
|
||||
|
||||
class ControllerData:
|
||||
''' ControllerData contains most of the state of the app at ay given time. It also has some support methods. '''
|
||||
|
||||
def setup_controller_data(self) -> None:
|
||||
self.builder = None
|
||||
self.core_widget = None
|
||||
|
||||
self.load_glade_file()
|
||||
self.plugins = PluginsController()
|
||||
|
||||
self.hidefunc = None
|
||||
self.show_image = True
|
||||
self.guake_key = settings.get_guake_key()
|
||||
|
||||
|
||||
def setup_toggle_event(self) -> None:
|
||||
self.window = settings.get_builder().get_object(f"{app_name.lower()}")
|
||||
|
||||
# Attempt to grab a global hotkey for hiding the window.
|
||||
# If we fail, we'll never hide the window, iconifying instead.
|
||||
if self.guake_key and display_manager() == 'X11':
|
||||
try:
|
||||
hidebound = Keybinder.bind(self.guake_key, self.on_hide_window)
|
||||
except (KeyError, NameError):
|
||||
pass
|
||||
|
||||
if not hidebound:
|
||||
logger.debug('Unable to bind hide_window key, another instance/window has it.')
|
||||
self.hidefunc = self.window.iconify
|
||||
else:
|
||||
self.hidefunc = self.window.hide
|
||||
|
||||
|
||||
def clear_console(self) -> None:
|
||||
''' Clears the terminal screen. '''
|
||||
os.system('cls' if os.name == 'nt' else 'clear')
|
||||
|
||||
def call_method(self, _method_name: str, data: type) -> type:
|
||||
'''
|
||||
Calls a method from scope of class.
|
||||
|
||||
Parameters:
|
||||
a (obj): self
|
||||
b (str): method name to be called
|
||||
c (*): Data (if any) to be passed to the method.
|
||||
Note: It must be structured according to the given methods requirements.
|
||||
|
||||
Returns:
|
||||
Return data is that which the calling method gives.
|
||||
'''
|
||||
method_name = str(_method_name)
|
||||
method = getattr(self, method_name, lambda data: f"No valid key passed...\nkey={method_name}\nargs={data}")
|
||||
return method(*data) if data else method()
|
||||
|
||||
def has_method(self, obj: type, method: type) -> type:
|
||||
''' Checks if a given method exists. '''
|
||||
return callable(getattr(obj, method, None))
|
||||
|
||||
def clear_children(self, widget: type) -> None:
|
||||
''' Clear children of a gtk widget. '''
|
||||
for child in widget.get_children():
|
||||
widget.remove(child)
|
182
src/core/core_widget.py
Normal file
182
src/core/core_widget.py
Normal file
@@ -0,0 +1,182 @@
|
||||
# Python imports
|
||||
from datetime import datetime
|
||||
|
||||
# Gtk imports
|
||||
import gi
|
||||
|
||||
gi.require_version('Gtk', '3.0')
|
||||
gi.require_version('Wnck', '3.0')
|
||||
|
||||
from gi.repository import Gtk
|
||||
from gi.repository import Wnck
|
||||
from gi.repository import GObject
|
||||
from xdg.DesktopEntry import DesktopEntry
|
||||
|
||||
|
||||
# Application imports
|
||||
from .desktop_parsing.app_finder import find_apps
|
||||
|
||||
|
||||
|
||||
|
||||
class CoreWidget(Gtk.Box):
|
||||
def __init__(self):
|
||||
super(CoreWidget, self).__init__()
|
||||
self.builder = settings.get_builder()
|
||||
self.time_label = self.builder.get_object("timeLabel")
|
||||
|
||||
self.orientation = 1 # 0 = horizontal, 1 = vertical
|
||||
|
||||
self._setup_styling()
|
||||
self._setup_signals()
|
||||
self._load_widgets()
|
||||
|
||||
self.show_all()
|
||||
|
||||
self.menu_objects = {
|
||||
"Accessories": [],
|
||||
"Multimedia": [],
|
||||
"Graphics": [],
|
||||
"Game": [],
|
||||
"Office": [],
|
||||
"Development": [],
|
||||
"Internet": [],
|
||||
"Settings": [],
|
||||
"System": [],
|
||||
"Wine": [],
|
||||
"Other": []
|
||||
}
|
||||
apps = find_apps()
|
||||
self.fill_menu_objects(apps)
|
||||
|
||||
|
||||
def fill_menu_objects(self, apps=[]):
|
||||
for app in apps:
|
||||
fPath = app.get_filename()
|
||||
xdgObj = DesktopEntry( fPath )
|
||||
|
||||
title = xdgObj.getName()
|
||||
groups = xdgObj.getCategories()
|
||||
comment = xdgObj.getComment()
|
||||
icon = xdgObj.getIcon()
|
||||
mainExec = xdgObj.getExec()
|
||||
tryExec = xdgObj.getTryExec()
|
||||
|
||||
group = ""
|
||||
if "Accessories" in groups or "Utility" in groups:
|
||||
group = "Accessories"
|
||||
elif "Multimedia" in groups or "Video" in groups or "Audio" in groups:
|
||||
group = "Multimedia"
|
||||
elif "Development" in groups:
|
||||
group = "Development"
|
||||
elif "Game" in groups:
|
||||
group = "Game"
|
||||
elif "Internet" in groups or "Network" in groups:
|
||||
group = "Internet"
|
||||
elif "Graphics" in groups:
|
||||
group = "Graphics"
|
||||
elif "Office" in groups:
|
||||
group = "Office"
|
||||
elif "System" in groups:
|
||||
group = "System"
|
||||
elif "Settings" in groups:
|
||||
group = "Settings"
|
||||
elif "Wine" in groups:
|
||||
group = "Wine"
|
||||
else:
|
||||
group = "Other"
|
||||
|
||||
self.menu_objects[group].append( {"title": title, "groups": groups,
|
||||
"comment": comment, "exec": mainExec,
|
||||
"tryExec": tryExec, "fileName": fPath.split("/")[-1],
|
||||
"filePath": fPath, "icon": icon})
|
||||
|
||||
def get_group(self, group):
|
||||
return self.menu_objects[group]
|
||||
|
||||
|
||||
|
||||
def _setup_styling(self):
|
||||
self.set_orientation(1)
|
||||
self.set_vexpand(True)
|
||||
self.set_hexpand(True)
|
||||
|
||||
def _setup_signals(self):
|
||||
...
|
||||
|
||||
def _load_widgets(self):
|
||||
widget_grid_container = self.builder.get_object("widget_grid_container")
|
||||
|
||||
timeLabelEveBox = self.builder.get_object("timeLabelEveBox")
|
||||
timeLabelEveBox.connect("button_release_event", self._toggle_cal_popover)
|
||||
|
||||
widget_grid_container.set_vexpand(True)
|
||||
widget_grid_container.set_hexpand(True)
|
||||
|
||||
self.add( widget_grid_container )
|
||||
self.set_pager_widget()
|
||||
self.set_task_list_widget()
|
||||
|
||||
# Must be after pager and task list inits
|
||||
self.display_clock()
|
||||
self._start_clock()
|
||||
|
||||
|
||||
def set_pager_widget(self):
|
||||
pager = Wnck.Pager.new()
|
||||
|
||||
if self.orientation == 0:
|
||||
self.builder.get_object('taskBarWorkspacesHor').add(pager)
|
||||
else:
|
||||
self.builder.get_object('taskBarWorkspacesVer').add(pager)
|
||||
|
||||
pager.set_hexpand(True)
|
||||
pager.show()
|
||||
|
||||
def set_task_list_widget(self):
|
||||
tasklist = Wnck.Tasklist.new()
|
||||
tasklist.set_scroll_enabled(False)
|
||||
tasklist.set_button_relief(2) # 0 = normal relief, 2 = no relief
|
||||
tasklist.set_grouping(1) # 0 = mever group, 1 auto group, 2 = always group
|
||||
|
||||
if self.orientation == 0:
|
||||
self.builder.get_object('taskBarButtonsHor').add(tasklist)
|
||||
else:
|
||||
self.builder.get_object('taskBarButtonsVer').add(tasklist)
|
||||
|
||||
tasklist.set_vexpand(True)
|
||||
tasklist.set_include_all_workspaces(False)
|
||||
tasklist.set_orientation(self.orientation)
|
||||
tasklist.show()
|
||||
|
||||
# Displays Timer
|
||||
def display_clock(self):
|
||||
now = datetime.now()
|
||||
hms = now.strftime("%I:%M %p")
|
||||
mdy = now.strftime("%m/%d/%Y")
|
||||
timeStr = hms + "\n" + mdy
|
||||
self.time_label.set_label(timeStr)
|
||||
return True
|
||||
|
||||
def _start_clock(self):
|
||||
GObject.timeout_add(59000, self.display_clock)
|
||||
|
||||
|
||||
def _close_popup(self, widget, data=None):
|
||||
widget.hide()
|
||||
|
||||
def _toggle_cal_popover(self, widget, eve):
|
||||
calendarPopup = self.builder.get_object('calendarPopup')
|
||||
if (calendarPopup.get_visible() == False):
|
||||
calendarWid = self.builder.get_object('calendarWid')
|
||||
now = datetime.now()
|
||||
timeStr = now.strftime("%m/%d/%Y")
|
||||
parts = timeStr.split("/")
|
||||
month = int(parts[0]) - 1
|
||||
day = int(parts[1])
|
||||
year = int(parts[2])
|
||||
calendarWid.select_day(day)
|
||||
calendarWid.select_month(month, year)
|
||||
calendarPopup.popup()
|
||||
else:
|
||||
calendarPopup.popdown()
|
94
src/core/desktop_parsing/DesktopParser.py
Normal file
94
src/core/desktop_parsing/DesktopParser.py
Normal file
@@ -0,0 +1,94 @@
|
||||
# Python imports
|
||||
import os
|
||||
|
||||
# Lib imports
|
||||
|
||||
# Application imports
|
||||
|
||||
|
||||
|
||||
|
||||
class DesktopParser:
|
||||
DESKTOP_SECTION = '[Desktop Entry]'
|
||||
|
||||
__property_list = None
|
||||
|
||||
def __init__(self, filename):
|
||||
self.__property_list = []
|
||||
self.set_filename(filename)
|
||||
self.read()
|
||||
super(DesktopParser, self).__init__()
|
||||
|
||||
def set_filename(self, filename):
|
||||
self._filename = filename
|
||||
|
||||
def read(self):
|
||||
"""
|
||||
Read [Desktop Entry] section and save key=values pairs to __property_list
|
||||
"""
|
||||
with open(self._filename, 'r') as f:
|
||||
is_desktop_section = False
|
||||
for line in f.readlines():
|
||||
line = line.strip(' ' + os.linesep)
|
||||
if line == self.DESKTOP_SECTION:
|
||||
is_desktop_section = True
|
||||
continue
|
||||
if line.startswith('['):
|
||||
# another section begins
|
||||
is_desktop_section = False
|
||||
continue
|
||||
if is_desktop_section and '=' in line:
|
||||
(key, value) = line.split('=', 1)
|
||||
self.set(key.strip(), value.strip())
|
||||
|
||||
def write(self):
|
||||
"""
|
||||
Write properties to the file...
|
||||
"""
|
||||
dir = os.path.dirname(self._filename)
|
||||
if not os.path.exists(dir):
|
||||
os.makedirs(dir)
|
||||
|
||||
with open(self._filename, 'w') as f:
|
||||
f.write(os.linesep.join((self.DESKTOP_SECTION, os.linesep.join(['='.join((str(k), str(v).strip()))
|
||||
for k, v in self.__property_list]))))
|
||||
|
||||
def get(self, name):
|
||||
"""
|
||||
Raises KeyError if name is not found
|
||||
"""
|
||||
|
||||
for key, value in self.__property_list:
|
||||
if key.lower() == name.lower():
|
||||
return value
|
||||
raise KeyError('%s' % name)
|
||||
|
||||
def set(self, name, value):
|
||||
if not name:
|
||||
raise ValueError("Invalid value for name: '%s'" % name)
|
||||
|
||||
for i, (key, _) in enumerate(self.__property_list):
|
||||
if key.lower() == name.lower():
|
||||
self.__property_list[i] = (key, value)
|
||||
return
|
||||
|
||||
self.__property_list.append((name, value))
|
||||
|
||||
def get_boolean(self, name):
|
||||
"""
|
||||
Returns True if value is "1", "yes", "true", or "on"
|
||||
|
||||
Returns False if value is "0", "no", "false", or "off"
|
||||
|
||||
String values are checked in a case-insensitive manner.
|
||||
|
||||
Any other value will cause it to raise ValueError.
|
||||
"""
|
||||
|
||||
value = self.get(name).lower()
|
||||
if value in ("1", "yes", "true", "on"):
|
||||
return True
|
||||
if value in ("0", "no", "false", "off"):
|
||||
return False
|
||||
|
||||
raise ValueError("Cannot coerce '%s' to boolean" % value)
|
0
src/core/desktop_parsing/__init__.py
Normal file
0
src/core/desktop_parsing/__init__.py
Normal file
147
src/core/desktop_parsing/app_finder.py
Normal file
147
src/core/desktop_parsing/app_finder.py
Normal file
@@ -0,0 +1,147 @@
|
||||
# Python imports
|
||||
import os
|
||||
import logging
|
||||
from collections import OrderedDict
|
||||
from itertools import chain
|
||||
from typing import Generator, List
|
||||
|
||||
# Lib imports
|
||||
from gi.repository import Gio
|
||||
from xdg.BaseDirectory import xdg_config_home, xdg_cache_home, xdg_data_dirs, xdg_data_home
|
||||
|
||||
|
||||
# Application imports
|
||||
from .file_finder import find_files
|
||||
from .db.KeyValueDb import KeyValueDb
|
||||
|
||||
|
||||
|
||||
|
||||
DEFAULT_BLACKLISTED_DIRS = [
|
||||
'/usr/share/locale',
|
||||
'/usr/share/app-install',
|
||||
'/usr/share/kservices5',
|
||||
'/usr/share/fk5',
|
||||
'/usr/share/kservicetypes5',
|
||||
'/usr/share/applications/screensavers',
|
||||
'/usr/share/kde4',
|
||||
'/usr/share/mimelnk'
|
||||
]
|
||||
|
||||
# Use utop_cache dir because of the WebKit bug
|
||||
# https://bugs.webkit.org/show_bug.cgi?id=151646
|
||||
CACHE_DIR = os.path.join(xdg_cache_home, 'utop_cache')
|
||||
|
||||
# Spec: https://specifications.freedesktop.org/menu-spec/latest/ar01s02.html
|
||||
DESKTOP_DIRS = list(filter(os.path.exists, [os.path.join(dir, "applications") for dir in xdg_data_dirs]))
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
|
||||
|
||||
def find_desktop_files(dirs: List[str] = None, pattern: str = "*.desktop") -> Generator[str, None, None]:
|
||||
"""
|
||||
Returns deduped list of .desktop files (full paths)
|
||||
|
||||
:param list dirs:
|
||||
:rtype: list
|
||||
"""
|
||||
|
||||
if dirs is None:
|
||||
dirs = DESKTOP_DIRS
|
||||
|
||||
all_files = chain.from_iterable(
|
||||
map(lambda f: os.path.join(f_path, f), find_files(f_path, pattern)) for f_path in dirs
|
||||
)
|
||||
|
||||
# NOTE: Dedup desktop file according to follow XDG data dir order.
|
||||
# Specifically the first file name (i.e. firefox.desktop) take precedence
|
||||
# and other files with the same name should be ignored
|
||||
deduped_file_dict = OrderedDict()
|
||||
for file_path in all_files:
|
||||
file_name = os.path.basename(file_path)
|
||||
if file_name not in deduped_file_dict:
|
||||
deduped_file_dict[file_name] = file_path
|
||||
|
||||
deduped_files = deduped_file_dict.values()
|
||||
|
||||
blacklisted_dirs = DEFAULT_BLACKLISTED_DIRS
|
||||
for file in deduped_files:
|
||||
try:
|
||||
if any([file.startswith(dir) for dir in blacklisted_dirs]):
|
||||
continue
|
||||
except UnicodeDecodeError:
|
||||
continue
|
||||
|
||||
yield file
|
||||
|
||||
|
||||
def filter_app(app):
|
||||
"""
|
||||
:param Gio.DesktopAppInfo app:
|
||||
:returns: True if app can be added to the database
|
||||
"""
|
||||
return app and not (app.get_is_hidden() or app.get_nodisplay()
|
||||
or app.get_string('Type') != 'Application'
|
||||
or not app.get_string('Name'))
|
||||
|
||||
|
||||
def read_desktop_file(file):
|
||||
"""
|
||||
:param str file: path to .desktop
|
||||
:rtype: :class:`Gio.DesktopAppInfo` or :code:`None`
|
||||
"""
|
||||
try:
|
||||
return Gio.DesktopAppInfo.new_from_filename(file)
|
||||
except Exception as e:
|
||||
logger.info('Could not read "%s": %s', file, e)
|
||||
return None
|
||||
|
||||
|
||||
def find_apps(dirs=None):
|
||||
"""
|
||||
:param list dirs: list of paths to `*.desktop` files
|
||||
:returns: list of :class:`Gio.DesktopAppInfo` objects
|
||||
"""
|
||||
if dirs is None:
|
||||
dirs = DESKTOP_DIRS
|
||||
|
||||
return list(filter(filter_app, map(read_desktop_file, find_desktop_files(dirs))))
|
||||
|
||||
|
||||
def find_apps_cached(dirs=None):
|
||||
"""
|
||||
:param list dirs: list of paths to `*.desktop` files
|
||||
:returns: list of :class:`Gio.DesktopAppInfo` objects
|
||||
|
||||
Pseudo code:
|
||||
>>> if cache hit:
|
||||
>>> take list of paths from cache
|
||||
>>> yield from filter(filter_app, map(read_desktop_file, cached_paths))
|
||||
>>> yield from find_apps()
|
||||
>>> save new paths to the cache
|
||||
"""
|
||||
if dirs is None:
|
||||
dirs = DESKTOP_DIRS
|
||||
|
||||
desktop_file_cache_dir = os.path.join(CACHE_DIR, 'desktop_dirs.db')
|
||||
cache = KeyValueDb(desktop_file_cache_dir).open()
|
||||
desktop_dirs = cache.find('desktop_dirs')
|
||||
|
||||
if desktop_dirs:
|
||||
for dir in desktop_dirs:
|
||||
app_info = read_desktop_file(dir)
|
||||
if filter_app(app_info):
|
||||
yield app_info
|
||||
logger.info('Found %s apps in cache', len(desktop_dirs))
|
||||
|
||||
new_desktop_dirs = []
|
||||
|
||||
for app_info in find_apps(DESKTOP_DIRS):
|
||||
yield app_info
|
||||
new_desktop_dirs.append(app_info.get_filename())
|
||||
|
||||
cache.put('desktop_dirs', new_desktop_dirs)
|
||||
cache.commit()
|
||||
logger.info('Found %s apps in the system', len(new_desktop_dirs))
|
83
src/core/desktop_parsing/db/KeyValueDb.py
Normal file
83
src/core/desktop_parsing/db/KeyValueDb.py
Normal file
@@ -0,0 +1,83 @@
|
||||
# Python imports
|
||||
import os
|
||||
import pickle
|
||||
from typing import Dict, TypeVar, Generic, Optional
|
||||
from distutils.dir_util import mkpath
|
||||
|
||||
# Lib imports
|
||||
|
||||
# Application imports
|
||||
|
||||
|
||||
|
||||
|
||||
Key = TypeVar('Key')
|
||||
Value = TypeVar('Value')
|
||||
Records = Dict[Key, Value]
|
||||
|
||||
|
||||
class KeyValueDb(Generic[Key, Value]):
|
||||
"""
|
||||
Key-value in-memory database
|
||||
Use open() method to load DB from a file and commit() to save it
|
||||
"""
|
||||
|
||||
_name = None # type: str
|
||||
_records = None # type: Records
|
||||
|
||||
def __init__(self, basename: str):
|
||||
"""
|
||||
:param str basename: path to db file
|
||||
"""
|
||||
self._name = basename
|
||||
self.set_records({})
|
||||
|
||||
def open(self) -> 'KeyValueDb':
|
||||
""" Create a new data base or open existing one... """
|
||||
if os.path.exists(self._name):
|
||||
if not os.path.isfile(self._name):
|
||||
raise IOError("%s exists and is not a file" % self._name)
|
||||
|
||||
if os.path.getsize(self._name) == 0:
|
||||
return self
|
||||
|
||||
with open(self._name, 'rb') as _in: # binary mode
|
||||
self.set_records(pickle.load(_in))
|
||||
else:
|
||||
mkpath(os.path.dirname(self._name))
|
||||
self.commit()
|
||||
|
||||
return self
|
||||
|
||||
def commit(self) -> 'KeyValueDb':
|
||||
""" Write the database to a file... """
|
||||
with open(self._name, 'wb') as out:
|
||||
pickle_protocol = 2 # use 2 for BC with Ulauncher 4
|
||||
pickle.dump(self._records, out, pickle_protocol)
|
||||
out.close()
|
||||
|
||||
return self
|
||||
|
||||
def remove(self, key: Key) -> bool:
|
||||
"""
|
||||
:param str key:
|
||||
:type: bool
|
||||
:return: True if record was removed
|
||||
"""
|
||||
try:
|
||||
del self._records[key]
|
||||
return True
|
||||
except KeyError:
|
||||
return False
|
||||
|
||||
def set_records(self, records: Records):
|
||||
self._records = records
|
||||
|
||||
def get_records(self) -> Records:
|
||||
return self._records
|
||||
|
||||
def put(self, key: Key, value: Value) -> None:
|
||||
self._records[key] = value
|
||||
|
||||
def find(self, key: Key, default: Value = None) -> Optional[Value]:
|
||||
return self._records.get(key, default)
|
42
src/core/desktop_parsing/db/KeyValueJsonDb.py
Normal file
42
src/core/desktop_parsing/db/KeyValueJsonDb.py
Normal file
@@ -0,0 +1,42 @@
|
||||
# Python imports
|
||||
import os
|
||||
import json
|
||||
from distutils.dir_util import mkpath
|
||||
|
||||
# Lib imports
|
||||
|
||||
|
||||
# Application imports
|
||||
from .KeyValueDb import KeyValueDb, Key, Value
|
||||
|
||||
|
||||
class KeyValueJsonDb(KeyValueDb[Key, Value]):
|
||||
"""
|
||||
Key-value JSON database
|
||||
Use open() method to load DB from a file and commit() to save it
|
||||
"""
|
||||
|
||||
def open(self) -> 'KeyValueJsonDb':
|
||||
"""Create a new data base or open existing one"""
|
||||
if os.path.exists(self._name):
|
||||
if not os.path.isfile(self._name):
|
||||
raise IOError("%s exists and is not a file" % self._name)
|
||||
|
||||
try:
|
||||
with open(self._name, 'r') as _in:
|
||||
self.set_records(json.load(_in))
|
||||
except json.JSONDecodeError:
|
||||
self.commit()
|
||||
else:
|
||||
mkpath(os.path.dirname(self._name))
|
||||
self.commit()
|
||||
|
||||
return self
|
||||
|
||||
def commit(self) -> 'KeyValueJsonDb':
|
||||
""" Write the database to a file... """
|
||||
with open(self._name, 'w') as out:
|
||||
json.dump(self._records, out, indent=4)
|
||||
out.close()
|
||||
|
||||
return self
|
0
src/core/desktop_parsing/db/__init__.py
Normal file
0
src/core/desktop_parsing/db/__init__.py
Normal file
21
src/core/desktop_parsing/file_finder.py
Normal file
21
src/core/desktop_parsing/file_finder.py
Normal file
@@ -0,0 +1,21 @@
|
||||
# Python imports
|
||||
import os
|
||||
from fnmatch import fnmatch
|
||||
from typing import Callable, Generator
|
||||
|
||||
# Lib imports
|
||||
|
||||
|
||||
# Application imports
|
||||
|
||||
|
||||
def find_files(directory: str, pattern: str = None, filter_fn: Callable = None) -> Generator[str, None, None]:
|
||||
"""
|
||||
Search files in `directory`
|
||||
`filter_fn` takes two arguments: directory, filename.
|
||||
If return value is False, file will be ignored
|
||||
"""
|
||||
for root, _, files in os.walk(directory):
|
||||
for basename in files:
|
||||
if (not pattern or fnmatch(basename, pattern)) and (not filter_fn or filter_fn(root, basename)):
|
||||
yield os.path.join(root, basename)
|
3
src/core/mixins/__init__.py
Normal file
3
src/core/mixins/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
||||
"""
|
||||
Generic Mixins Module
|
||||
"""
|
88
src/core/window.py
Normal file
88
src/core/window.py
Normal file
@@ -0,0 +1,88 @@
|
||||
# Python imports
|
||||
import time
|
||||
import signal
|
||||
|
||||
# Lib imports
|
||||
import gi
|
||||
import cairo
|
||||
|
||||
gi.require_version('Gtk', '3.0')
|
||||
gi.require_version('Gdk', '3.0')
|
||||
|
||||
from gi.repository import Gtk
|
||||
from gi.repository import Gdk
|
||||
from gi.repository import GLib
|
||||
|
||||
# Application imports
|
||||
from core.controller import Controller
|
||||
|
||||
|
||||
|
||||
|
||||
class Window(Gtk.ApplicationWindow):
|
||||
"""docstring for Window."""
|
||||
|
||||
def __init__(self, args, unknownargs):
|
||||
super(Window, self).__init__()
|
||||
|
||||
self._set_window_data()
|
||||
self._setup_styling()
|
||||
self._setup_signals()
|
||||
self._load_widgets(args, unknownargs)
|
||||
|
||||
self.show_all()
|
||||
|
||||
|
||||
def _setup_styling(self):
|
||||
self.set_decorated(False)
|
||||
self.set_skip_pager_hint(True)
|
||||
self.set_skip_taskbar_hint(True)
|
||||
|
||||
self.set_keep_above(True)
|
||||
self.stick()
|
||||
|
||||
self.set_default_size(1200, 600)
|
||||
self.set_title(f"{app_name}")
|
||||
self.set_icon_from_file( settings.get_window_icon() )
|
||||
self.set_gravity(5) # 5 = CENTER
|
||||
self.set_position(1) # 1 = CENTER, 4 = CENTER_ALWAYS
|
||||
|
||||
def _setup_signals(self):
|
||||
self.connect("delete-event", self._tear_down)
|
||||
GLib.unix_signal_add(GLib.PRIORITY_DEFAULT, signal.SIGINT, self._tear_down)
|
||||
|
||||
def _load_widgets(self, args, unknownargs):
|
||||
controller = Controller(args, unknownargs)
|
||||
|
||||
self.set_name(f"{app_name.lower()}")
|
||||
settings.get_builder().expose_object(f"{app_name.lower()}", self)
|
||||
|
||||
self.add( controller.get_core_widget() )
|
||||
|
||||
def _set_window_data(self) -> None:
|
||||
screen = self.get_screen()
|
||||
visual = screen.get_rgba_visual()
|
||||
|
||||
if visual != None and screen.is_composited():
|
||||
self.set_visual(visual)
|
||||
self.set_app_paintable(True)
|
||||
self.connect("draw", self._area_draw)
|
||||
|
||||
# bind css file
|
||||
cssProvider = Gtk.CssProvider()
|
||||
cssProvider.load_from_path( settings.get_css_file() )
|
||||
screen = Gdk.Screen.get_default()
|
||||
styleContext = Gtk.StyleContext()
|
||||
styleContext.add_provider_for_screen(screen, cssProvider, Gtk.STYLE_PROVIDER_PRIORITY_USER)
|
||||
|
||||
def _area_draw(self, widget: Gtk.ApplicationWindow, cr: cairo.Context) -> None:
|
||||
cr.set_source_rgba(0, 0, 0, 0.54)
|
||||
cr.set_operator(cairo.OPERATOR_SOURCE)
|
||||
cr.paint()
|
||||
cr.set_operator(cairo.OPERATOR_OVER)
|
||||
|
||||
|
||||
def _tear_down(self, widget=None, eve=None):
|
||||
settings.clear_pid()
|
||||
time.sleep(event_sleep_time)
|
||||
Gtk.main_quit()
|
3
src/plugins/__init__.py
Normal file
3
src/plugins/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
||||
"""
|
||||
Gtk Bound Plugins Module
|
||||
"""
|
63
src/plugins/manifest.py
Normal file
63
src/plugins/manifest.py
Normal file
@@ -0,0 +1,63 @@
|
||||
# Python imports
|
||||
import os, json
|
||||
from os.path import join
|
||||
|
||||
# Lib imports
|
||||
|
||||
# Application imports
|
||||
|
||||
|
||||
|
||||
|
||||
class ManifestProcessor(Exception):
|
||||
...
|
||||
|
||||
|
||||
class Plugin:
|
||||
path: str = None
|
||||
name: str = None
|
||||
author: str = None
|
||||
version: str = None
|
||||
support: str = None
|
||||
requests:{} = None
|
||||
reference: type = None
|
||||
|
||||
|
||||
class ManifestProcessor:
|
||||
def __init__(self, path, builder):
|
||||
manifest = join(path, "manifest.json")
|
||||
if not os.path.exists(manifest):
|
||||
raise Exception("Invalid Plugin Structure: Plugin doesn't have 'manifest.json'. Aboarting load...")
|
||||
|
||||
self._path = path
|
||||
self._builder = builder
|
||||
with open(manifest) as f:
|
||||
data = json.load(f)
|
||||
self._manifest = data["manifest"]
|
||||
self._plugin = self.collect_info()
|
||||
|
||||
def collect_info(self) -> Plugin:
|
||||
plugin = Plugin()
|
||||
plugin.path = self._path
|
||||
plugin.name = self._manifest["name"]
|
||||
plugin.author = self._manifest["author"]
|
||||
plugin.version = self._manifest["version"]
|
||||
plugin.support = self._manifest["support"]
|
||||
plugin.requests = self._manifest["requests"]
|
||||
|
||||
return plugin
|
||||
|
||||
def get_loading_data(self):
|
||||
loading_data = {}
|
||||
requests = self._plugin.requests
|
||||
keys = requests.keys()
|
||||
|
||||
if "pass_events" in keys:
|
||||
if requests["pass_events"] in ["true"]:
|
||||
loading_data["pass_events"] = True
|
||||
|
||||
if "bind_keys" in keys:
|
||||
if isinstance(requests["bind_keys"], list):
|
||||
loading_data["bind_keys"] = requests["bind_keys"]
|
||||
|
||||
return self._plugin, loading_data
|
60
src/plugins/plugin_base.py
Normal file
60
src/plugins/plugin_base.py
Normal file
@@ -0,0 +1,60 @@
|
||||
# Python imports
|
||||
import os, time
|
||||
|
||||
# Lib imports
|
||||
|
||||
# Application imports
|
||||
|
||||
|
||||
class PluginBaseException(Exception):
|
||||
...
|
||||
|
||||
|
||||
class PluginBase:
|
||||
def __init__(self):
|
||||
self.name = "Example Plugin" # NOTE: Need to remove after establishing private bidirectional 1-1 message bus
|
||||
# where self.name should not be needed for message comms
|
||||
|
||||
self._builder = None
|
||||
self._ui_objects = None
|
||||
self._event_system = None
|
||||
|
||||
|
||||
def run(self):
|
||||
"""
|
||||
Must define regardless if needed and can 'pass' if plugin doesn't need it.
|
||||
Is intended to be used to setup internal signals or custom Gtk Builders/UI logic.
|
||||
"""
|
||||
raise PluginBaseException("Method hasn't been overriden...")
|
||||
|
||||
def generate_reference_ui_element(self):
|
||||
"""
|
||||
Requests Key: 'ui_target': "plugin_control_list",
|
||||
Must define regardless if needed and can 'pass' if plugin doesn't use it.
|
||||
Must return a widget if "ui_target" is set.
|
||||
"""
|
||||
raise PluginBaseException("Method hasn't been overriden...")
|
||||
|
||||
def set_event_system(self, event_system):
|
||||
"""
|
||||
Requests Key: 'pass_events': "true"
|
||||
Must define in plugin if "pass_events" is set to "true" string.
|
||||
"""
|
||||
self._event_system = event_system
|
||||
|
||||
def set_ui_object_collection(self, ui_objects):
|
||||
"""
|
||||
Requests Key: "pass_ui_objects": [""]
|
||||
Request reference to a UI component. Will be passed back as array to plugin.
|
||||
Must define in plugin if set and an array of valid glade UI IDs is given.
|
||||
"""
|
||||
self._ui_objects = ui_objects
|
||||
|
||||
def subscribe_to_events(self):
|
||||
...
|
||||
|
||||
|
||||
def clear_children(self, widget: type) -> None:
|
||||
""" Clear children of a gtk widget. """
|
||||
for child in widget.get_children():
|
||||
widget.remove(child)
|
113
src/plugins/plugins_controller.py
Normal file
113
src/plugins/plugins_controller.py
Normal file
@@ -0,0 +1,113 @@
|
||||
# Python imports
|
||||
import os, sys, importlib, traceback
|
||||
from os.path import join, isdir
|
||||
|
||||
# Lib imports
|
||||
import gi
|
||||
gi.require_version('Gtk', '3.0')
|
||||
from gi.repository import Gtk, Gio
|
||||
|
||||
# Application imports
|
||||
from .manifest import Plugin, ManifestProcessor
|
||||
|
||||
|
||||
|
||||
|
||||
class InvalidPluginException(Exception):
|
||||
...
|
||||
|
||||
|
||||
class PluginsController:
|
||||
"""PluginsController controller"""
|
||||
|
||||
def __init__(self):
|
||||
path = os.path.dirname(os.path.realpath(__file__))
|
||||
sys.path.insert(0, path) # NOTE: I think I'm not using this correctly...
|
||||
|
||||
self._builder = settings.get_builder()
|
||||
self._plugins_path = settings.get_plugins_path()
|
||||
|
||||
self._plugins_dir_watcher = None
|
||||
self._plugin_collection = []
|
||||
|
||||
|
||||
def launch_plugins(self) -> None:
|
||||
self._set_plugins_watcher()
|
||||
self.load_plugins()
|
||||
|
||||
def _set_plugins_watcher(self) -> None:
|
||||
self._plugins_dir_watcher = Gio.File.new_for_path(self._plugins_path) \
|
||||
.monitor_directory(Gio.FileMonitorFlags.WATCH_MOVES, Gio.Cancellable())
|
||||
self._plugins_dir_watcher.connect("changed", self._on_plugins_changed, ())
|
||||
|
||||
def _on_plugins_changed(self, file_monitor, file, other_file=None, eve_type=None, data=None):
|
||||
if eve_type in [Gio.FileMonitorEvent.CREATED, Gio.FileMonitorEvent.DELETED,
|
||||
Gio.FileMonitorEvent.RENAMED, Gio.FileMonitorEvent.MOVED_IN,
|
||||
Gio.FileMonitorEvent.MOVED_OUT]:
|
||||
self.reload_plugins(file)
|
||||
|
||||
def load_plugins(self, file: str = None) -> None:
|
||||
print(f"Loading plugins...")
|
||||
parent_path = os.getcwd()
|
||||
|
||||
for path, folder in [[join(self._plugins_path, item), item] if os.path.isdir(join(self._plugins_path, item)) else None for item in os.listdir(self._plugins_path)]:
|
||||
try:
|
||||
target = join(path, "plugin.py")
|
||||
manifest = ManifestProcessor(path, self._builder)
|
||||
|
||||
if not os.path.exists(target):
|
||||
raise InvalidPluginException("Invalid Plugin Structure: Plugin doesn't have 'plugin.py'. Aboarting load...")
|
||||
|
||||
plugin, loading_data = manifest.get_loading_data()
|
||||
module = self.load_plugin_module(path, folder, target)
|
||||
self.execute_plugin(module, plugin, loading_data)
|
||||
except Exception as e:
|
||||
print(f"Malformed Plugin: Not loading -->: '{folder}' !")
|
||||
traceback.print_exc()
|
||||
|
||||
os.chdir(parent_path)
|
||||
|
||||
|
||||
def load_plugin_module(self, path, folder, target):
|
||||
os.chdir(path)
|
||||
|
||||
locations = []
|
||||
self.collect_search_locations(path, locations)
|
||||
|
||||
spec = importlib.util.spec_from_file_location(folder, target, submodule_search_locations = locations)
|
||||
module = importlib.util.module_from_spec(spec)
|
||||
sys.modules[folder] = module
|
||||
spec.loader.exec_module(module)
|
||||
|
||||
return module
|
||||
|
||||
def collect_search_locations(self, path, locations):
|
||||
locations.append(path)
|
||||
for file in os.listdir(path):
|
||||
_path = os.path.join(path, file)
|
||||
if os.path.isdir(_path):
|
||||
self.collect_search_locations(_path, locations)
|
||||
|
||||
def execute_plugin(self, module: type, plugin: Plugin, loading_data: []):
|
||||
plugin.reference = module.Plugin()
|
||||
keys = loading_data.keys()
|
||||
|
||||
if "ui_target" in keys:
|
||||
loading_data["ui_target"].add( plugin.reference.generate_reference_ui_element() )
|
||||
loading_data["ui_target"].show_all()
|
||||
|
||||
if "pass_ui_objects" in keys:
|
||||
plugin.reference.set_ui_object_collection( loading_data["pass_ui_objects"] )
|
||||
|
||||
if "pass_events" in keys:
|
||||
plugin.reference.set_fm_event_system(event_system)
|
||||
plugin.reference.subscribe_to_events()
|
||||
|
||||
if "bind_keys" in keys:
|
||||
keybindings.append_bindings( loading_data["bind_keys"] )
|
||||
|
||||
plugin.reference.run()
|
||||
self._plugin_collection.append(plugin)
|
||||
|
||||
def reload_plugins(self, file: str = None) -> None:
|
||||
print(f"Reloading plugins... stub.")
|
3
src/utils/__init__.py
Normal file
3
src/utils/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
||||
"""
|
||||
Utils module
|
||||
"""
|
22
src/utils/endpoint_registry.py
Normal file
22
src/utils/endpoint_registry.py
Normal file
@@ -0,0 +1,22 @@
|
||||
# Python imports
|
||||
|
||||
# Lib imports
|
||||
|
||||
# Application imports
|
||||
|
||||
|
||||
|
||||
|
||||
class EndpointRegistry():
|
||||
def __init__(self):
|
||||
self._endpoints = {}
|
||||
|
||||
def register(self, rule, **options):
|
||||
def decorator(f):
|
||||
self._endpoints[rule] = f
|
||||
return f
|
||||
|
||||
return decorator
|
||||
|
||||
def get_endpoints(self):
|
||||
return self._endpoints
|
30
src/utils/event_system.py
Normal file
30
src/utils/event_system.py
Normal file
@@ -0,0 +1,30 @@
|
||||
# Python imports
|
||||
from collections import defaultdict
|
||||
|
||||
# Lib imports
|
||||
|
||||
# Application imports
|
||||
|
||||
|
||||
|
||||
|
||||
class EventSystem:
|
||||
""" Create event system. """
|
||||
|
||||
def __init__(self):
|
||||
self.subscribers = defaultdict(list)
|
||||
|
||||
|
||||
def subscribe(self, event_type, fn):
|
||||
self.subscribers[event_type].append(fn)
|
||||
|
||||
def emit(self, event_type, data = None):
|
||||
if event_type in self.subscribers:
|
||||
for fn in self.subscribers[event_type]:
|
||||
if data:
|
||||
if hasattr(data, '__iter__') and not type(data) is str:
|
||||
fn(*data)
|
||||
else:
|
||||
fn(data)
|
||||
else:
|
||||
fn()
|
102
src/utils/ipc_server.py
Normal file
102
src/utils/ipc_server.py
Normal file
@@ -0,0 +1,102 @@
|
||||
# Python imports
|
||||
import os, threading, time
|
||||
from multiprocessing.connection import Listener, Client
|
||||
|
||||
# Lib imports
|
||||
|
||||
# Application imports
|
||||
|
||||
|
||||
|
||||
|
||||
class IPCServer:
|
||||
""" Create a listener so that other {app_name} instances send requests back to existing instance. """
|
||||
def __init__(self, ipc_address: str = '127.0.0.1', conn_type: str = "socket"):
|
||||
self.is_ipc_alive = False
|
||||
self._ipc_port = 4848
|
||||
self._ipc_address = ipc_address
|
||||
self._conn_type = conn_type
|
||||
self._ipc_authkey = b'' + bytes(f'{app_name}-ipc', 'utf-8')
|
||||
self._ipc_timeout = 15.0
|
||||
|
||||
if conn_type == "socket":
|
||||
self._ipc_address = f'/tmp/{app_name}-ipc.sock'
|
||||
elif conn_type == "full_network":
|
||||
self._ipc_address = '0.0.0.0'
|
||||
elif conn_type == "full_network_unsecured":
|
||||
self._ipc_authkey = None
|
||||
self._ipc_address = '0.0.0.0'
|
||||
elif conn_type == "local_network_unsecured":
|
||||
self._ipc_authkey = None
|
||||
|
||||
self._subscribe_to_events()
|
||||
|
||||
def _subscribe_to_events(self):
|
||||
event_system.subscribe("post_file_to_ipc", self.send_ipc_message)
|
||||
|
||||
|
||||
def create_ipc_listener(self) -> None:
|
||||
if self._conn_type == "socket":
|
||||
if os.path.exists(self._ipc_address) and settings.is_dirty_start():
|
||||
os.unlink(self._ipc_address)
|
||||
|
||||
listener = Listener(address=self._ipc_address, family="AF_UNIX", authkey=self._ipc_authkey)
|
||||
elif "unsecured" not in self._conn_type:
|
||||
listener = Listener((self._ipc_address, self._ipc_port), authkey=self._ipc_authkey)
|
||||
else:
|
||||
listener = Listener((self._ipc_address, self._ipc_port))
|
||||
|
||||
|
||||
self.is_ipc_alive = True
|
||||
self._run_ipc_loop(listener)
|
||||
|
||||
@daemon_threaded
|
||||
def _run_ipc_loop(self, listener) -> None:
|
||||
while True:
|
||||
conn = listener.accept()
|
||||
start_time = time.perf_counter()
|
||||
self._handle_ipc_message(conn, start_time)
|
||||
|
||||
listener.close()
|
||||
|
||||
def _handle_ipc_message(self, conn, start_time) -> None:
|
||||
while True:
|
||||
msg = conn.recv()
|
||||
if settings.is_debug():
|
||||
print(msg)
|
||||
|
||||
if "FILE|" in msg:
|
||||
file = msg.split("FILE|")[1].strip()
|
||||
if file:
|
||||
event_system.emit("handle_file_from_ipc", file)
|
||||
|
||||
conn.close()
|
||||
break
|
||||
|
||||
|
||||
if msg in ['close connection', 'close server']:
|
||||
conn.close()
|
||||
break
|
||||
|
||||
# NOTE: Not perfect but insures we don't lock up the connection for too long.
|
||||
end_time = time.perf_counter()
|
||||
if (end_time - start_time) > self._ipc_timeout:
|
||||
conn.close()
|
||||
break
|
||||
|
||||
|
||||
def send_ipc_message(self, message: str = "Empty Data...") -> None:
|
||||
try:
|
||||
if self._conn_type == "socket":
|
||||
conn = Client(address=self._ipc_address, family="AF_UNIX", authkey=self._ipc_authkey)
|
||||
elif "unsecured" not in self._conn_type:
|
||||
conn = Client((self._ipc_address, self._ipc_port), authkey=self._ipc_authkey)
|
||||
else:
|
||||
conn = Client((self._ipc_address, self._ipc_port))
|
||||
|
||||
conn.send(message)
|
||||
conn.close()
|
||||
except ConnectionRefusedError as e:
|
||||
print("Connection refused...")
|
||||
except Exception as e:
|
||||
print(repr(e))
|
127
src/utils/keybindings.py
Normal file
127
src/utils/keybindings.py
Normal file
@@ -0,0 +1,127 @@
|
||||
# Python imports
|
||||
import re
|
||||
|
||||
# Gtk imports
|
||||
import gi
|
||||
gi.require_version('Gdk', '3.0')
|
||||
from gi.repository import Gdk
|
||||
|
||||
# Application imports
|
||||
|
||||
|
||||
|
||||
def err(log = ""):
|
||||
"""Print an error message"""
|
||||
print(log)
|
||||
|
||||
|
||||
class KeymapError(Exception):
|
||||
"""Custom exception for errors in keybinding configurations"""
|
||||
|
||||
MODIFIER = re.compile('<([^<]+)>')
|
||||
class Keybindings:
|
||||
"""Class to handle loading and lookup of Terminator keybindings"""
|
||||
|
||||
modifiers = {
|
||||
'ctrl': Gdk.ModifierType.CONTROL_MASK,
|
||||
'control': Gdk.ModifierType.CONTROL_MASK,
|
||||
'primary': Gdk.ModifierType.CONTROL_MASK,
|
||||
'shift': Gdk.ModifierType.SHIFT_MASK,
|
||||
'alt': Gdk.ModifierType.MOD1_MASK,
|
||||
'super': Gdk.ModifierType.SUPER_MASK,
|
||||
'hyper': Gdk.ModifierType.HYPER_MASK,
|
||||
'mod2': Gdk.ModifierType.MOD2_MASK
|
||||
}
|
||||
|
||||
empty = {}
|
||||
keys = None
|
||||
_masks = None
|
||||
_lookup = None
|
||||
|
||||
def __init__(self):
|
||||
self.keymap = Gdk.Keymap.get_default()
|
||||
self.configure({})
|
||||
|
||||
def configure(self, bindings):
|
||||
"""Accept new bindings and reconfigure with them"""
|
||||
self.keys = bindings
|
||||
self.reload()
|
||||
|
||||
def reload(self):
|
||||
"""Parse bindings and mangle into an appropriate form"""
|
||||
self._lookup = {}
|
||||
self._masks = 0
|
||||
|
||||
for action, bindings in list(self.keys.items()):
|
||||
if isinstance(bindings, list):
|
||||
bindings = (*bindings,)
|
||||
elif not isinstance(bindings, tuple):
|
||||
bindings = (bindings,)
|
||||
|
||||
|
||||
for binding in bindings:
|
||||
if not binding or binding == "None":
|
||||
continue
|
||||
|
||||
try:
|
||||
keyval, mask = self._parsebinding(binding)
|
||||
# Does much the same, but with poorer error handling.
|
||||
#keyval, mask = Gtk.accelerator_parse(binding)
|
||||
except KeymapError as e:
|
||||
err ("keybinding reload failed to parse binding '%s': %s" % (binding, e))
|
||||
else:
|
||||
if mask & Gdk.ModifierType.SHIFT_MASK:
|
||||
if keyval == Gdk.KEY_Tab:
|
||||
keyval = Gdk.KEY_ISO_Left_Tab
|
||||
mask &= ~Gdk.ModifierType.SHIFT_MASK
|
||||
else:
|
||||
keyvals = Gdk.keyval_convert_case(keyval)
|
||||
if keyvals[0] != keyvals[1]:
|
||||
keyval = keyvals[1]
|
||||
mask &= ~Gdk.ModifierType.SHIFT_MASK
|
||||
else:
|
||||
keyval = Gdk.keyval_to_lower(keyval)
|
||||
|
||||
self._lookup.setdefault(mask, {})
|
||||
self._lookup[mask][keyval] = action
|
||||
self._masks |= mask
|
||||
|
||||
def _parsebinding(self, binding):
|
||||
"""Parse an individual binding using Gtk's binding function"""
|
||||
mask = 0
|
||||
modifiers = re.findall(MODIFIER, binding)
|
||||
|
||||
if modifiers:
|
||||
for modifier in modifiers:
|
||||
mask |= self._lookup_modifier(modifier)
|
||||
|
||||
key = re.sub(MODIFIER, '', binding)
|
||||
if key == '':
|
||||
raise KeymapError('No key found!')
|
||||
|
||||
keyval = Gdk.keyval_from_name(key)
|
||||
|
||||
if keyval == 0:
|
||||
raise KeymapError("Key '%s' is unrecognised..." % key)
|
||||
return (keyval, mask)
|
||||
|
||||
def _lookup_modifier(self, modifier):
|
||||
"""Map modifier names to gtk values"""
|
||||
try:
|
||||
return self.modifiers[modifier.lower()]
|
||||
except KeyError:
|
||||
raise KeymapError("Unhandled modifier '<%s>'" % modifier)
|
||||
|
||||
def lookup(self, event):
|
||||
"""Translate a keyboard event into a mapped key"""
|
||||
try:
|
||||
_found, keyval, _egp, _lvl, consumed = self.keymap.translate_keyboard_state(
|
||||
event.hardware_keycode,
|
||||
Gdk.ModifierType(event.get_state() & ~Gdk.ModifierType.LOCK_MASK),
|
||||
event.group)
|
||||
except TypeError:
|
||||
err ("Keybinding lookup failed to translate keyboard event: %s" % dir(event))
|
||||
return None
|
||||
|
||||
mask = (event.get_state() & ~consumed) & self._masks
|
||||
return self._lookup.get(mask, self.empty).get(keyval, None)
|
56
src/utils/logger.py
Normal file
56
src/utils/logger.py
Normal file
@@ -0,0 +1,56 @@
|
||||
# Python imports
|
||||
import os, logging
|
||||
|
||||
# Application imports
|
||||
|
||||
|
||||
class Logger:
|
||||
"""
|
||||
Create a new logging object and return it.
|
||||
:note:
|
||||
NOSET # Don't know the actual log level of this... (defaulting or literally none?)
|
||||
Log Levels (From least to most)
|
||||
Type Value
|
||||
CRITICAL 50
|
||||
ERROR 40
|
||||
WARNING 30
|
||||
INFO 20
|
||||
DEBUG 10
|
||||
:param loggerName: Sets the name of the logger object. (Used in log lines)
|
||||
:param createFile: Whether we create a log file or just pump to terminal
|
||||
|
||||
:return: the logging object we created
|
||||
"""
|
||||
|
||||
def __init__(self, config_path: str, _ch_log_lvl = logging.CRITICAL, _fh_log_lvl = logging.INFO):
|
||||
self._CONFIG_PATH = config_path
|
||||
self.global_lvl = logging.DEBUG # Keep this at highest so that handlers can filter to their desired levels
|
||||
self.ch_log_lvl = _ch_log_lvl # Prety much the only one we ever change
|
||||
self.fh_log_lvl = _fh_log_lvl
|
||||
|
||||
def get_logger(self, loggerName: str = "NO_LOGGER_NAME_PASSED", createFile: bool = True) -> logging.Logger:
|
||||
log = logging.getLogger(loggerName)
|
||||
log.setLevel(self.global_lvl)
|
||||
|
||||
# Set our log output styles
|
||||
fFormatter = logging.Formatter('[%(asctime)s] %(pathname)s:%(lineno)d %(levelname)s - %(message)s', '%m-%d %H:%M:%S')
|
||||
cFormatter = logging.Formatter('%(pathname)s:%(lineno)d] %(levelname)s - %(message)s')
|
||||
|
||||
ch = logging.StreamHandler()
|
||||
ch.setLevel(level=self.ch_log_lvl)
|
||||
ch.setFormatter(cFormatter)
|
||||
log.addHandler(ch)
|
||||
|
||||
if createFile:
|
||||
folder = self._CONFIG_PATH
|
||||
file = f"{folder}/application.log"
|
||||
|
||||
if not os.path.exists(folder):
|
||||
os.mkdir(folder)
|
||||
|
||||
fh = logging.FileHandler(file)
|
||||
fh.setLevel(level=self.fh_log_lvl)
|
||||
fh.setFormatter(fFormatter)
|
||||
log.addHandler(fh)
|
||||
|
||||
return log
|
152
src/utils/settings.py
Normal file
152
src/utils/settings.py
Normal file
@@ -0,0 +1,152 @@
|
||||
# Python imports
|
||||
import os
|
||||
import json
|
||||
import inspect
|
||||
|
||||
# Gtk imports
|
||||
|
||||
# Application imports
|
||||
from .logger import Logger
|
||||
|
||||
|
||||
|
||||
|
||||
class Settings:
|
||||
def __init__(self):
|
||||
self._SCRIPT_PTH = os.path.dirname(os.path.realpath(__file__))
|
||||
self._USER_HOME = os.path.expanduser('~')
|
||||
self._CONFIG_PATH = f"{self._USER_HOME}/.config/{app_name.lower()}"
|
||||
self._PLUGINS_PATH = f"{self._CONFIG_PATH}/plugins"
|
||||
self._GLADE_FILE = f"{self._CONFIG_PATH}/Main_Window.glade"
|
||||
self._KEY_BINDINGS_FILE = f"{self._CONFIG_PATH}/key-bindings.json"
|
||||
self._CSS_FILE = f"{self._CONFIG_PATH}/stylesheet.css"
|
||||
self._DEFAULT_ICONS = f"{self._CONFIG_PATH}/icons"
|
||||
self._PID_FILE = f"{self._CONFIG_PATH}/{app_name.lower()}.pid"
|
||||
self._WINDOW_ICON = f"{self._DEFAULT_ICONS}/{app_name.lower()}.png"
|
||||
self._USR_PATH = f"/usr/share/{app_name.lower()}"
|
||||
|
||||
if not os.path.exists(self._CONFIG_PATH):
|
||||
os.mkdir(self._CONFIG_PATH)
|
||||
if not os.path.exists(self._PLUGINS_PATH):
|
||||
os.mkdir(self._PLUGINS_PATH)
|
||||
|
||||
if not os.path.exists(self._GLADE_FILE):
|
||||
self._GLADE_FILE = f"{self._USR_PATH}/Main_Window.glade"
|
||||
if not os.path.exists(self._KEY_BINDINGS_FILE):
|
||||
self._KEY_BINDINGS_FILE = f"{self._USR_PATH}/key-bindings.json"
|
||||
if not os.path.exists(self._CSS_FILE):
|
||||
self._CSS_FILE = f"{self._USR_PATH}/stylesheet.css"
|
||||
if not os.path.exists(self._WINDOW_ICON):
|
||||
self._WINDOW_ICON = f"{self._USR_PATH}/icons/{app_name.lower()}.png"
|
||||
if not os.path.exists(self._DEFAULT_ICONS):
|
||||
self.DEFAULT_ICONS = f"{self._USR_PATH}/icons"
|
||||
|
||||
# '_filters'
|
||||
self._office_filter = ('.doc', '.docx', '.xls', '.xlsx', '.xlt', '.xltx', '.xlm', '.ppt', 'pptx', '.pps', '.ppsx', '.odt', '.rtf')
|
||||
self._vids_filter = ('.mkv', '.avi', '.flv', '.mov', '.m4v', '.mpg', '.wmv', '.mpeg', '.mp4', '.webm')
|
||||
self._txt_filter = ('.txt', '.text', '.sh', '.cfg', '.conf')
|
||||
self._music_filter = ('.psf', '.mp3', '.ogg' , '.flac')
|
||||
self._images_filter = ('.png', '.jpg', '.jpeg', '.gif', '.ico', '.tga')
|
||||
self._pdf_filter = ('.pdf')
|
||||
|
||||
self._success_color = "#88cc27"
|
||||
self._warning_color = "#ffa800"
|
||||
self._error_color = "#ff0000"
|
||||
|
||||
with open(self._KEY_BINDINGS_FILE) as file:
|
||||
bindings = json.load(file)["keybindings"]
|
||||
keybindings.configure(bindings)
|
||||
|
||||
self._guake_key = bindings["guake_key"]
|
||||
|
||||
self._builder = None
|
||||
self._logger = Logger(self._CONFIG_PATH).get_logger()
|
||||
|
||||
self._trace_debug = False
|
||||
self._debug = False
|
||||
self._dirty_start = False
|
||||
|
||||
|
||||
def do_dirty_start_check(self):
|
||||
if not os.path.exists(self._PID_FILE):
|
||||
self._write_new_pid()
|
||||
else:
|
||||
with open(self._PID_FILE, "r") as _pid:
|
||||
pid = _pid.readline().strip()
|
||||
if pid not in ("", None):
|
||||
self._check_alive_status(int(pid))
|
||||
else:
|
||||
self._write_new_pid()
|
||||
|
||||
""" Check For the existence of a unix pid. """
|
||||
def _check_alive_status(self, pid):
|
||||
print(f"PID Found: {pid}")
|
||||
try:
|
||||
os.kill(pid, 0)
|
||||
except OSError:
|
||||
print(f"{app_name} is starting dirty...")
|
||||
self._dirty_start = True
|
||||
self._write_new_pid()
|
||||
return
|
||||
|
||||
print("PID is alive... Let downstream errors (sans debug args) handle app closure propigation.")
|
||||
|
||||
def _write_new_pid(self):
|
||||
pid = os.getpid()
|
||||
self._write_pid(pid)
|
||||
|
||||
def _clean_pid(self):
|
||||
os.unlink(self._PID_FILE)
|
||||
|
||||
def _write_pid(self, pid):
|
||||
with open(self._PID_FILE, "w") as _pid:
|
||||
_pid.write(f"{pid}")
|
||||
|
||||
def register_signals_to_builder(self, classes=None):
|
||||
handlers = {}
|
||||
|
||||
for c in classes:
|
||||
methods = None
|
||||
try:
|
||||
methods = inspect.getmembers(c, predicate=inspect.ismethod)
|
||||
handlers.update(methods)
|
||||
except Exception as e:
|
||||
print(repr(e))
|
||||
|
||||
self._builder.connect_signals(handlers)
|
||||
|
||||
|
||||
def set_builder(self, builder) -> any: self._builder = builder
|
||||
def get_builder(self) -> any: return self._builder
|
||||
def get_glade_file(self) -> str: return self._GLADE_FILE
|
||||
|
||||
def get_logger(self) -> Logger: return self._logger
|
||||
def get_plugins_path(self) -> str: return self._PLUGINS_PATH
|
||||
def get_icon_theme(self) -> str: return self._ICON_THEME
|
||||
def get_css_file(self) -> str: return self._CSS_FILE
|
||||
def get_window_icon(self) -> str: return self._WINDOW_ICON
|
||||
def get_home_path(self) -> str: return self._USER_HOME
|
||||
|
||||
# Filter returns
|
||||
def get_office_filter(self) -> tuple: return self._office_filter
|
||||
def get_vids_filter(self) -> tuple: return self._vids_filter
|
||||
def get_text_filter(self) -> tuple: return self._txt_filter
|
||||
def get_music_filter(self) -> tuple: return self._music_filter
|
||||
def get_images_filter(self) -> tuple: return self._images_filter
|
||||
def get_pdf_filter(self) -> tuple: return self._pdf_filter
|
||||
def get_guake_key(self) -> tuple: return self._guake_key
|
||||
|
||||
def get_success_color(self) -> str: return self._success_color
|
||||
def get_warning_color(self) -> str: return self._warning_color
|
||||
def get_error_color(self) -> str: return self._error_color
|
||||
|
||||
def is_trace_debug(self) -> str: return self._trace_debug
|
||||
def is_debug(self) -> str: return self._debug
|
||||
def is_dirty_start(self) -> bool: return self._dirty_start
|
||||
def clear_pid(self): self._clean_pid()
|
||||
|
||||
def set_trace_debug(self, trace_debug):
|
||||
self._trace_debug = trace_debug
|
||||
|
||||
def set_debug(self, debug):
|
||||
self._debug = debug
|
Reference in New Issue
Block a user