1 """Implements a Nvim host for python plugins."""
8 from functools import partial
9 from traceback import format_exc
11 from pynvim.api import decode_if_bytes, walk
12 from pynvim.compat import IS_PYTHON3, find_module
13 from pynvim.msgpack_rpc import ErrorResponse
14 from pynvim.plugin import script_host
15 from pynvim.util import format_exc_skip, get_client_info
# Module-level logger named after this module.
19 logger = logging.getLogger(__name__)
# Short aliases for the logger's methods.
# NOTE(review): the tuple opened on the next line is not closed within this
# excerpt; its remainder is not visible here.
20 error, debug, info, warn = (logger.error, logger.debug, logger.info,
# Method table handed to get_client_info() in _load().
23 host_method_spec = {"poll": {}, "specs": {"nargs": 1}, "shutdown": {}}
28 """Nvim host for python plugins.
30 Takes care of loading/unloading plugins and routing msgpack-rpc
31 requests/notifications to the appropriate handlers.
34 def __init__(self, nvim):
35 """Set handlers for plugin_load/plugin_unload."""
# NOTE(review): other methods read self.nvim, self._specs and self._loaded,
# but no assignment to them is visible in this excerpt — confirm they are
# initialised here.
# Plugin path -> error text stored by _load() when loading that plugin fails.
39 self._load_errors = {}
# Notification name -> handler; consulted by _on_notification().
40 self._notification_handlers = {
41 'nvim_error_event': self._on_error_event
# Request name -> handler; consulted by _on_request().
# NOTE(review): host_method_spec also lists "poll"; no 'poll' entry is
# visible in this excerpt — verify against the complete file.
43 self._request_handlers = {
45 'specs': self._on_specs_request,
46 'shutdown': self.shutdown
# Fallback used wherever an object has no `_nvim_decode` attribute
# (see the getattr(..., '_nvim_decode', ...) lookups in this file).
49 # Decode per default for Python3
50 self._decode_default = IS_PYTHON3
52 def _on_async_err(self, msg):
53 # uncaught python exception
54 self.nvim.err_write(msg, async_=True)
# Registered in __init__ as the handler for 'nvim_error_event' notifications.
# `kind` is not used in the lines visible here.
56 def _on_error_event(self, kind, msg):
57 # error from nvim due to async request
58 # like nvim.command(..., async_=True)
# NOTE(review): self.name is not assigned anywhere in this excerpt —
# confirm where it is set before this handler can run.
59 errmsg = "{}: Async request caused an error:\n{}\n".format(
60 self.name, decode_if_bytes(msg))
# Show the formatted message through Nvim's error output.
61 self.nvim.err_write(errmsg, async_=True)
def start(self, plugins):
    """Start listening for msgpack-rpc requests and notifications.

    Hands the request and notification dispatchers to the Nvim run loop,
    together with a setup callback that loads ``plugins`` and an error
    callback for uncaught exceptions.

    Args:
        plugins: The plugin paths forwarded to ``_load``.
    """
    self.nvim.run_loop(
        self._on_request,
        self._on_notification,
        # loading is deferred: the run loop decides when to call this
        lambda: self._load(plugins),
        err_cb=self._on_async_err,
    )
72 """Shutdown the host."""
76 def _wrap_delayed_function(self, cls, delayed_handlers, name, sync,
77 module_handlers, path, *args):
78 # delete the delayed handlers to be sure
79 for handler in delayed_handlers:
80 method_name = handler._nvim_registered_name
81 if handler._nvim_rpc_sync:
82 del self._request_handlers[method_name]
84 del self._notification_handlers[method_name]
85 # create an instance of the plugin and pass the nvim object
86 plugin = cls(self._configure_nvim_for(cls))
88 # discover handlers in the plugin instance
89 self._discover_functions(plugin, module_handlers, path, False)
92 self._request_handlers[name](*args)
94 self._notification_handlers[name](*args)
# Bound with functools.partial in _discover_functions() as the registered
# handler for `fn`; `*args` are the arguments of the incoming call.
96 def _wrap_function(self, fn, sync, decode, nvim_bind, name, *args):
# Apply decode_if_bytes across the arguments via walk().
98 args = walk(decode_if_bytes, args, decode)
# Prepend the configured nvim object, when there is one, to the arguments.
99 if nvim_bind is not None:
100 args.insert(0, nvim_bind)
# NOTE(review): the call to `fn`, the exception handling around it and the
# branch that selects between the two messages below are not visible in
# this excerpt — verify against the complete file.
# Request-handler failure: the message is raised as an ErrorResponse.
105 msg = ("error caught in request handler '{} {}':\n{}"
106 .format(name, args, format_exc_skip(1)))
107 raise ErrorResponse(msg)
# Async-handler failure: the message is written through _on_async_err().
109 msg = ("error caught in async handler '{} {}'\n{}\n"
110 .format(name, args, format_exc_skip(1)))
111 self._on_async_err(msg + "\n")
# Passed to nvim.run_loop() in start() as the request callback.
113 def _on_request(self, name, args):
114 """Handle a msgpack-rpc request."""
116 name = decode_if_bytes(name)
# Look the handler up by name; None when nothing is registered.
117 handler = self._request_handlers.get(name, None)
# NOTE(review): the condition guarding the error path below and the lines
# that call the handler and return its value are not visible in this
# excerpt — verify against the complete file.
# Error path: build the "no handler" message and raise it as ErrorResponse.
119 msg = self._missing_handler_error(name, 'request')
120 pass # replaces next logging statement
122 raise ErrorResponse(msg)
124 pass # replaces next logging statement
125 #debug('calling request handler for "%s", args: "%s"', name, args)
127 pass # replaces next logging statement
128 #debug("request handler for '%s %s' returns: %s", name, args, rv)
# Passed to nvim.run_loop() in start() as the notification callback.
131 def _on_notification(self, name, args):
132 """Handle a msgpack-rpc notification."""
134 name = decode_if_bytes(name)
# Look the handler up by name; None when nothing is registered.
135 handler = self._notification_handlers.get(name, None)
# NOTE(review): the condition guarding the error path below and the line
# that calls the handler are not visible in this excerpt — verify against
# the complete file.
# Error path: the "no handler" message is written through _on_async_err()
# rather than raised.
137 msg = self._missing_handler_error(name, 'notification')
138 pass # replaces next logging statement
140 self._on_async_err(msg + "\n")
143 pass # replaces next logging statement
144 #debug('calling notification handler for "%s", args: "%s"', name, args)
147 def _missing_handler_error(self, name, kind):
148 msg = 'no {} handler registered for "{}"'.format(kind, name)
149 pathmatch = re.match(r'(.+):[^:]+:[^:]+', name)
151 loader_error = self._load_errors.get(pathmatch.group(1))
152 if loader_error is not None:
153 msg = msg + "\n" + loader_error
# Called from the setup callback that start() hands to nvim.run_loop().
# NOTE(review): the loop over `plugins`, the opening `try:`, the
# initialisation of `handlers` and `has_script`, and several branch bodies
# are not visible in this excerpt — verify against the complete file.
156 def _load(self, plugins):
# A path already recorded in self._loaded is detected here.
160 if path in self._loaded:
161 pass # replaces next logging statement
162 #error('{} is already loaded'.format(path))
# "script_host.py" is treated as a special case (its branch body is not
# visible here).
165 if path == "script_host.py":
# Split the path into its directory and the module name without extension.
169 directory, name = os.path.split(os.path.splitext(path)[0])
170 file, pathname, descr = find_module(name, [directory])
# NOTE(review): the `imp` module is deprecated and was removed in
# Python 3.12; importlib is its documented replacement.
171 module = imp.load_module(name, file, pathname, descr)
# Collect handlers: plugin classes first, then module-level functions.
173 self._discover_classes(module, handlers, path)
174 self._discover_functions(module, handlers, path, False)
176 pass # replaces next logging statement
177 #error('{} exports no handlers'.format(path))
# Remember what was loaded for this path.
179 self._loaded[path] = {'handlers': handlers, 'module': module}
# Any failure is formatted (traceback limited to 5 entries) and stored per
# path; _on_specs_request() and _missing_handler_error() read it back.
180 except Exception as e:
181 err = ('Encountered {} loading plugin at {}: {}\n{}'
182 .format(type(e).__name__, path, e, format_exc(5)))
183 pass # replaces next logging statement
185 self._load_errors[path] = err
# Describe this host to Nvim; the alternative value of `kind` is on a line
# not visible in this excerpt.
187 kind = ("script-host" if len(plugins) == 1 and has_script
189 info = get_client_info(kind, 'host', host_method_spec)
191 self.nvim.api.set_client_info(*info, async_=True)
194 for path, plugin in self._loaded.items():
195 handlers = plugin['handlers']
196 for handler in handlers:
197 method_name = handler._nvim_registered_name
198 if hasattr(handler, '_nvim_shutdown_hook'):
200 elif handler._nvim_rpc_sync:
201 del self._request_handlers[method_name]
203 del self._notification_handlers[method_name]
207 def _discover_classes(self, module, handlers, plugin_path):
208 for _, cls in inspect.getmembers(module, inspect.isclass):
209 if getattr(cls, '_nvim_plugin', False):
210 # discover handlers in the plugin instance
211 self._discover_functions(cls, handlers, plugin_path, True)
# Scans `obj` for members carrying `_nvim_rpc_method_name`, wraps each one
# and registers it as a request or notification handler.
# NOTE(review): the header of the `predicate` helper, the initialisation of
# `specs` and `cls_handlers`, and the `if delay:` / `if sync:` style
# branches are not visible in this excerpt — verify against the complete
# file.
213 def _discover_functions(self, obj, handlers, plugin_path, delay):
# Body of the member filter: keep objects tagged with an rpc method name.
215 return hasattr(o, '_nvim_rpc_method_name')
# Object-level decode setting, falling back to the host default.
219 objdecode = getattr(obj, '_nvim_decode', self._decode_default)
220 for _, fn in inspect.getmembers(obj, predicate):
221 method = fn._nvim_rpc_method_name
# Optionally qualify the method name with the plugin path.
222 if fn._nvim_prefix_plugin_path:
223 method = '{}:{}'.format(plugin_path, method)
224 sync = fn._nvim_rpc_sync
# Placeholder wrapper: binds _wrap_delayed_function to `obj` and the
# arguments it needs when the handler is eventually called.
226 fn_wrapped = partial(self._wrap_delayed_function, obj,
227 cls_handlers, method, sync,
228 handlers, plugin_path)
# Function-level decode setting overrides the object-level one.
230 decode = getattr(fn, '_nvim_decode', objdecode)
233 nvim_bind = self._configure_nvim_for(fn)
# Direct wrapper: binds _wrap_function to `fn` and its call settings.
235 fn_wrapped = partial(self._wrap_function, fn,
236 sync, decode, nvim_bind, method)
# Carry the _nvim_* metadata over to the wrapper and record its final name.
237 self._copy_attributes(fn, fn_wrapped)
238 fn_wrapped._nvim_registered_name = method
239 # register in the rpc handler dict
# Duplicate registrations are rejected with an Exception.
241 if method in self._request_handlers:
242 raise Exception(('Request handler for "{}" is '
243 + 'already registered').format(method))
244 self._request_handlers[method] = fn_wrapped
246 if method in self._notification_handlers:
247 raise Exception(('Notification handler for "{}" is '
248 + 'already registered').format(method))
249 self._notification_handlers[method] = fn_wrapped
# Collect the rpc spec when the function declares one.
250 if hasattr(fn, '_nvim_rpc_spec'):
251 specs.append(fn._nvim_rpc_spec)
252 handlers.append(fn_wrapped)
253 cls_handlers.append(fn_wrapped)
# Store the collected specs; _on_specs_request() returns them by path.
255 self._specs[plugin_path] = specs
257 def _copy_attributes(self, fn, fn2):
258 # Copy _nvim_* attributes from the original function
260 if attr.startswith('_nvim_'):
261 setattr(fn2, attr, getattr(fn, attr))
# Registered in __init__ as the handler for 'specs' requests.
# NOTE(review): a line between the header and the first statement appears
# to be absent from this excerpt — verify against the complete file.
263 def _on_specs_request(self, path):
265 path = decode_if_bytes(path)
# When loading this path failed, show the recorded error text to the user.
266 if path in self._load_errors:
267 self.nvim.out_write(self._load_errors[path] + '\n')
# Specs stored by _discover_functions() for this path, or 0 when none exist.
268 return self._specs.get(path, 0)
270 def _configure_nvim_for(self, obj):
271 # Configure a nvim instance for obj (checks encoding configuration)
273 decode = getattr(obj, '_nvim_decode', self._decode_default)
275 nvim = nvim.with_decode(decode)