Files
Python-With-Gtk-Template/plugins/code/ui/lsp_manager/lsp_controller.py

101 lines
3.9 KiB
Python
Raw Normal View History

# Python imports
# Lib imports
# Application imports
from libs.dto.code.lsp.lsp_message_structs import LSPResponseTypes, LSPResponseRequest, LSPResponseNotification
from .provider import Provider
from .provider_response_cache import ProviderResponseCache
from .lsp_manager_ui import LSPManagerUI
from .lsp_client_controller import LSPClientController
from .handlers.registry import HandlerRegistry
class LSPController:
def __init__(self):
super(LSPController, self).__init__()
self._init()
self._load_widgets()
self._do_bind_mapping()
def _init(self):
self.provider: Provider = Provider()
self.response_cache: ProviderResponseCache = ProviderResponseCache()
self.lsp_client_controller: LSPClientController = LSPClientController()
self.handler_registry: HandlerRegistry = HandlerRegistry()
def _load_widgets(self):
self.lsp_manager_ui: LSPManagerUI = LSPManagerUI()
self.lsp_manager_ui.create_client = self.create_client
self.lsp_manager_ui.close_client = self.close_client
def _do_bind_mapping(self):
self.response_cache.process_file_load = self.lsp_client_controller.process_file_load
self.response_cache.process_file_close = self.lsp_client_controller.process_file_close
self.response_cache.process_file_save = self.lsp_client_controller.process_file_save
self.response_cache.process_file_change = self.lsp_client_controller.process_file_change
self.provider.response_cache = self.response_cache
def create_client(
self,
lang_id: str = "python",
workspace_uri: str = "",
init_opts: dict = {}
) -> bool:
client = self.lsp_client_controller.create_client(
lang_id, workspace_uri, init_opts
)
handler = self.handler_registry.get_handler(lang_id)
self.lsp_client_controller.active_language_id = lang_id
if not client or not handler:
logger.error(f"LSP Controller: Either 'client' or 'handler' didn't get created...'")
self.close_client(lang_id)
return False
handler.set_context(self.handler_registry)
handler.set_response_cache(self.response_cache)
client.handle_lsp_response = self.server_response
client.send_initialize_message(init_opts, "", f"file://{workspace_uri}")
return True
def close_client(self, lang_id: str) -> bool:
self.lsp_client_controller.close_client(lang_id)
self.handler_registry.close_handler(lang_id)
return True
def server_response(self, lsp_response: LSPResponseTypes):
logger.debug(f"LSP Response: { lsp_response }")
if isinstance(lsp_response, LSPResponseRequest):
if not self.lsp_client_controller.active_language_id in self.lsp_client_controller.clients:
logger.debug(f"No LSP client for '{self.lsp_client_controller.active_language_id}', skipping 'server_response'")
return
controller = self.lsp_client_controller.get_active_client()
event = controller.get_event_by_id(lsp_response.id)
handler = self.handler_registry.get_handler(
self.lsp_client_controller.active_language_id, event
)
if not handler: return
handler.handle(event, lsp_response.result, controller)
elif isinstance(lsp_response, LSPResponseNotification):
handler = self.handler_registry.get_handler("default", lsp_response.method)
if not handler: return
# TODO: Need to make default singleton so as to not need to set these here
handler.set_context(self.handler_registry)
handler.set_response_cache(self.response_cache)
handler.handle(lsp_response.method, lsp_response.params, None)