Major completion provider overhaul; pluigin load and pattern improvements; css overhaul/cleanup; source view state modes added

This commit is contained in:
2026-02-14 21:36:06 -06:00
parent 5df23abb10
commit 80a4620724
157 changed files with 3121 additions and 650 deletions

View File

@@ -0,0 +1,3 @@
"""
Pligin Module
"""

View File

@@ -0,0 +1,3 @@
"""
Pligin Package
"""

View File

@@ -0,0 +1,182 @@
# Python imports
import traceback
import os
import threading
import time
import json
import base64
from multiprocessing.connection import Client
from multiprocessing.connection import Listener
# Lib imports
import gi
from gi.repository import GLib
# Application imports
from .lsp_message_structs import LSPResponseRequest
from .lsp_message_structs import LSPResponseNotification, LSPIDResponseNotification
from .lsp_message_structs import get_message_obj
class ClientIPC:
""" Create a Messenger so talk to LSP Manager. """
def __init__(self, ipc_address: str = '127.0.0.1', conn_type: str = "socket"):
self.is_ipc_alive = False
self._ipc_port = 4848
self._ipc_address = ipc_address
self._conn_type = conn_type
self._ipc_authkey = b'' + bytes(f'lsp-client-endpoint-ipc', 'utf-8')
self._manager_ipc_authkey = b'' + bytes(f'lsp-manager-endpoint-ipc', 'utf-8')
self._ipc_timeout = 15.0
self._event_system = None
if conn_type == "socket":
self._ipc_address = f'/tmp/lsp-client-endpoint-ipc.sock'
self._manager_ipc_address = f'/tmp/lsp-manager-endpoint-ipc.sock'
elif conn_type == "full_network":
self._ipc_address = '0.0.0.0'
elif conn_type == "full_network_unsecured":
self._ipc_authkey = None
self._ipc_address = '0.0.0.0'
elif conn_type == "local_network_unsecured":
self._ipc_authkey = None
def set_event_system(self, event_system):
self._event_system = event_system
def create_ipc_listener(self) -> None:
if self._conn_type == "socket":
if os.path.exists(self._ipc_address) and settings_manager.is_dirty_start():
os.unlink(self._ipc_address)
listener = Listener(address=self._ipc_address, family="AF_UNIX", authkey=self._ipc_authkey)
elif "unsecured" not in self._conn_type:
listener = Listener((self._ipc_address, self._ipc_port), authkey=self._ipc_authkey)
else:
listener = Listener((self._ipc_address, self._ipc_port))
self.is_ipc_alive = True
self._run_ipc_loop(listener)
@daemon_threaded
def _run_ipc_loop(self, listener) -> None:
# NOTE: Not thread safe if using with Gtk. Need to import GLib and use idle_add
while self.is_ipc_alive:
try:
conn = listener.accept()
start_time = time.perf_counter()
self._handle_ipc_message(conn, start_time)
except Exception as e:
logger.debug( traceback.print_exc() )
listener.close()
def _handle_ipc_message(self, conn, start_time) -> None:
while self.is_ipc_alive:
msg = conn.recv()
if "MANAGER|" in msg:
data = msg.split("MANAGER|")[1].strip()
if data:
data_str = base64.b64decode(data.encode("utf-8")).decode("utf-8")
lsp_response = None
keys = None
try:
lsp_response = json.loads(data_str)
keys = lsp_response.keys()
except Exception as e:
raise e
if "result" in keys:
lsp_response = LSPResponseRequest(**get_message_obj(data_str))
if "method" in keys:
lsp_response = LSPResponseNotification( **get_message_obj(data_str) ) if not "id" in keys else LSPIDResponseNotification( **get_message_obj(data_str) )
if "notification" in keys:
...
if "response" in keys:
...
if "ignorable" in keys:
...
if lsp_response:
GLib.idle_add(self._do_emit, lsp_response)
conn.close()
break
if msg in ['close connection', 'close server']:
conn.close()
break
# NOTE: Not perfect but insures we don't lock up the connection for too long.
end_time = time.perf_counter()
if (end_time - start_time) > self._ipc_timeout:
conn.close()
break
def _do_emit(self, lsp_response):
self._event_system.emit("handle-lsp-message", (lsp_response,))
def send_manager_ipc_message(self, message: str) -> None:
try:
if self._conn_type == "socket":
if not os.path.exists(self._manager_ipc_address):
logger.error(f"Socket: {self._manager_ipc_address} doesn't exist. NOT sending message...")
return
conn = Client(address=self._manager_ipc_address, family="AF_UNIX", authkey=self._manager_ipc_authkey)
elif "unsecured" not in self._conn_type:
conn = Client((self._ipc_address, self._ipc_port), authkey=self._ipc_authkey)
else:
conn = Client((self._ipc_address, self._ipc_port))
conn.send( f"CLIENT|{ base64.b64encode(message.encode('utf-8')).decode('utf-8') }" )
conn.close()
except ConnectionRefusedError as e:
logger.error("Connection refused...")
except Exception as e:
logger.error( repr(e) )
def send_ipc_message(self, message: str = "Empty Data...") -> None:
try:
if self._conn_type == "socket":
conn = Client(address=self._ipc_address, family="AF_UNIX", authkey=self._ipc_authkey)
elif "unsecured" not in self._conn_type:
conn = Client((self._ipc_address, self._ipc_port), authkey=self._ipc_authkey)
else:
conn = Client((self._ipc_address, self._ipc_port))
conn.send(message)
conn.close()
except ConnectionRefusedError as e:
logger.error("Connection refused...")
except Exception as e:
logger.error( repr(e) )
def send_test_ipc_message(self, message: str = "Empty Data...") -> None:
try:
if self._conn_type == "socket":
conn = Client(address=self._ipc_address, family="AF_UNIX", authkey=self._ipc_authkey)
elif "unsecured" not in self._conn_type:
conn = Client((self._ipc_address, self._ipc_port), authkey=self._ipc_authkey)
else:
conn = Client((self._ipc_address, self._ipc_port))
conn.send(message)
conn.close()
except ConnectionRefusedError as e:
if self._conn_type == "socket":
logger.error("LSP Socket no longer valid.... Removing.")
os.unlink(self._ipc_address)
except Exception as e:
logger.error( repr(e) )

View File

@@ -0,0 +1,7 @@
{
"lsp_manager_start_command": ["python", "/opt/lsp-manager.zip"],
"websocket": {
"host": "localhost",
"port": 8765
}
}

View File

@@ -0,0 +1,54 @@
# Python imports
from dataclasses import dataclass
import json
# Lib imports
# Application imports
def get_message_obj(data: str):
return json.loads(data)
@dataclass
class LSPResponseRequest(object):
"""
Constructs a new LSP Response Request instance.
:param id result: The id of the given message.
:param dict result: The arguments of the given method.
"""
jsonrpc: str
id: int
result: dict
@dataclass
class LSPResponseNotification(object):
"""
Constructs a new LSP Response Notification instance.
:param str method: The type of lsp notification being made.
:params dict result: The arguments of the given method.
"""
jsonrpc: str
method: str
params: dict
@dataclass
class LSPIDResponseNotification(object):
"""
Constructs a new LSP Response Notification instance.
:param str method: The type of lsp notification being made.
:params dict result: The arguments of the given method.
"""
jsonrpc: str
id: int
method: str
params: dict
class LSPResponseTypes(LSPResponseRequest, LSPResponseNotification, LSPIDResponseNotification):
...

View File

@@ -0,0 +1,16 @@
{
"name": "LSP Client",
"author": "ITDominator",
"version": "0.0.1",
"credit": "",
"support": "",
"requests": {
"pass_events": true,
"pass_ui_objects": [
"separator_right"
],
"bind_keys": [
"LSP Client Toggle||tggl_lsp_window:<Control>l"
]
}
}

View File

@@ -0,0 +1,270 @@
# Python imports
import signal
import subprocess
import json
# Lib imports
import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk
# Application imports
from plugins.plugin_base import PluginBase
from .client_ipc import ClientIPC
class Plugin(PluginBase):
def __init__(self):
super().__init__()
self.name = "LSP Client" # NOTE: Need to remove after establishing private bidirectional 1-1 message bus
# where self.name should not be needed for message comms
self.config_file = "config.json"
self.config: dict = {}
self.lsp_client_proc = None
self.lsp_window = None
def generate_reference_ui_element(self):
...
def run(self):
try:
with open(self.config_file) as f:
self.config = json.load(f)
except Exception as e:
raise Exception(f"Couldn't load config.json...\n{repr(e)}")
self.lsp_window = Gtk.Window()
box1 = Gtk.Box()
box2 = Gtk.Box()
start_btn = Gtk.Button(label = "Start LSP Client")
stop_btn = Gtk.Button(label = "Stop LSP Client")
pid_label = Gtk.Label(label = "LSP PID: ")
box1.set_orientation( Gtk.Orientation.VERTICAL )
self.lsp_window.set_deletable(False)
self.lsp_window.set_skip_pager_hint(True)
self.lsp_window.set_skip_taskbar_hint(True)
self.lsp_window.set_title("LSP Manager")
self.lsp_window.set_size_request(480, 320)
start_btn.connect("clicked", self.start_lsp_manager)
stop_btn.connect("clicked", self.stop_lsp_manager)
box1.add(pid_label)
box2.add(start_btn)
box2.add(stop_btn)
box1.add(box2)
self.lsp_window.add(box1)
box1.show_all()
self.inner_subscribe_to_events()
def _shutting_down(self):
self.stop_lsp_manager()
def _tear_down(self, widget, eve):
return True
def _tggl_lsp_window(self, widget = None):
if not self.lsp_window.is_visible():
self.lsp_window.show()
else:
self.lsp_window.hide()
def subscribe_to_events(self):
self._event_system.subscribe("tggl_lsp_window", self._tggl_lsp_window)
def inner_subscribe_to_events(self):
self._event_system.subscribe("shutting_down", self._shutting_down)
self._event_system.subscribe("textDocument/didOpen", self._lsp_did_open)
self._event_system.subscribe("textDocument/didSave", self._lsp_did_save)
self._event_system.subscribe("textDocument/didClose", self._lsp_did_close)
self._event_system.subscribe("textDocument/didChange", self._lsp_did_change)
self._event_system.subscribe("textDocument/definition", self._lsp_goto)
self._event_system.subscribe("textDocument/completion", self._lsp_completion)
def start_lsp_manager(self, button):
if self.lsp_client_proc: return
self.lsp_client_proc = subprocess.Popen(self.config["lsp_manager_start_command"])
self._load_client_ipc_server()
def _load_client_ipc_server(self):
self.client_ipc = ClientIPC()
self.client_ipc.set_event_system(self._event_system)
self._ipc_realization_check(self.client_ipc)
if not self.client_ipc.is_ipc_alive:
raise AppLaunchException(f"LSP IPC Server Already Exists...")
def _ipc_realization_check(self, ipc_server):
try:
ipc_server.create_ipc_listener()
except Exception:
ipc_server.send_test_ipc_message()
try:
ipc_server.create_ipc_listener()
except Exception as e:
...
def stop_lsp_manager(self, button = None):
if not self.lsp_client_proc: return
if not self.lsp_client_proc.poll() is None:
self.lsp_client_proc = None
return
self.lsp_client_proc.terminate()
self.client_ipc.is_ipc_alive = False
self.lsp_client_proc = None
def _lsp_did_open(self, language_id: str, uri: str, text: str):
if not self.lsp_client_proc: return
data = {
"method": "textDocument/didOpen",
"language_id": language_id,
"uri": uri,
"version": -1,
"text": text,
"line": -1,
"column": -1,
"char": ""
}
self.send_message(data)
def _lsp_did_save(self, uri: str, text: str):
if not self.lsp_client_proc: return
data = {
"method": "textDocument/didSave",
"language_id": "",
"uri": uri,
"version": -1,
"text": text,
"line": -1,
"column": -1,
"char": ""
}
self.send_message(data)
def _lsp_did_close(self, uri: str):
if not self.lsp_client_proc: return
data = {
"method": "textDocument/didClose",
"language_id": "",
"uri": uri,
"version": -1,
"text": "",
"line": -1,
"column": -1,
"char": ""
}
self.send_message(data)
def _lsp_did_change(self, language_id: str, uri: str, buffer):
if not self.lsp_client_proc: return
iter = buffer.get_iter_at_mark( buffer.get_insert() )
line = iter.get_line()
column = iter.get_line_offset()
start, end = buffer.get_bounds()
text = buffer.get_text(start, end, include_hidden_chars = True)
data = {
"method": "textDocument/didChange",
"language_id": language_id,
"uri": uri,
"version": buffer.version_id,
"text": text,
"line": line,
"column": column,
"char": ""
}
self.send_message(data)
# def _lsp_did_change(self, language_id: str, uri: str, buffer):
# if not self.lsp_client_proc: return
# iter = buffer.get_iter_at_mark( buffer.get_insert() )
# line = iter.get_line()
# column = iter.get_line_offset()
# start = iter.copy()
# end = iter.copy()
# start.backward_line()
# start.forward_line()
# end.forward_line()
# text = buffer.get_text(start, end, include_hidden_chars = True)
# data = {
# "method": "textDocument/didChange",
# "language_id": language_id,
# "uri": uri,
# "version": buffer.version_id,
# "text": text,
# "line": line,
# "column": column,
# "char": ""
# }
# self.send_message(data)
def _lsp_goto(self, language_id: str, uri: str, line: int, column: int):
if not self.lsp_client_proc: return
data = {
"method": "textDocument/definition",
"language_id": language_id,
"uri": uri,
"version": -1,
"text": "",
"line": line,
"column": column,
"char": ""
}
self.send_message(data)
def _lsp_completion(self, source_view):
if not self.lsp_client_proc: return
filepath = source_view.get_current_file()
if not filepath: return
uri = filepath.get_uri()
buffer = source_view.get_buffer()
iter = buffer.get_iter_at_mark( buffer.get_insert() )
line = iter.get_line()
column = iter.get_line_offset()
char = iter.get_char()
data = {
"method": "textDocument/completion",
"language_id": source_view.get_filetype(),
"uri": uri,
"version": source_view.get_version_id(),
"text": "",
"line": line,
"column": column,
"char": char
}
self.send_message(data)
def send_message(self, data: dict):
self.client_ipc.send_manager_ipc_message( json.dumps(data) )