From 867c651a046c0c6cb91b4f30956dc54922d54338 Mon Sep 17 00:00:00 2001 From: itdominator <1itdominator@gmail.com> Date: Mon, 3 Oct 2022 20:50:38 -0500 Subject: [PATCH] Changed search plugin IPC logic --- plugins/searcher/ipc_server.py | 104 +++++++++++++++++++++++++++++++++ plugins/searcher/plugin.py | 36 ++---------- plugins/searcher/search.py | 67 +++++++++++---------- 3 files changed, 143 insertions(+), 64 deletions(-) create mode 100644 plugins/searcher/ipc_server.py diff --git a/plugins/searcher/ipc_server.py b/plugins/searcher/ipc_server.py new file mode 100644 index 0000000..c654842 --- /dev/null +++ b/plugins/searcher/ipc_server.py @@ -0,0 +1,104 @@ +# Python imports +import os, threading, time, pickle +from multiprocessing.connection import Listener, Client + +# Lib imports +import gi +gi.require_version('Gtk', '3.0') +from gi.repository import Gtk, GLib + +# Application imports + + + + +class IPCServer: + """ Create a listener so that other SolarFM instances send requests back to existing instance. """ + def __init__(self, ipc_address: str = '127.0.0.1', conn_type: str = "socket"): + self.is_ipc_alive = False + self._ipc_port = 4848 + self._ipc_address = ipc_address + self._conn_type = conn_type + self._ipc_authkey = b'' + bytes(f'solarfm-search_grep-ipc', 'utf-8') + self._ipc_timeout = 15.0 + + if conn_type == "socket": + self._ipc_address = f'/tmp/solarfm-search_grep-ipc.sock' + elif conn_type == "full_network": + self._ipc_address = '0.0.0.0' + elif conn_type == "full_network_unsecured": + self._ipc_authkey = None + self._ipc_address = '0.0.0.0' + elif conn_type == "local_network_unsecured": + self._ipc_authkey = None + + + @daemon_threaded + def create_ipc_listener(self) -> None: + if self._conn_type == "socket": + if os.path.exists(self._ipc_address): + os.unlink(self._ipc_address) + + listener = Listener(address=self._ipc_address, family="AF_UNIX", authkey=self._ipc_authkey) + elif "unsecured" not in self._conn_type: + listener = Listener((self._ipc_address, self._ipc_port), authkey=self._ipc_authkey) + else: + listener = Listener((self._ipc_address, self._ipc_port)) + + + self.is_ipc_alive = True + while True: + conn = listener.accept() + start_time = time.perf_counter() + self.handle_message(conn, start_time) + + listener.close() + + def handle_message(self, conn, start_time) -> None: + while True: + msg = conn.recv() + data = msg + + if "SEARCH|" in msg: + file = msg.split("SEARCH|")[1].strip() + if file: + GLib.idle_add(self._load_file_ui, file) + + conn.close() + break + + if "GREP|" in msg: + data = msg.split("GREP|")[1].strip() + if data: + GLib.idle_add(self._load_grep_ui, data) + + conn.close() + break + + + if msg in ['close connection', 'close server']: + conn.close() + break + + # NOTE: Not perfect but insures we don't lock up the connection for too long. + end_time = time.perf_counter() + if (end_time - start_time) > self._ipc_timeout: + conn.close() + break + + + def send_ipc_message(self, message: str = "Empty Data...") -> None: + try: + if self._conn_type == "socket": + conn = Client(address=self._ipc_address, family="AF_UNIX", authkey=self._ipc_authkey) + elif "unsecured" not in self._conn_type: + conn = Client((self._ipc_address, self._ipc_port), authkey=self._ipc_authkey) + else: + conn = Client((self._ipc_address, self._ipc_port)) + + conn.send(message) + conn.close() + except ConnectionRefusedError as e: + print("Connection refused...") + except Exception as e: + print(repr(e)) diff --git a/plugins/searcher/plugin.py b/plugins/searcher/plugin.py index 3a2ae7c..cedbfdf 100644 --- a/plugins/searcher/plugin.py +++ b/plugins/searcher/plugin.py @@ -1,5 +1,5 @@ # Python imports -import os, threading, subprocess, inspect, time, json, base64, shlex, select, signal, pickle +import os, threading, subprocess, inspect, time, json, base64, shlex, select, signal # Lib imports import gi @@ -8,7 +8,7 @@ from gi.repository import Gtk, GLib # Application imports from plugins.plugin_base import PluginBase - +from .ipc_server import IPCServer @@ -74,7 +74,7 @@ class GrepPreviewWidget(Gtk.Box): pause_fifo_update = False -class Plugin(PluginBase): +class Plugin(IPCServer, PluginBase): def __init__(self): super().__init__() @@ -83,9 +83,6 @@ class Plugin(PluginBase): # where self.name should not be needed for message comms self._GLADE_FILE = f"{self.path}/search_dialog.glade" - self._files_fifo_file = f"/tmp/search_files_fifo" - self._grep_fifo_file = f"/tmp/grep_files_fifo" - self._search_dialog = None self._active_path = None self._file_list = None @@ -123,30 +120,7 @@ class Plugin(PluginBase): self._event_system.subscribe("update-file-ui", self._load_file_ui) self._event_system.subscribe("update-grep-ui", self._load_grep_ui) - if not os.path.exists(self._files_fifo_file): - os.mkfifo(self._files_fifo_file, 0o777) - if not os.path.exists(self._grep_fifo_file): - os.mkfifo(self._grep_fifo_file, 0o777) - - self.run_files_fifo_thread() - self.run_grep_fifo_thread() - - - @daemon_threaded - def run_files_fifo_thread(self): - with open(self._files_fifo_file) as fifo: - while True: - select.select([fifo],[],[fifo]) - data = fifo.read() - GLib.idle_add(self._load_file_ui, data) - - @daemon_threaded - def run_grep_fifo_thread(self): - with open(self._grep_fifo_file) as fifo: - while True: - select.select([fifo],[],[fifo]) - data = fifo.read() - GLib.idle_add(self._load_grep_ui, data) + self.create_ipc_listener() def _show_grep_list_page(self, widget=None, eve=None): @@ -166,9 +140,7 @@ class Plugin(PluginBase): query = widget.get_text() if not query in ("", None): target_dir = shlex.quote( self._fm_state.tab.get_current_directory() ) - # command = [f"{self.path}/search.sh", "-t", "file_search", "-d", f"{target_dir}", "-q", f"{query}"] command = ["python", f"{self.path}/search.py", "-t", "file_search", "-d", f"{target_dir}", "-q", f"{query}"] - process = subprocess.Popen(command, cwd=self.path, stdin=None, stdout=None, stderr=None) def _stop_find_file_query(self, widget=None, eve=None): diff --git a/plugins/searcher/search.py b/plugins/searcher/search.py index 5756dfe..f06f3fa 100755 --- a/plugins/searcher/search.py +++ b/plugins/searcher/search.py @@ -2,8 +2,9 @@ # Python imports -import os, traceback, argparse, time, json, base64 +import os, traceback, argparse, json, base64 from setproctitle import setproctitle +from multiprocessing.connection import Client # Lib imports @@ -11,8 +12,9 @@ from setproctitle import setproctitle -_files_fifo_file = f"/tmp/search_files_fifo" -_grep_fifo_file = f"/tmp/grep_files_fifo" + +_ipc_address = f'/tmp/solarfm-search_grep-ipc.sock' +_ipc_authkey = b'' + bytes(f'solarfm-search_grep-ipc', 'utf-8') filter = (".mkv", ".mp4", ".webm", ".avi", ".mov", ".m4v", ".mpg", ".mpeg", ".wmv", ".flv") + \ (".png", ".jpg", ".jpeg", ".gif", ".ico", ".tga", ".webp") + \ @@ -21,46 +23,49 @@ filter = (".mkv", ".mp4", ".webm", ".avi", ".mov", ".m4v", ".mpg", ".mpeg", ".wm file_result_set = [] -def file_search(fifo, path, query): +def send_ipc_message(message) -> None: + try: + conn = Client(address=_ipc_address, family="AF_UNIX", authkey=_ipc_authkey) + + conn.send(message) + conn.close() + except ConnectionRefusedError as e: + print("Connection refused...") + except Exception as e: + print(repr(e)) + + +def file_search(path, query): try: for file in os.listdir(path): target = os.path.join(path, file) if os.path.isdir(target): - file_search(fifo, target, query) + file_search(target, query) else: if query.lower() in file.lower(): - # file_result_set.append([target, file]) - data = json.dumps([target, file]) - fifo.write(data) - time.sleep(0.01) + data = f"SEARCH|{json.dumps([target, file])}" + send_ipc_message(data) except Exception as e: print("Couldn't traverse to path. Might be permissions related...") traceback.print_exc() def _search_for_string(file, query): - try: - b64_file = base64.urlsafe_b64encode(file.encode('utf-8')).decode('utf-8') - grep_result_set = {} + b64_file = base64.urlsafe_b64encode(file.encode('utf-8')).decode('utf-8') + grep_result_set = {} - with open(file, 'r') as fp: - for i, line in enumerate(fp): - if query in line: - b64_line = base64.urlsafe_b64encode(line.encode('utf-8')).decode('utf-8') + with open(file, 'r') as fp: + for i, line in enumerate(fp): + if query in line: + b64_line = base64.urlsafe_b64encode(line.encode('utf-8')).decode('utf-8') - if f"{b64_file}" in grep_result_set.keys(): - grep_result_set[f"{b64_file}"][f"{i+1}"] = b64_line - else: - grep_result_set[f"{b64_file}"] = {} - grep_result_set[f"{b64_file}"] = {f"{i+1}": b64_line} + if f"{b64_file}" in grep_result_set.keys(): + grep_result_set[f"{b64_file}"][f"{i+1}"] = b64_line + else: + grep_result_set[f"{b64_file}"] = {} + grep_result_set[f"{b64_file}"] = {f"{i+1}": b64_line} - # NOTE: Push to fifo here after loop - with open(_grep_fifo_file, 'w') as fifo: - data = json.dumps(grep_result_set) - fifo.write(data) - time.sleep(0.05) - except Exception as e: - print("Couldn't read file. Might be binary or other cause...") - traceback.print_exc() + data = f"GREP|{json.dumps(grep_result_set)}" + send_ipc_message(data) def grep_search(path, query): @@ -78,8 +83,7 @@ def grep_search(path, query): def search(args): if args.type == "file_search": - with open(_files_fifo_file, 'w') as fifo: - file_search(fifo, args.dir, args.query) + file_search(args.dir, args.query) if args.type == "grep_search": grep_search(args.dir, args.query) @@ -98,6 +102,5 @@ if __name__ == "__main__": # Read arguments (If any...) args = parser.parse_args() search(args) - except Exception as e: traceback.print_exc()