efficient vim config
[dotfiles/.git] / .local / lib / python3.9 / site-packages / pynvim / plugin / host.py
1 """Implements a Nvim host for python plugins."""
2 import imp
3 import inspect
4 import logging
5 import os
6 import os.path
7 import re
8 from functools import partial
9 from traceback import format_exc
10
11 from pynvim.api import decode_if_bytes, walk
12 from pynvim.compat import IS_PYTHON3, find_module
13 from pynvim.msgpack_rpc import ErrorResponse
14 from pynvim.plugin import script_host
15 from pynvim.util import format_exc_skip, get_client_info
16
17 __all__ = ('Host')
18
19 logger = logging.getLogger(__name__)
20 error, debug, info, warn = (logger.error, logger.debug, logger.info,
21                             logger.warning,)
22
23 host_method_spec = {"poll": {}, "specs": {"nargs": 1}, "shutdown": {}}
24
25
26 class Host(object):
27
28     """Nvim host for python plugins.
29
30     Takes care of loading/unloading plugins and routing msgpack-rpc
31     requests/notifications to the appropriate handlers.
32     """
33
34     def __init__(self, nvim):
35         """Set handlers for plugin_load/plugin_unload."""
36         self.nvim = nvim
37         self._specs = {}
38         self._loaded = {}
39         self._load_errors = {}
40         self._notification_handlers = {
41             'nvim_error_event': self._on_error_event
42         }
43         self._request_handlers = {
44             'poll': lambda: 'ok',
45             'specs': self._on_specs_request,
46             'shutdown': self.shutdown
47         }
48
49         # Decode per default for Python3
50         self._decode_default = IS_PYTHON3
51
52     def _on_async_err(self, msg):
53         # uncaught python exception
54         self.nvim.err_write(msg, async_=True)
55
56     def _on_error_event(self, kind, msg):
57         # error from nvim due to async request
58         # like nvim.command(..., async_=True)
59         errmsg = "{}: Async request caused an error:\n{}\n".format(
60             self.name, decode_if_bytes(msg))
61         self.nvim.err_write(errmsg, async_=True)
62         return errmsg
63
64     def start(self, plugins):
65         """Start listening for msgpack-rpc requests and notifications."""
66         self.nvim.run_loop(self._on_request,
67                            self._on_notification,
68                            lambda: self._load(plugins),
69                            err_cb=self._on_async_err)
70
71     def shutdown(self):
72         """Shutdown the host."""
73         self._unload()
74         self.nvim.stop_loop()
75
76     def _wrap_delayed_function(self, cls, delayed_handlers, name, sync,
77                                module_handlers, path, *args):
78         # delete the delayed handlers to be sure
79         for handler in delayed_handlers:
80             method_name = handler._nvim_registered_name
81             if handler._nvim_rpc_sync:
82                 del self._request_handlers[method_name]
83             else:
84                 del self._notification_handlers[method_name]
85         # create an instance of the plugin and pass the nvim object
86         plugin = cls(self._configure_nvim_for(cls))
87
88         # discover handlers in the plugin instance
89         self._discover_functions(plugin, module_handlers, path, False)
90
91         if sync:
92             self._request_handlers[name](*args)
93         else:
94             self._notification_handlers[name](*args)
95
96     def _wrap_function(self, fn, sync, decode, nvim_bind, name, *args):
97         if decode:
98             args = walk(decode_if_bytes, args, decode)
99         if nvim_bind is not None:
100             args.insert(0, nvim_bind)
101         try:
102             return fn(*args)
103         except Exception:
104             if sync:
105                 msg = ("error caught in request handler '{} {}':\n{}"
106                        .format(name, args, format_exc_skip(1)))
107                 raise ErrorResponse(msg)
108             else:
109                 msg = ("error caught in async handler '{} {}'\n{}\n"
110                        .format(name, args, format_exc_skip(1)))
111                 self._on_async_err(msg + "\n")
112
113     def _on_request(self, name, args):
114         """Handle a msgpack-rpc request."""
115         if IS_PYTHON3:
116             name = decode_if_bytes(name)
117         handler = self._request_handlers.get(name, None)
118         if not handler:
119             msg = self._missing_handler_error(name, 'request')
120             pass # replaces next logging statement
121             #error(msg)
122             raise ErrorResponse(msg)
123
124         pass # replaces next logging statement
125         #debug('calling request handler for "%s", args: "%s"', name, args)
126         rv = handler(*args)
127         pass # replaces next logging statement
128         #debug("request handler for '%s %s' returns: %s", name, args, rv)
129         return rv
130
131     def _on_notification(self, name, args):
132         """Handle a msgpack-rpc notification."""
133         if IS_PYTHON3:
134             name = decode_if_bytes(name)
135         handler = self._notification_handlers.get(name, None)
136         if not handler:
137             msg = self._missing_handler_error(name, 'notification')
138             pass # replaces next logging statement
139             #error(msg)
140             self._on_async_err(msg + "\n")
141             return
142
143         pass # replaces next logging statement
144         #debug('calling notification handler for "%s", args: "%s"', name, args)
145         handler(*args)
146
147     def _missing_handler_error(self, name, kind):
148         msg = 'no {} handler registered for "{}"'.format(kind, name)
149         pathmatch = re.match(r'(.+):[^:]+:[^:]+', name)
150         if pathmatch:
151             loader_error = self._load_errors.get(pathmatch.group(1))
152             if loader_error is not None:
153                 msg = msg + "\n" + loader_error
154         return msg
155
156     def _load(self, plugins):
157         has_script = False
158         for path in plugins:
159             err = None
160             if path in self._loaded:
161                 pass # replaces next logging statement
162                 #error('{} is already loaded'.format(path))
163                 continue
164             try:
165                 if path == "script_host.py":
166                     module = script_host
167                     has_script = True
168                 else:
169                     directory, name = os.path.split(os.path.splitext(path)[0])
170                     file, pathname, descr = find_module(name, [directory])
171                     module = imp.load_module(name, file, pathname, descr)
172                 handlers = []
173                 self._discover_classes(module, handlers, path)
174                 self._discover_functions(module, handlers, path, False)
175                 if not handlers:
176                     pass # replaces next logging statement
177                     #error('{} exports no handlers'.format(path))
178                     continue
179                 self._loaded[path] = {'handlers': handlers, 'module': module}
180             except Exception as e:
181                 err = ('Encountered {} loading plugin at {}: {}\n{}'
182                        .format(type(e).__name__, path, e, format_exc(5)))
183                 pass # replaces next logging statement
184                 #error(err)
185                 self._load_errors[path] = err
186
187         kind = ("script-host" if len(plugins) == 1 and has_script
188                 else "rplugin-host")
189         info = get_client_info(kind, 'host', host_method_spec)
190         self.name = info[0]
191         self.nvim.api.set_client_info(*info, async_=True)
192
193     def _unload(self):
194         for path, plugin in self._loaded.items():
195             handlers = plugin['handlers']
196             for handler in handlers:
197                 method_name = handler._nvim_registered_name
198                 if hasattr(handler, '_nvim_shutdown_hook'):
199                     handler()
200                 elif handler._nvim_rpc_sync:
201                     del self._request_handlers[method_name]
202                 else:
203                     del self._notification_handlers[method_name]
204         self._specs = {}
205         self._loaded = {}
206
207     def _discover_classes(self, module, handlers, plugin_path):
208         for _, cls in inspect.getmembers(module, inspect.isclass):
209             if getattr(cls, '_nvim_plugin', False):
210                 # discover handlers in the plugin instance
211                 self._discover_functions(cls, handlers, plugin_path, True)
212
213     def _discover_functions(self, obj, handlers, plugin_path, delay):
214         def predicate(o):
215             return hasattr(o, '_nvim_rpc_method_name')
216
217         cls_handlers = []
218         specs = []
219         objdecode = getattr(obj, '_nvim_decode', self._decode_default)
220         for _, fn in inspect.getmembers(obj, predicate):
221             method = fn._nvim_rpc_method_name
222             if fn._nvim_prefix_plugin_path:
223                 method = '{}:{}'.format(plugin_path, method)
224             sync = fn._nvim_rpc_sync
225             if delay:
226                 fn_wrapped = partial(self._wrap_delayed_function, obj,
227                                      cls_handlers, method, sync,
228                                      handlers, plugin_path)
229             else:
230                 decode = getattr(fn, '_nvim_decode', objdecode)
231                 nvim_bind = None
232                 if fn._nvim_bind:
233                     nvim_bind = self._configure_nvim_for(fn)
234
235                 fn_wrapped = partial(self._wrap_function, fn,
236                                      sync, decode, nvim_bind, method)
237             self._copy_attributes(fn, fn_wrapped)
238             fn_wrapped._nvim_registered_name = method
239             # register in the rpc handler dict
240             if sync:
241                 if method in self._request_handlers:
242                     raise Exception(('Request handler for "{}" is '
243                                     + 'already registered').format(method))
244                 self._request_handlers[method] = fn_wrapped
245             else:
246                 if method in self._notification_handlers:
247                     raise Exception(('Notification handler for "{}" is '
248                                     + 'already registered').format(method))
249                 self._notification_handlers[method] = fn_wrapped
250             if hasattr(fn, '_nvim_rpc_spec'):
251                 specs.append(fn._nvim_rpc_spec)
252             handlers.append(fn_wrapped)
253             cls_handlers.append(fn_wrapped)
254         if specs:
255             self._specs[plugin_path] = specs
256
257     def _copy_attributes(self, fn, fn2):
258         # Copy _nvim_* attributes from the original function
259         for attr in dir(fn):
260             if attr.startswith('_nvim_'):
261                 setattr(fn2, attr, getattr(fn, attr))
262
263     def _on_specs_request(self, path):
264         if IS_PYTHON3:
265             path = decode_if_bytes(path)
266         if path in self._load_errors:
267             self.nvim.out_write(self._load_errors[path] + '\n')
268         return self._specs.get(path, 0)
269
270     def _configure_nvim_for(self, obj):
271         # Configure a nvim instance for obj (checks encoding configuration)
272         nvim = self.nvim
273         decode = getattr(obj, '_nvim_decode', self._decode_default)
274         if decode:
275             nvim = nvim.with_decode(decode)
276         return nvim