SolarFM/plugins/searcher/utils/search.py

133 lines
4.1 KiB
Python
Executable File

#!/usr/bin/python3
# Python imports
import os
import traceback
import argparse
import subprocess
import json
import base64
import time
from datetime import datetime
from setproctitle import setproctitle
from multiprocessing.connection import Client
# Lib imports
# Application imports
# Socket path and auth key handed to multiprocessing.connection.Client when
# results are sent back over IPC.
_ipc_address = '/tmp/solarfm-search_grep-ipc.sock'
_ipc_authkey = 'solarfm-search_grep-ipc'.encode('utf-8')

# File extensions tuple. It is not referenced by the code visible in this file
# (presumably the text-like file types a search is meant to cover -- confirm
# before relying on it). NOTE: the name shadows the builtin `filter`; it is
# kept unchanged so anything that refers to it keeps working.
filter = (
    ".cpp", ".css", ".c", ".go", ".html", ".htm", ".java", ".js",
    ".json", ".lua", ".md", ".py", ".rs", ".toml", ".xml", ".pom",
    ".txt", ".text", ".sh", ".cfg", ".conf", ".log",
)

# NOTE: Timestamp of when this process launched. It is embedded in every IPC
#       message so the receiver can tell whether the sender is stale, i.e.
#       a newer call did not fully kill this or older processes.
dt = datetime.now()
ts = dt.timestamp()
def _log(message: str = "No message passed in...") -> None:
    """Print *message* to standard output.

    This function is also handed to ``os.walk`` as its ``onerror`` callback in
    this file; in that case *message* is the ``OSError`` raised while listing
    a directory rather than a plain string.
    """
    print(message)
def send_ipc_message(message) -> None:
    """Send one message over the search IPC socket, then pause briefly.

    A fresh AF_UNIX connection to ``_ipc_address`` (authenticated with
    ``_ipc_authkey``) is opened for every call. The connection is managed by
    a ``with`` block so it is closed even when ``send`` raises, instead of
    being leaked.

    Args:
        message: Object passed to ``Connection.send``. The callers in this
            file pass strings of the form ``"<KIND>|<timestamp>|<payload>"``.

    Raises:
        Nothing is caught here: any exception raised while connecting,
        authenticating or sending propagates to the caller.
    """
    with Client(address=_ipc_address, family="AF_UNIX", authkey=_ipc_authkey) as conn:
        conn.send(message)

    # NOTE: Kinda important as this prevents overloading the UI thread
    time.sleep(0.05)
def file_search(path: str = None, query: str = None) -> None:
    """Walk *path* and report every file whose lowercased name contains *query*.

    Each match is sent immediately through ``send_ipc_message`` as
    ``"SEARCH|<ts>|<json [full_path, file_name]>"``.

    Symlinked directories are followed. Because ``os.walk`` does not remember
    which directories it already visited, a link pointing back into the tree
    would otherwise be walked over and over; every directory is therefore
    identified by its ``(st_dev, st_ino)`` pair and walked only once.

    Args:
        path: Root directory of the search. Nothing happens when it is empty
            or None.
        query: Substring to look for. It is compared against the lowercased
            file name, so it has to be lowercase already (``search`` lowers it
            before calling). Nothing happens when it is empty or None.
    """
    if not path or not query:
        return

    seen_dirs = set()
    try:
        for root, dirs, files in os.walk(path, topdown=True, onerror=_log, followlinks=True):
            try:
                info = os.stat(root)
            except OSError:
                # Directory vanished or is unreadable for stat; without an
                # identity it cannot be de-duplicated, so just process it.
                info = None

            # An inode of 0 means the platform gave no usable identity.
            if info is not None and info.st_ino:
                dir_id = (info.st_dev, info.st_ino)
                if dir_id in seen_dirs:
                    # Already walked through another (symlinked) route:
                    # prune in place so os.walk does not descend again.
                    dirs[:] = []
                    continue
                seen_dirs.add(dir_id)

            for name in files:
                if query in name.lower():
                    target = os.path.join(root, name)
                    send_ipc_message(f"SEARCH|{ts}|{json.dumps([target, name])}")
    except Exception:
        # Best effort: report the failure and let the caller finish normally.
        print("Couldn't traverse to path. Might be permissions related...")
        traceback.print_exc()
def grep_search(target: str = None, query: str = None):
    """Run a recursive, case-insensitive, fixed-string grep and report the hits.

    The grep output is collected after the process has finished, grouped per
    file and sent as ONE message through ``send_ipc_message``:
    ``"GREP|<ts>|<json>"`` where the JSON maps
    ``urlsafe-base64(file path) -> {line number: urlsafe-base64(line text)}``.
    The message is sent even when there were no matches (empty mapping).

    Args:
        target: File or directory handed to grep. Nothing happens when it is
            empty or None.
        query: Text to search for, as str or bytes (``search`` passes UTF-8
            encoded bytes). Nothing happens when it is empty or None.

    Returns:
        None.
    """
    if not target or not query:
        return

    # NOTE: -n = provide line numbers, -R = Search recursive in given target
    #       -i = insensitive, -F = don't do regex parsing. (Treat as raw string)
    #       -e = the next argument is the pattern, even if it starts with "-"
    #       -- = end of options, so a target starting with "-" stays a path
    command = ["grep", "-n", "-R", "-i", "-F", "-e", query, "--", target]

    # errors="replace": file names or matched lines that are not valid UTF-8
    # must not abort the whole search with a UnicodeDecodeError.
    # grep exits with 1 when nothing matched, so the return code is not checked.
    result = subprocess.run(
        command,
        stdout=subprocess.PIPE,
        encoding="utf-8",
        errors="replace",
    )

    collection = {}
    for line in result.stdout.strip().split("\n"):
        # Expected shape is "<file>:<line number>:<text>"; anything else
        # (empty output, "Binary file ... matches", ...) is skipped.
        parts = line.split(":", 2)
        if len(parts) != 3:
            continue

        file, line_no, data = parts
        b64_file = base64.urlsafe_b64encode(file.encode('utf-8')).decode('utf-8')
        b64_data = base64.urlsafe_b64encode(data.encode('utf-8')).decode('utf-8')
        collection.setdefault(b64_file, {})[line_no] = b64_data

    payload = json.dumps(collection, separators=(',', ':'), indent=4)
    send_ipc_message(f"GREP|{ts}|{payload}")
def search(args):
    """Dispatch the parsed command line to the requested search routine.

    Args:
        args: Namespace-like object with the attributes ``type``, ``dir`` and
            ``query`` (each may be None when the option was not given).

    Behavior:
        * Returns without doing anything when ``dir`` or ``query`` is missing
          or empty, instead of failing on ``None``.
        * One pair of matching surrounding quotes (``'...'`` or ``"..."``) is
          removed from ``dir``.
        * ``type == "file_search"`` calls ``file_search`` with the lowercased
          query; ``type == "grep_search"`` calls ``grep_search`` with the
          query encoded as UTF-8 bytes. Any other type is ignored.
    """
    path = args.dir
    query = args.query
    if not path or not query:
        return

    # Require at least two characters so a lone quote is not treated as a
    # quoted (and then empty) path.
    if len(path) >= 2 and path[0] == path[-1] and path[0] in ("'", '"'):
        path = path[1:-1]

    if args.type == "file_search":
        file_search(path, query.lower())
    elif args.type == "grep_search":
        grep_search(path, query.encode("utf-8"))
if __name__ == "__main__":
    # Script entry point: parse the options, run the search and always finish
    # with a "SEARCH_DONE|<ts>|<status>" message (0 = ok, 1 = an exception
    # was raised somewhere in the try block).
    try:
        setproctitle('SolarFM: File Search - Grepy')

        # (long flag, short flag, help text); every option defaults to None.
        cli_options = (
            ("--type", "-t", "Type of search to do."),
            ("--dir", "-d", "Directory root for search type."),
            ("--query", "-q", "Query search is working against."),
        )

        parser = argparse.ArgumentParser()
        for long_flag, short_flag, description in cli_options:
            parser.add_argument(long_flag, short_flag, default=None, help=description)

        # Read arguments (If any...) and run the requested search.
        search(parser.parse_args())
        send_ipc_message(f"SEARCH_DONE|{ts}|0")
    except Exception:
        traceback.print_exc()
        send_ipc_message(f"SEARCH_DONE|{ts}|1")