generated from itdominator/Python-With-Gtk-Template
Moved LSP logic to controller class
This commit is contained in:
parent
2cdb7e7321
commit
6df84e9f23
|
@ -0,0 +1,205 @@
|
||||||
|
# Python imports
|
||||||
|
import os
|
||||||
|
import signal
|
||||||
|
import json
|
||||||
|
import subprocess
|
||||||
|
import threading
|
||||||
|
|
||||||
|
# Lib imports
|
||||||
|
from gi.repository import GLib
|
||||||
|
|
||||||
|
# Application imports
|
||||||
|
from libs.dto.lsp_message_structs import MessageEncoder, LSPRequest, LSPNotification, LSPResponse
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
LEN_HEADER = "Content-Length: "
|
||||||
|
TYPE_HEADER = "Content-Type: "
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class LSPController:
|
||||||
|
def __init__(self):
|
||||||
|
super(LSPController).__init__()
|
||||||
|
|
||||||
|
# https://github.com/microsoft/multilspy/tree/main/src/multilspy/language_servers
|
||||||
|
# initialize-params-slim.json was created off of jedi_language_server one
|
||||||
|
self._init_params = settings_manager.get_lsp_init_data()
|
||||||
|
|
||||||
|
self._start_command = None
|
||||||
|
self._lsp_pid = -1
|
||||||
|
self._message_id = 0
|
||||||
|
|
||||||
|
self.read_lock = threading.Lock()
|
||||||
|
self.write_lock = threading.Lock()
|
||||||
|
|
||||||
|
|
||||||
|
def set_log_list(self, log_list):
|
||||||
|
self.log_list = log_list
|
||||||
|
|
||||||
|
def set_start_command(self, start_command: []):
|
||||||
|
self._start_command = start_command
|
||||||
|
|
||||||
|
def unset_start_command(self):
|
||||||
|
self._start_command = None
|
||||||
|
|
||||||
|
def send_initialize_message(self, init_ops: str, workspace_file: str, workspace_uri: str):
|
||||||
|
folder_name = os.path.basename(workspace_file)
|
||||||
|
|
||||||
|
self._init_params["processId"] = settings_manager.get_app_pid()
|
||||||
|
self._init_params["rootPath"] = workspace_file
|
||||||
|
self._init_params["rootUri"] = workspace_uri
|
||||||
|
self._init_params["workspaceFolders"] = [
|
||||||
|
{
|
||||||
|
"name": folder_name,
|
||||||
|
"uri": workspace_uri
|
||||||
|
}
|
||||||
|
]
|
||||||
|
|
||||||
|
self._init_params["initializationOptions"] = json.loads(init_ops)
|
||||||
|
self.send_request("initialize", self._init_params)
|
||||||
|
|
||||||
|
def send_initialized_message(self):
|
||||||
|
self.send_notification("initialized")
|
||||||
|
|
||||||
|
def send_notification(self, method: str, params: {} = {}):
|
||||||
|
self._send_message( LSPNotification(method, params) )
|
||||||
|
|
||||||
|
def send_request(self, method: str, params: {} = {}):
|
||||||
|
self._monitor_lsp_response()
|
||||||
|
self._send_message( LSPRequest(self._message_id, method, params) )
|
||||||
|
|
||||||
|
self._message_id += 1
|
||||||
|
|
||||||
|
def _send_message(self, data: LSPRequest or LSPNotification):
|
||||||
|
if not data or not hasattr(self, "lsp_process"): return
|
||||||
|
|
||||||
|
message_str = json.dumps(data, cls = MessageEncoder)
|
||||||
|
message_size = len(message_str)
|
||||||
|
message = f"Content-Length: {message_size}\r\n\r\n{message_str}"
|
||||||
|
|
||||||
|
self.log_list.add_log_entry("Client", data)
|
||||||
|
|
||||||
|
with self.write_lock:
|
||||||
|
self.lsp_process.stdin.write( message.encode("utf-8") )
|
||||||
|
self.lsp_process.stdin.flush()
|
||||||
|
|
||||||
|
def get_message_id(self) -> int:
|
||||||
|
return self._message_id
|
||||||
|
|
||||||
|
def start_stop_lsp(self):
|
||||||
|
if self._lsp_pid == -1:
|
||||||
|
pid = self.start_lsp()
|
||||||
|
if not pid: return
|
||||||
|
|
||||||
|
self._lsp_pid = pid
|
||||||
|
self._monitor_lsp_response()
|
||||||
|
else:
|
||||||
|
self.stop_lsp()
|
||||||
|
self.log_list.clear()
|
||||||
|
|
||||||
|
def start_lsp(self):
|
||||||
|
if not self._start_command: return
|
||||||
|
try:
|
||||||
|
self.lsp_process = subprocess.Popen(
|
||||||
|
self._start_command,
|
||||||
|
stdout = subprocess.PIPE,
|
||||||
|
stdin = subprocess.PIPE
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
self.log_list.add_log_entry(
|
||||||
|
"LSP Client Error",
|
||||||
|
LSPResponse(
|
||||||
|
None,
|
||||||
|
{
|
||||||
|
"error": repr(e)
|
||||||
|
}
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
return
|
||||||
|
|
||||||
|
return self.lsp_process.pid
|
||||||
|
|
||||||
|
def stop_lsp(self):
|
||||||
|
if self._lsp_pid == -1: return
|
||||||
|
|
||||||
|
self._lsp_pid = -1
|
||||||
|
self._message_id = 0
|
||||||
|
self.lsp_process.terminate()
|
||||||
|
|
||||||
|
# https://github.com/sr-lab/coqpyt/blob/master/coqpyt/lsp/json_rpc_endpoint.py#L65
|
||||||
|
# Virtually this whole method unabashedly taken from ^ ...
|
||||||
|
@daemon_threaded
|
||||||
|
def _monitor_lsp_response(self):
|
||||||
|
if not hasattr(self, "lsp_process"): return
|
||||||
|
|
||||||
|
with self.read_lock:
|
||||||
|
message_size = None
|
||||||
|
while True:
|
||||||
|
line = self.lsp_process.stdout.readline()
|
||||||
|
if not line: return None # Quit listener...
|
||||||
|
|
||||||
|
line = line.decode("utf-8")
|
||||||
|
if not line.endswith("\r\n"):
|
||||||
|
raise Exception(
|
||||||
|
"Bad header: missing newline"
|
||||||
|
)
|
||||||
|
line = line[:-2] # Strip the "\r\n"
|
||||||
|
if line == "": # We're done with the headers...
|
||||||
|
break
|
||||||
|
elif line.startswith(LEN_HEADER):
|
||||||
|
line = line[len(LEN_HEADER) :]
|
||||||
|
if not line.isdigit():
|
||||||
|
raise Exception(
|
||||||
|
"Bad header: size is not int",
|
||||||
|
)
|
||||||
|
message_size = int(line)
|
||||||
|
elif line.startswith(TYPE_HEADER):
|
||||||
|
# Not doing anything with type header, currently...
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
line = line.split(LEN_HEADER)
|
||||||
|
if len(line) == 2:
|
||||||
|
message_size = line[1]
|
||||||
|
raise Exception(
|
||||||
|
"Bad header: unknown header"
|
||||||
|
)
|
||||||
|
|
||||||
|
if not message_size: return
|
||||||
|
|
||||||
|
data = self.lsp_process.stdout.read(message_size)
|
||||||
|
jsonrpc_res = data.decode("utf-8")
|
||||||
|
lsp_response = json.loads( jsonrpc_res )
|
||||||
|
response_id = -1
|
||||||
|
|
||||||
|
if not lsp_response: return
|
||||||
|
if "id" in lsp_response.keys(): response_id = lsp_response["id"]
|
||||||
|
|
||||||
|
GLib.idle_add(self.handle_lsp_response, LSPResponse(response_id, lsp_response))
|
||||||
|
|
||||||
|
def handle_lsp_response(self, lsp_response: LSPResponse):
|
||||||
|
self.log_list.add_log_entry("LSP Response", lsp_response)
|
||||||
|
|
||||||
|
result = lsp_response.result
|
||||||
|
keys = result.keys()
|
||||||
|
|
||||||
|
if "error" in keys:
|
||||||
|
error = result["error"]
|
||||||
|
logger.debug(f"LSP Error Code: {error['code']}")
|
||||||
|
logger.debug(f"LSP Error Message:\n{error['message']}")
|
||||||
|
return
|
||||||
|
|
||||||
|
if "result" in keys:
|
||||||
|
result = result["result"]
|
||||||
|
|
||||||
|
if isinstance(result, dict):
|
||||||
|
keys = result.keys()
|
||||||
|
if "capabilities" in keys:
|
||||||
|
...
|
||||||
|
|
||||||
|
if isinstance(result, list):
|
||||||
|
...
|
||||||
|
|
||||||
|
if isinstance(result, tuple):
|
||||||
|
...
|
|
@ -61,7 +61,6 @@ class BottomButtonBox(Gtk.Box):
|
||||||
self.full_init_btn
|
self.full_init_btn
|
||||||
]:
|
]:
|
||||||
self.add(child)
|
self.add(child)
|
||||||
# self.set_child_secondary(child, True)
|
|
||||||
child.set_always_show_image(True)
|
child.set_always_show_image(True)
|
||||||
child.set_image( Gtk.Image.new_from_icon_name("gtk-media-play", Gtk.IconSize.BUTTON) )
|
child.set_image( Gtk.Image.new_from_icon_name("gtk-media-play", Gtk.IconSize.BUTTON) )
|
||||||
|
|
||||||
|
@ -101,7 +100,7 @@ class BottomButtonBox(Gtk.Box):
|
||||||
self.rest_buttons()
|
self.rest_buttons()
|
||||||
|
|
||||||
self.get_parent().start_stop_lsp()
|
self.get_parent().start_stop_lsp()
|
||||||
if not hasattr(self.get_parent(), "lsp_process"):
|
if not hasattr(self.get_parent().lsp_controller, "lsp_process"):
|
||||||
self.rest_buttons()
|
self.rest_buttons()
|
||||||
return
|
return
|
||||||
|
|
||||||
|
@ -109,7 +108,7 @@ class BottomButtonBox(Gtk.Box):
|
||||||
button.hide()
|
button.hide()
|
||||||
self.start_stop_lsp(self.start_stop_lsp_btn)
|
self.start_stop_lsp(self.start_stop_lsp_btn)
|
||||||
|
|
||||||
if not hasattr(self.get_parent(), "lsp_process"):
|
if not hasattr(self.get_parent().lsp_controller, "lsp_process"):
|
||||||
self.rest_buttons()
|
self.rest_buttons()
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|
|
@ -1,45 +1,25 @@
|
||||||
# Python imports
|
# Python imports
|
||||||
import os
|
|
||||||
import signal
|
|
||||||
import json
|
import json
|
||||||
import subprocess
|
|
||||||
import threading
|
|
||||||
|
|
||||||
# Lib imports
|
# Lib imports
|
||||||
import gi
|
import gi
|
||||||
gi.require_version('Gtk', '3.0')
|
gi.require_version('Gtk', '3.0')
|
||||||
gi.require_version('GtkSource', '4')
|
|
||||||
from gi.repository import Gtk
|
from gi.repository import Gtk
|
||||||
from gi.repository import GLib
|
|
||||||
from gi.repository import GtkSource
|
|
||||||
|
|
||||||
# Application imports
|
# Application imports
|
||||||
# from libs.dto.lsp_structs import TextDocumentItem
|
from core.controllers.lsp_controller import LSPController
|
||||||
from libs.dto.lsp_message_structs import MessageEncoder, LSPRequest, LSPNotification, LSPResponse
|
|
||||||
from .buttons.top_button_box import TopButtonBox
|
from .buttons.top_button_box import TopButtonBox
|
||||||
from .enteries.lsp_message_source_view import LspMessageSourceView
|
from .enteries.lsp_message_source_view import LspMessageSourceView
|
||||||
from .buttons.bottom_button_box import BottomButtonBox
|
from .buttons.bottom_button_box import BottomButtonBox
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
LEN_HEADER = "Content-Length: "
|
|
||||||
TYPE_HEADER = "Content-Type: "
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class LSPMessageBox(Gtk.Box):
|
class LSPMessageBox(Gtk.Box):
|
||||||
def __init__(self):
|
def __init__(self, log_list):
|
||||||
super(LSPMessageBox, self).__init__()
|
super(LSPMessageBox, self).__init__()
|
||||||
|
|
||||||
self._lsp_pid = -1
|
self.lsp_controller = LSPController()
|
||||||
self._message_id = 0
|
self.lsp_controller.set_log_list(log_list)
|
||||||
|
|
||||||
# https://github.com/microsoft/multilspy/tree/main/src/multilspy/language_servers
|
|
||||||
# initialize-params-slim.json was created off of jedi_language_server one
|
|
||||||
self._lsp_init_ops = settings_manager.get_lsp_init_data()
|
|
||||||
|
|
||||||
self.read_lock = threading.Lock()
|
|
||||||
self.write_lock = threading.Lock()
|
|
||||||
|
|
||||||
self._setup_styling()
|
self._setup_styling()
|
||||||
self._setup_signals()
|
self._setup_signals()
|
||||||
|
@ -77,189 +57,39 @@ class LSPMessageBox(Gtk.Box):
|
||||||
self.add(bottom_buttons)
|
self.add(bottom_buttons)
|
||||||
|
|
||||||
def update_message_id_label(self):
|
def update_message_id_label(self):
|
||||||
self.top_buttons.update_message_id_lbl(self._message_id)
|
self.top_buttons.update_message_id_lbl(
|
||||||
|
self.lsp_controller.get_message_id()
|
||||||
|
)
|
||||||
|
|
||||||
def send_initialize_message(self):
|
def send_initialize_message(self):
|
||||||
parent = self.get_parent()
|
parent = self.get_parent()
|
||||||
init_ops = parent.init_ops_src_vw.get_text_str()
|
init_ops = parent.init_ops_src_vw.get_text_str()
|
||||||
workspace_file = self.top_buttons.get_workspace_path()
|
workspace_file = self.top_buttons.get_workspace_path()
|
||||||
workspace_uri = self.top_buttons.get_workspace_uri()
|
workspace_uri = self.top_buttons.get_workspace_uri()
|
||||||
folder_name = os.path.basename(workspace_file)
|
|
||||||
|
|
||||||
self._lsp_init_ops["processId"] = settings_manager.get_app_pid()
|
self.lsp_controller.send_initialize_message(init_ops, workspace_file, workspace_uri)
|
||||||
self._lsp_init_ops["rootPath"] = workspace_file
|
|
||||||
self._lsp_init_ops["rootUri"] = workspace_uri
|
|
||||||
self._lsp_init_ops["workspaceFolders"] = [
|
|
||||||
{
|
|
||||||
"name": folder_name,
|
|
||||||
"uri": workspace_uri
|
|
||||||
}
|
|
||||||
]
|
|
||||||
|
|
||||||
self._lsp_init_ops["initializationOptions"] = json.loads(init_ops)
|
|
||||||
self.send_request("initialize", self._lsp_init_ops)
|
|
||||||
|
|
||||||
del init_ops
|
|
||||||
del workspace_file
|
|
||||||
del workspace_uri
|
|
||||||
del folder_name
|
|
||||||
del parent
|
|
||||||
|
|
||||||
def send_initialized_message(self):
|
|
||||||
self.send_notification("initialized")
|
|
||||||
|
|
||||||
def send_notification(self, method: str, params: {} = {}):
|
|
||||||
self._send_message( LSPNotification(method, params) )
|
|
||||||
|
|
||||||
def send_request(self, method: str, params: {} = {}):
|
|
||||||
self._monitor_lsp_response()
|
|
||||||
self._send_message( LSPRequest(self._message_id, method, params) )
|
|
||||||
|
|
||||||
self._message_id += 1
|
|
||||||
self.update_message_id_label()
|
self.update_message_id_label()
|
||||||
|
|
||||||
def _send_message(self, data: LSPRequest or LSPNotification):
|
def send_initialized_message(self):
|
||||||
if not data or not hasattr(self, "lsp_process"): return
|
self.lsp_controller.send_initialized_message()
|
||||||
|
|
||||||
message_str = json.dumps(data, cls = MessageEncoder)
|
|
||||||
message_size = len(message_str)
|
|
||||||
message = f"Content-Length: {message_size}\r\n\r\n{message_str}"
|
|
||||||
|
|
||||||
self.get_parent().log_list.add_log_entry("Client", data)
|
|
||||||
|
|
||||||
with self.write_lock:
|
|
||||||
self.lsp_process.stdin.write( message.encode("utf-8") )
|
|
||||||
self.lsp_process.stdin.flush()
|
|
||||||
|
|
||||||
del message_str
|
|
||||||
del message_size
|
|
||||||
del message
|
|
||||||
|
|
||||||
def button_send_notification(self):
|
def button_send_notification(self):
|
||||||
message = json.loads( self.lsp_msg_src_vw.get_text_str() )
|
message = json.loads( self.lsp_msg_src_vw.get_text_str() )
|
||||||
self.send_notification(message["method"], message["params"])
|
self.lsp_controller.send_notification(message["method"], message["params"])
|
||||||
|
|
||||||
def button_send_request(self):
|
def button_send_request(self):
|
||||||
message = json.loads( self.lsp_msg_src_vw.get_text_str() )
|
message = json.loads( self.lsp_msg_src_vw.get_text_str() )
|
||||||
self.send_request(message["method"], message["params"])
|
self.lsp_controller.send_request(message["method"], message["params"])
|
||||||
|
self.update_message_id_label()
|
||||||
|
|
||||||
def start_stop_lsp(self):
|
def start_stop_lsp(self):
|
||||||
if self._lsp_pid == -1:
|
|
||||||
pid = self.start_lsp()
|
|
||||||
if not pid: return
|
|
||||||
|
|
||||||
self._lsp_pid = pid
|
|
||||||
self._monitor_lsp_response()
|
|
||||||
else:
|
|
||||||
self.stop_lsp()
|
|
||||||
self.get_parent().log_list.clear()
|
|
||||||
|
|
||||||
def start_lsp(self):
|
|
||||||
parent = self.get_parent()
|
parent = self.get_parent()
|
||||||
_command: str = parent.alt_command_entry.get_text()
|
_command: str = parent.alt_command_entry.get_text()
|
||||||
# _command: str = parent.command_entry.get_text()
|
# _command: str = parent.command_entry.get_text()
|
||||||
# _command: str = parent.socket_entry.get_text()
|
# _command: str = parent.socket_entry.get_text()
|
||||||
command: [] = _command.split() if len( _command.split() ) > 0 else [ _command ]
|
command: [] = _command.split() if len( _command.split() ) > 0 else [ _command ]
|
||||||
try:
|
|
||||||
self.lsp_process = subprocess.Popen(
|
|
||||||
command,
|
|
||||||
stdout = subprocess.PIPE,
|
|
||||||
stdin = subprocess.PIPE
|
|
||||||
)
|
|
||||||
except Exception as e:
|
|
||||||
self.get_parent().log_list.add_log_entry(
|
|
||||||
"LSP Client Error",
|
|
||||||
LSPResponse(
|
|
||||||
None,
|
|
||||||
{
|
|
||||||
"error": repr(e)
|
|
||||||
}
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
return
|
self.lsp_controller.set_start_command(command)
|
||||||
|
self.lsp_controller.start_stop_lsp()
|
||||||
return self.lsp_process.pid
|
self.lsp_controller.unset_start_command()
|
||||||
|
|
||||||
def stop_lsp(self):
|
|
||||||
if self._lsp_pid == -1: return
|
|
||||||
|
|
||||||
self._lsp_pid = -1
|
|
||||||
self._message_id = 0
|
|
||||||
self.lsp_process.terminate()
|
|
||||||
self.update_message_id_label()
|
self.update_message_id_label()
|
||||||
|
|
||||||
# https://github.com/sr-lab/coqpyt/blob/master/coqpyt/lsp/json_rpc_endpoint.py#L65
|
|
||||||
# Virtually this whole method unabashedly taken from ^ ...
|
|
||||||
@daemon_threaded
|
|
||||||
def _monitor_lsp_response(self):
|
|
||||||
if not hasattr(self, "lsp_process"): return
|
|
||||||
|
|
||||||
with self.read_lock:
|
|
||||||
message_size = None
|
|
||||||
while True:
|
|
||||||
line = self.lsp_process.stdout.readline()
|
|
||||||
if not line: return None # Quit listener...
|
|
||||||
|
|
||||||
line = line.decode("utf-8")
|
|
||||||
if not line.endswith("\r\n"):
|
|
||||||
raise Exception(
|
|
||||||
"Bad header: missing newline"
|
|
||||||
)
|
|
||||||
line = line[:-2] # Strip the "\r\n"
|
|
||||||
if line == "": # We're done with the headers...
|
|
||||||
break
|
|
||||||
elif line.startswith(LEN_HEADER):
|
|
||||||
line = line[len(LEN_HEADER) :]
|
|
||||||
if not line.isdigit():
|
|
||||||
raise Exception(
|
|
||||||
"Bad header: size is not int",
|
|
||||||
)
|
|
||||||
message_size = int(line)
|
|
||||||
elif line.startswith(TYPE_HEADER):
|
|
||||||
# Not doing anything with type header, currently...
|
|
||||||
pass
|
|
||||||
else:
|
|
||||||
line = line.split(LEN_HEADER)
|
|
||||||
if len(line) == 2:
|
|
||||||
message_size = line[1]
|
|
||||||
raise Exception(
|
|
||||||
"Bad header: unknown header"
|
|
||||||
)
|
|
||||||
|
|
||||||
if not message_size: return
|
|
||||||
|
|
||||||
data = self.lsp_process.stdout.read(message_size)
|
|
||||||
jsonrpc_res = data.decode("utf-8")
|
|
||||||
lsp_response = json.loads( jsonrpc_res )
|
|
||||||
response_id = -1
|
|
||||||
|
|
||||||
if not lsp_response: return
|
|
||||||
if "id" in lsp_response.keys(): response_id = lsp_response["id"]
|
|
||||||
|
|
||||||
GLib.idle_add(self.handle_lsp_response, LSPResponse(response_id, lsp_response))
|
|
||||||
|
|
||||||
def handle_lsp_response(self, lsp_response: LSPResponse):
|
|
||||||
self.get_parent().log_list.add_log_entry("LSP Response", lsp_response)
|
|
||||||
result = lsp_response.result
|
|
||||||
keys = result.keys()
|
|
||||||
|
|
||||||
if "error" in keys:
|
|
||||||
lsp_response = result["error"]
|
|
||||||
logger.debug(f"LSP Error Code: {result['code']}")
|
|
||||||
logger.debug(f"LSP Error Message:\n{result['message']}")
|
|
||||||
return
|
|
||||||
|
|
||||||
if "result" in keys:
|
|
||||||
result = result["result"]
|
|
||||||
|
|
||||||
if isinstance(result, dict):
|
|
||||||
keys = result.keys()
|
|
||||||
if "capabilities" in keys:
|
|
||||||
...
|
|
||||||
|
|
||||||
if isinstance(result, list):
|
|
||||||
...
|
|
||||||
|
|
||||||
if isinstance(result, tuple):
|
|
||||||
...
|
|
|
@ -65,7 +65,7 @@ class LSPUI(Gtk.Grid):
|
||||||
self.command_entry = CommandEntry( self._data["command"] )
|
self.command_entry = CommandEntry( self._data["command"] )
|
||||||
self.socket_entry = SocketEntry( self._data["socket"] )
|
self.socket_entry = SocketEntry( self._data["socket"] )
|
||||||
self.init_ops_src_vw = InitOptionsSourceView( self._data["initialization-options"] )
|
self.init_ops_src_vw = InitOptionsSourceView( self._data["initialization-options"] )
|
||||||
self.message_box = LSPMessageBox()
|
self.message_box = LSPMessageBox(self.log_list)
|
||||||
|
|
||||||
init_options_lbl.set_margin_top(10)
|
init_options_lbl.set_margin_top(10)
|
||||||
message_box_lbl.set_margin_top(10)
|
message_box_lbl.set_margin_top(10)
|
||||||
|
|
Loading…
Reference in New Issue