Moved LSP logic to controller class

This commit is contained in:
itdominator 2024-09-08 23:22:33 -05:00
parent 2cdb7e7321
commit 08d8656d9c
4 changed files with 237 additions and 196 deletions

View File

@ -0,0 +1,212 @@
# Python imports
import os
import signal
import json
import subprocess
import threading
# Lib imports
from gi.repository import GLib
# Application imports
from libs.dto.lsp_message_structs import MessageEncoder, LSPRequest, LSPNotification, LSPResponse
LEN_HEADER = "Content-Length: "
TYPE_HEADER = "Content-Type: "
class LSPController:
def __init__(self):
super(LSPController).__init__()
self._start_command = None
self._lsp_pid = -1
self._message_id = 0
# https://github.com/microsoft/multilspy/tree/main/src/multilspy/language_servers
# initialize-params-slim.json was created off of jedi_language_server one
self._lsp_init_ops = settings_manager.get_lsp_init_data()
self.read_lock = threading.Lock()
self.write_lock = threading.Lock()
def set_log_list(self, log_list):
self.log_list = log_list
def set_start_command(self, start_command: []):
self._start_command = start_command
def unset_start_command(self):
self._start_command = None
def send_initialize_message(self, init_ops: str, workspace_file: str, workspace_uri: str):
folder_name = os.path.basename(workspace_file)
self._lsp_init_ops["processId"] = settings_manager.get_app_pid()
self._lsp_init_ops["rootPath"] = workspace_file
self._lsp_init_ops["rootUri"] = workspace_uri
self._lsp_init_ops["workspaceFolders"] = [
{
"name": folder_name,
"uri": workspace_uri
}
]
self._lsp_init_ops["initializationOptions"] = json.loads(init_ops)
self.send_request("initialize", self._lsp_init_ops)
def send_initialized_message(self):
self.send_notification("initialized")
def send_notification(self, method: str, params: {} = {}):
self._send_message( LSPNotification(method, params) )
def send_request(self, method: str, params: {} = {}):
self._monitor_lsp_response()
self._send_message( LSPRequest(self._message_id, method, params) )
self._message_id += 1
def _send_message(self, data: LSPRequest or LSPNotification):
if not data or not hasattr(self, "lsp_process"): return
message_str = json.dumps(data, cls = MessageEncoder)
message_size = len(message_str)
message = f"Content-Length: {message_size}\r\n\r\n{message_str}"
self.log_list.add_log_entry("Client", data)
with self.write_lock:
self.lsp_process.stdin.write( message.encode("utf-8") )
self.lsp_process.stdin.flush()
def button_send_notification(self):
message = json.loads( self.lsp_msg_src_vw.get_text_str() )
self.send_notification(message["method"], message["params"])
def button_send_request(self):
message = json.loads( self.lsp_msg_src_vw.get_text_str() )
self.send_request(message["method"], message["params"])
def get_message_id(self) -> int:
return self._message_id
def start_stop_lsp(self):
if self._lsp_pid == -1:
pid = self.start_lsp()
if not pid: return
self._lsp_pid = pid
self._monitor_lsp_response()
else:
self.stop_lsp()
self.log_list.clear()
def start_lsp(self):
if not self._start_command: return
try:
self.lsp_process = subprocess.Popen(
self._start_command,
stdout = subprocess.PIPE,
stdin = subprocess.PIPE
)
except Exception as e:
self.log_list.add_log_entry(
"LSP Client Error",
LSPResponse(
None,
{
"error": repr(e)
}
)
)
return
return self.lsp_process.pid
def stop_lsp(self):
if self._lsp_pid == -1: return
self._lsp_pid = -1
self._message_id = 0
self.lsp_process.terminate()
# https://github.com/sr-lab/coqpyt/blob/master/coqpyt/lsp/json_rpc_endpoint.py#L65
# Virtually this whole method unabashedly taken from ^ ...
@daemon_threaded
def _monitor_lsp_response(self):
if not hasattr(self, "lsp_process"): return
with self.read_lock:
message_size = None
while True:
line = self.lsp_process.stdout.readline()
if not line: return None # Quit listener...
line = line.decode("utf-8")
if not line.endswith("\r\n"):
raise Exception(
"Bad header: missing newline"
)
line = line[:-2] # Strip the "\r\n"
if line == "": # We're done with the headers...
break
elif line.startswith(LEN_HEADER):
line = line[len(LEN_HEADER) :]
if not line.isdigit():
raise Exception(
"Bad header: size is not int",
)
message_size = int(line)
elif line.startswith(TYPE_HEADER):
# Not doing anything with type header, currently...
pass
else:
line = line.split(LEN_HEADER)
if len(line) == 2:
message_size = line[1]
raise Exception(
"Bad header: unknown header"
)
if not message_size: return
data = self.lsp_process.stdout.read(message_size)
jsonrpc_res = data.decode("utf-8")
lsp_response = json.loads( jsonrpc_res )
response_id = -1
if not lsp_response: return
if "id" in lsp_response.keys(): response_id = lsp_response["id"]
GLib.idle_add(self.handle_lsp_response, LSPResponse(response_id, lsp_response))
def handle_lsp_response(self, lsp_response: LSPResponse):
self.log_list.add_log_entry("LSP Response", lsp_response)
result = lsp_response.result
keys = result.keys()
if "error" in keys:
lsp_response = result["error"]
logger.debug(f"LSP Error Code: {result['code']}")
logger.debug(f"LSP Error Message:\n{result['message']}")
return
if "result" in keys:
result = result["result"]
if isinstance(result, dict):
keys = result.keys()
if "capabilities" in keys:
...
if isinstance(result, list):
...
if isinstance(result, tuple):
...

View File

@ -61,7 +61,6 @@ class BottomButtonBox(Gtk.Box):
self.full_init_btn
]:
self.add(child)
# self.set_child_secondary(child, True)
child.set_always_show_image(True)
child.set_image( Gtk.Image.new_from_icon_name("gtk-media-play", Gtk.IconSize.BUTTON) )
@ -101,7 +100,7 @@ class BottomButtonBox(Gtk.Box):
self.rest_buttons()
self.get_parent().start_stop_lsp()
if not hasattr(self.get_parent(), "lsp_process"):
if not hasattr(self.get_parent().lsp_controller, "lsp_process"):
self.rest_buttons()
return
@ -109,7 +108,7 @@ class BottomButtonBox(Gtk.Box):
button.hide()
self.start_stop_lsp(self.start_stop_lsp_btn)
if not hasattr(self.get_parent(), "lsp_process"):
if not hasattr(self.get_parent().lsp_controller, "lsp_process"):
self.rest_buttons()
return

View File

@ -1,45 +1,25 @@
# Python imports
import os
import signal
import json
import subprocess
import threading
# Lib imports
import gi
gi.require_version('Gtk', '3.0')
gi.require_version('GtkSource', '4')
from gi.repository import Gtk
from gi.repository import GLib
from gi.repository import GtkSource
# Application imports
# from libs.dto.lsp_structs import TextDocumentItem
from libs.dto.lsp_message_structs import MessageEncoder, LSPRequest, LSPNotification, LSPResponse
from core.controllers.lsp_controller import LSPController
from .buttons.top_button_box import TopButtonBox
from .enteries.lsp_message_source_view import LspMessageSourceView
from .buttons.bottom_button_box import BottomButtonBox
LEN_HEADER = "Content-Length: "
TYPE_HEADER = "Content-Type: "
class LSPMessageBox(Gtk.Box):
def __init__(self):
def __init__(self, log_list):
super(LSPMessageBox, self).__init__()
self._lsp_pid = -1
self._message_id = 0
# https://github.com/microsoft/multilspy/tree/main/src/multilspy/language_servers
# initialize-params-slim.json was created off of jedi_language_server one
self._lsp_init_ops = settings_manager.get_lsp_init_data()
self.read_lock = threading.Lock()
self.write_lock = threading.Lock()
self.lsp_controller = LSPController()
self.lsp_controller.set_log_list(log_list)
self._setup_styling()
self._setup_signals()
@ -77,189 +57,39 @@ class LSPMessageBox(Gtk.Box):
self.add(bottom_buttons)
def update_message_id_label(self):
self.top_buttons.update_message_id_lbl(self._message_id)
self.top_buttons.update_message_id_lbl(
self.lsp_controller.get_message_id()
)
def send_initialize_message(self):
parent = self.get_parent()
init_ops = parent.init_ops_src_vw.get_text_str()
workspace_file = self.top_buttons.get_workspace_path()
workspace_uri = self.top_buttons.get_workspace_uri()
folder_name = os.path.basename(workspace_file)
self._lsp_init_ops["processId"] = settings_manager.get_app_pid()
self._lsp_init_ops["rootPath"] = workspace_file
self._lsp_init_ops["rootUri"] = workspace_uri
self._lsp_init_ops["workspaceFolders"] = [
{
"name": folder_name,
"uri": workspace_uri
}
]
self._lsp_init_ops["initializationOptions"] = json.loads(init_ops)
self.send_request("initialize", self._lsp_init_ops)
del init_ops
del workspace_file
del workspace_uri
del folder_name
del parent
def send_initialized_message(self):
self.send_notification("initialized")
def send_notification(self, method: str, params: {} = {}):
self._send_message( LSPNotification(method, params) )
def send_request(self, method: str, params: {} = {}):
self._monitor_lsp_response()
self._send_message( LSPRequest(self._message_id, method, params) )
self._message_id += 1
self.lsp_controller.send_initialize_message(init_ops, workspace_file, workspace_uri)
self.update_message_id_label()
def _send_message(self, data: LSPRequest or LSPNotification):
if not data or not hasattr(self, "lsp_process"): return
message_str = json.dumps(data, cls = MessageEncoder)
message_size = len(message_str)
message = f"Content-Length: {message_size}\r\n\r\n{message_str}"
self.get_parent().log_list.add_log_entry("Client", data)
with self.write_lock:
self.lsp_process.stdin.write( message.encode("utf-8") )
self.lsp_process.stdin.flush()
del message_str
del message_size
del message
def send_initialized_message(self):
self.lsp_controller.send_initialized_message()
def button_send_notification(self):
message = json.loads( self.lsp_msg_src_vw.get_text_str() )
self.send_notification(message["method"], message["params"])
self.lsp_controller.send_notification(message["method"], message["params"])
def button_send_request(self):
message = json.loads( self.lsp_msg_src_vw.get_text_str() )
self.send_request(message["method"], message["params"])
def start_stop_lsp(self):
if self._lsp_pid == -1:
pid = self.start_lsp()
if not pid: return
self._lsp_pid = pid
self._monitor_lsp_response()
else:
self.stop_lsp()
self.get_parent().log_list.clear()
def start_lsp(self):
parent = self.get_parent()
_command: str = parent.alt_command_entry.get_text()
# _command: str = parent.command_entry.get_text()
# _command: str = parent.socket_entry.get_text()
command: [] = _command.split() if len( _command.split() ) > 0 else [ _command ]
try:
self.lsp_process = subprocess.Popen(
command,
stdout = subprocess.PIPE,
stdin = subprocess.PIPE
)
except Exception as e:
self.get_parent().log_list.add_log_entry(
"LSP Client Error",
LSPResponse(
None,
{
"error": repr(e)
}
)
)
return
return self.lsp_process.pid
def stop_lsp(self):
if self._lsp_pid == -1: return
self._lsp_pid = -1
self._message_id = 0
self.lsp_process.terminate()
self.lsp_controller.send_request(message["method"], message["params"])
self.update_message_id_label()
# https://github.com/sr-lab/coqpyt/blob/master/coqpyt/lsp/json_rpc_endpoint.py#L65
# Virtually this whole method unabashedly taken from ^ ...
@daemon_threaded
def _monitor_lsp_response(self):
if not hasattr(self, "lsp_process"): return
def start_stop_lsp(self):
parent = self.get_parent()
_command: str = parent.alt_command_entry.get_text()
# _command: str = parent.command_entry.get_text()
# _command: str = parent.socket_entry.get_text()
command: [] = _command.split() if len( _command.split() ) > 0 else [ _command ]
with self.read_lock:
message_size = None
while True:
line = self.lsp_process.stdout.readline()
if not line: return None # Quit listener...
line = line.decode("utf-8")
if not line.endswith("\r\n"):
raise Exception(
"Bad header: missing newline"
)
line = line[:-2] # Strip the "\r\n"
if line == "": # We're done with the headers...
break
elif line.startswith(LEN_HEADER):
line = line[len(LEN_HEADER) :]
if not line.isdigit():
raise Exception(
"Bad header: size is not int",
)
message_size = int(line)
elif line.startswith(TYPE_HEADER):
# Not doing anything with type header, currently...
pass
else:
line = line.split(LEN_HEADER)
if len(line) == 2:
message_size = line[1]
raise Exception(
"Bad header: unknown header"
)
if not message_size: return
data = self.lsp_process.stdout.read(message_size)
jsonrpc_res = data.decode("utf-8")
lsp_response = json.loads( jsonrpc_res )
response_id = -1
if not lsp_response: return
if "id" in lsp_response.keys(): response_id = lsp_response["id"]
GLib.idle_add(self.handle_lsp_response, LSPResponse(response_id, lsp_response))
def handle_lsp_response(self, lsp_response: LSPResponse):
self.get_parent().log_list.add_log_entry("LSP Response", lsp_response)
result = lsp_response.result
keys = result.keys()
if "error" in keys:
lsp_response = result["error"]
logger.debug(f"LSP Error Code: {result['code']}")
logger.debug(f"LSP Error Message:\n{result['message']}")
return
if "result" in keys:
result = result["result"]
if isinstance(result, dict):
keys = result.keys()
if "capabilities" in keys:
...
if isinstance(result, list):
...
if isinstance(result, tuple):
...
self.lsp_controller.set_start_command(command)
self.lsp_controller.start_stop_lsp()
self.lsp_controller.unset_start_command()
self.update_message_id_label()

View File

@ -65,7 +65,7 @@ class LSPUI(Gtk.Grid):
self.command_entry = CommandEntry( self._data["command"] )
self.socket_entry = SocketEntry( self._data["socket"] )
self.init_ops_src_vw = InitOptionsSourceView( self._data["initialization-options"] )
self.message_box = LSPMessageBox()
self.message_box = LSPMessageBox(self.log_list)
init_options_lbl.set_margin_top(10)
message_box_lbl.set_margin_top(10)