#!/usr/bin/python3
"""Search helper that streams file-name and grep results over an IPC socket.

The script is driven by ``--type``, ``--dir`` and ``--query``. Results are
sent as ``|``-separated text messages to the listener on ``_ipc_address``:

    SEARCH|<timestamp>|<json [full_path, file_name]>   one per file-name hit
    GREP|<timestamp>|<json {b64 file: {line_no: b64 text}}>
    SEARCH_DONE|<timestamp>|<0 on success, 1 on error>
"""

# Python imports
import os
import traceback
import argparse
import subprocess
import json
import base64
import time
from datetime import datetime
from multiprocessing.connection import Client

# Lib imports
try:
    from setproctitle import setproctitle
except ImportError:
    # NOTE: The process title is purely cosmetic, so a missing package
    #       should not stop the search from running.
    def setproctitle(title):
        """Do nothing; stand-in used when setproctitle is not installed."""

# Application imports


_ipc_address = '/tmp/solarfm-search_grep-ipc.sock'
_ipc_authkey = b'solarfm-search_grep-ipc'

# NOTE: Not referenced anywhere else in this file; the name also shadows the
#       builtin filter(). Left untouched in case external code relies on it.
filter = (".cpp", ".css", ".c", ".go", ".html", ".htm", ".java", ".js", ".json", ".lua", ".md", ".py", ".rs", ".toml", ".xml", ".pom") + \
         (".txt", ".text", ".sh", ".cfg", ".conf", ".log")

# NOTE: Create timestamp of when this launched. Is used in IPC to see if
#       we are stale and that new call didn't fully kill this or older processes.
dt = datetime.now()
ts = datetime.timestamp(dt)


def send_ipc_message(message) -> None:
    """Send ``message`` to the IPC listener, then pause for 50 ms.

    Raises whatever ``Client`` or ``send`` raise (for example when nothing
    is listening on the socket); callers decide how to handle that.
    """
    # NOTE: The context manager closes the connection even if send() raises.
    with Client(address=_ipc_address, family="AF_UNIX", authkey=_ipc_authkey) as conn:
        conn.send(message)

    # NOTE: Kinda important as this prevents overloading the UI thread
    time.sleep(0.05)


def file_search(path, query):
    """Walk ``path`` and send a SEARCH message for each matching file name.

    A file matches when ``query`` is a substring of its lower-cased name, so
    ``query`` is expected to be lower case already. Any exception (including
    IPC failures) is printed and ends the walk instead of propagating.
    """
    try:
        for root, _dirs, names in os.walk(path, topdown=True):
            for name in names:
                if query not in name.lower():
                    continue

                target = os.path.join(root, name)
                send_ipc_message(f"SEARCH|{ts}|{json.dumps([target, name])}")
    except Exception:
        print("Couldn't traverse to path. Might be permissions related...")
        traceback.print_exc()


def parse_grep_output(raw_data):
    """Turn ``grep -n`` output into ``{b64 file: {line number: b64 text}}``.

    Each useful line looks like ``file:line_no:text``. Lines that do not
    split into three ``:``-separated parts (blank lines, grep notices such
    as "Binary file ... matches") are skipped. File names and matched text
    are URL-safe base64 encoded; line numbers stay plain strings.

    NOTE: A file path that itself contains ``:`` is split in the wrong place,
          because plain grep output gives no way to tell the two apart.
    """
    collection = {}
    for line in raw_data.strip().split("\n"):
        parts = line.split(":", 2)
        if len(parts) != 3:
            continue

        file, line_no, data = parts
        try:
            b64_file = base64.urlsafe_b64encode(file.encode('utf-8')).decode('utf-8')
            b64_data = base64.urlsafe_b64encode(data.encode('utf-8')).decode('utf-8')
        except UnicodeError:
            # Best effort: report the unencodable line and keep the rest.
            traceback.print_exc()
            continue

        collection.setdefault(b64_file, {})[line_no] = b64_data

    return collection


def grep_search(target=None, query=None):
    """Grep ``target`` recursively for ``query`` and send one GREP message.

    Returns without doing anything when either argument is empty. The
    message is sent even when there were no matches (empty JSON object).
    """
    if not query or not target:
        return

    # NOTE: -n = provide line numbers, -R = Search recursive in given target
    #       -i = insensitive, -F = don't do regex parsing. (Treat as raw string)
    #       -e = the next argument is the pattern, even if it starts with "-"
    #       -- = end of options, so a target starting with "-" stays a path
    command = ["grep", "-n", "-R", "-i", "-F", "-e", query, "--", target]

    # NOTE: Will return data AFTER completion (if any).
    #       errors="replace" keeps a matched line holding non UTF-8 bytes
    #       from raising while decoding and discarding every result.
    proc = subprocess.run(command, stdout=subprocess.PIPE,
                          encoding="utf-8", errors="replace")

    collection = parse_grep_output(proc.stdout)
    data = f"GREP|{ts}|{json.dumps(collection, separators=(',', ':'), indent=4)}"
    send_ipc_message(data)


def search(args):
    """Run the search selected by ``args.type`` using ``args.dir``/``args.query``.

    Unknown types are ignored. A missing ``--query`` raises AttributeError
    here, which the script entry point reports as a failed search.
    """
    if args.type == "file_search":
        file_search(args.dir, args.query.lower())

    if args.type == "grep_search":
        grep_search(args.dir, args.query.encode("utf-8"))


if __name__ == "__main__":
    try:
        setproctitle('SolarFM: File Search - Grepy')

        parser = argparse.ArgumentParser()
        # Add long and short arguments
        parser.add_argument("--type", "-t", default=None, help="Type of search to do.")
        parser.add_argument("--dir", "-d", default=None, help="Directory root for search type.")
        parser.add_argument("--query", "-q", default=None, help="Query search is working against.")

        # Read arguments (If any...)
        args = parser.parse_args()
        search(args)

        # Trailing 0 = finished without an unhandled error
        data = f"SEARCH_DONE|{ts}|0"
        send_ipc_message(data)
    except Exception:
        traceback.print_exc()

        # Trailing 1 = the search aborted with an error
        data = f"SEARCH_DONE|{ts}|1"
        send_ipc_message(data)