Newton_Editor/plugins/gtksourceview/lsp_manager/client_ipc.py

173 lines
6.3 KiB
Python
Raw Normal View History

# Python imports
import os
import threading
import time
import json
import base64
from multiprocessing.connection import Client
from multiprocessing.connection import Listener
# Lib imports
# Application imports
from .lsp_message_structs import LSPResponseRequest, LSPResponseNotification, get_message_obj
class ClientIPC:
""" Create a Messenger so talk to LSP Manager. """
def __init__(self, ipc_address: str = '127.0.0.1', conn_type: str = "socket"):
self.is_ipc_alive = False
self._ipc_port = 4848
self._ipc_address = ipc_address
self._conn_type = conn_type
self._ipc_authkey = b'' + bytes(f'lsp-client-endpoint-ipc', 'utf-8')
self._manager_ipc_authkey = b'' + bytes(f'lsp-manager-endpoint-ipc', 'utf-8')
self._ipc_timeout = 15.0
2024-09-15 05:26:39 +00:00
self._event_system = None
if conn_type == "socket":
self._ipc_address = f'/tmp/lsp-client-endpoint-ipc.sock'
self._manager_ipc_address = f'/tmp/lsp-manager-endpoint-ipc.sock'
elif conn_type == "full_network":
self._ipc_address = '0.0.0.0'
elif conn_type == "full_network_unsecured":
self._ipc_authkey = None
self._ipc_address = '0.0.0.0'
elif conn_type == "local_network_unsecured":
self._ipc_authkey = None
2024-09-15 05:26:39 +00:00
def set_event_system(self, event_system):
self._event_system = event_system
def create_ipc_listener(self) -> None:
if self._conn_type == "socket":
if os.path.exists(self._ipc_address) and settings_manager.is_dirty_start():
os.unlink(self._ipc_address)
listener = Listener(address=self._ipc_address, family="AF_UNIX", authkey=self._ipc_authkey)
elif "unsecured" not in self._conn_type:
listener = Listener((self._ipc_address, self._ipc_port), authkey=self._ipc_authkey)
else:
listener = Listener((self._ipc_address, self._ipc_port))
self.is_ipc_alive = True
self._run_ipc_loop(listener)
@daemon_threaded
def _run_ipc_loop(self, listener) -> None:
# NOTE: Not thread safe if using with Gtk. Need to import GLib and use idle_add
while self.is_ipc_alive:
try:
conn = listener.accept()
start_time = time.perf_counter()
self._handle_ipc_message(conn, start_time)
except Exception as e:
logger.debug( repr(e) )
listener.close()
def _handle_ipc_message(self, conn, start_time) -> None:
while self.is_ipc_alive:
msg = conn.recv()
if "MANAGER|" in msg:
data = msg.split("MANAGER|")[1].strip()
2024-09-15 05:26:39 +00:00
if data:
data_str = base64.b64decode(data.encode("utf-8")).decode("utf-8")
lsp_response = None
keys = None
2024-09-15 05:26:39 +00:00
logger.debug(data_str)
try:
lsp_response = json.loads(data_str)
keys = lsp_response.keys()
except Exception as e:
2024-09-15 05:26:39 +00:00
raise e
if "result" in keys:
2024-09-15 05:26:39 +00:00
lsp_response = LSPResponseRequest(**get_message_obj(data_str))
if "method" in keys:
2024-09-15 05:26:39 +00:00
lsp_response = LSPResponseNotification(**get_message_obj(data_str))
if "notification" in keys:
...
if "response" in keys:
...
if "ignorable" in keys:
...
if lsp_response:
2024-09-15 05:26:39 +00:00
self._event_system.emit("handle-lsp-message", (lsp_response,))
conn.close()
break
if msg in ['close connection', 'close server']:
conn.close()
break
# NOTE: Not perfect but insures we don't lock up the connection for too long.
end_time = time.perf_counter()
if (end_time - start_time) > self._ipc_timeout:
conn.close()
break
def send_manager_ipc_message(self, message: str) -> None:
try:
if self._conn_type == "socket":
conn = Client(address=self._manager_ipc_address, family="AF_UNIX", authkey=self._manager_ipc_authkey)
elif "unsecured" not in self._conn_type:
conn = Client((self._ipc_address, self._ipc_port), authkey=self._ipc_authkey)
else:
conn = Client((self._ipc_address, self._ipc_port))
conn.send( f"CLIENT|{ base64.b64encode(message.encode("utf-8")).decode("utf-8") }" )
conn.close()
except ConnectionRefusedError as e:
logger.error("Connection refused...")
except Exception as e:
logger.error( repr(e) )
def send_ipc_message(self, message: str = "Empty Data...") -> None:
try:
if self._conn_type == "socket":
conn = Client(address=self._ipc_address, family="AF_UNIX", authkey=self._ipc_authkey)
elif "unsecured" not in self._conn_type:
conn = Client((self._ipc_address, self._ipc_port), authkey=self._ipc_authkey)
else:
conn = Client((self._ipc_address, self._ipc_port))
conn.send(message)
conn.close()
except ConnectionRefusedError as e:
logger.error("Connection refused...")
except Exception as e:
logger.error( repr(e) )
def send_test_ipc_message(self, message: str = "Empty Data...") -> None:
try:
if self._conn_type == "socket":
conn = Client(address=self._ipc_address, family="AF_UNIX", authkey=self._ipc_authkey)
elif "unsecured" not in self._conn_type:
conn = Client((self._ipc_address, self._ipc_port), authkey=self._ipc_authkey)
else:
conn = Client((self._ipc_address, self._ipc_port))
conn.send(message)
conn.close()
except ConnectionRefusedError as e:
if self._conn_type == "socket":
logger.error("LSP Socket no longer valid.... Removing.")
os.unlink(self._ipc_address)
except Exception as e:
2024-09-15 05:26:39 +00:00
logger.error( repr(e) )