--- /dev/null
+"""Synchronous msgpack-rpc session layer."""
+import logging
+import threading
+from collections import deque
+from traceback import format_exc
+
+import greenlet
+
+from pynvim.compat import check_async
+
+logger = logging.getLogger(__name__)
+error, debug, info, warn = (logger.error, logger.debug, logger.info,
+ logger.warning,)
+
+
+class Session(object):
+
+ """Msgpack-rpc session layer that uses coroutines for a synchronous API.
+
+ This class provides the public msgpack-rpc API required by this library.
+ It uses the greenlet module to handle requests and notifications coming
+ from Nvim with a synchronous API.
+ """
+
+ def __init__(self, async_session):
+ """Wrap `async_session` on a synchronous msgpack-rpc interface."""
+ self._async_session = async_session
+ self._request_cb = self._notification_cb = None
+ self._pending_messages = deque()
+ self._is_running = False
+ self._setup_exception = None
+ self.loop = async_session.loop
+ self._loop_thread = None
+
+ def threadsafe_call(self, fn, *args, **kwargs):
+ """Wrapper around `AsyncSession.threadsafe_call`."""
+ def handler():
+ try:
+ fn(*args, **kwargs)
+ except Exception:
+ pass # replaces next logging statement
+ #warn("error caught while excecuting async callback\n%s\n",
+ #format_exc())
+
+ def greenlet_wrapper():
+ gr = greenlet.greenlet(handler)
+ gr.switch()
+
+ self._async_session.threadsafe_call(greenlet_wrapper)
+
+ def next_message(self):
+ """Block until a message(request or notification) is available.
+
+ If any messages were previously enqueued, return the first in queue.
+ If not, run the event loop until one is received.
+ """
+ if self._is_running:
+ raise Exception('Event loop already running')
+ if self._pending_messages:
+ return self._pending_messages.popleft()
+ self._async_session.run(self._enqueue_request_and_stop,
+ self._enqueue_notification_and_stop)
+ if self._pending_messages:
+ return self._pending_messages.popleft()
+
+ def request(self, method, *args, **kwargs):
+ """Send a msgpack-rpc request and block until as response is received.
+
+ If the event loop is running, this method must have been called by a
+ request or notification handler running on a greenlet. In that case,
+ send the quest and yield to the parent greenlet until a response is
+ available.
+
+ When the event loop is not running, it will perform a blocking request
+ like this:
+ - Send the request
+ - Run the loop until the response is available
+ - Put requests/notifications received while waiting into a queue
+
+ If the `async_` flag is present and True, a asynchronous notification
+ is sent instead. This will never block, and the return value or error
+ is ignored.
+ """
+ async_ = check_async(kwargs.pop('async_', None), kwargs, False)
+ if async_:
+ self._async_session.notify(method, args)
+ return
+
+ if kwargs:
+ raise ValueError("request got unsupported keyword argument(s): {}"
+ .format(', '.join(kwargs.keys())))
+
+ if self._is_running:
+ v = self._yielding_request(method, args)
+ else:
+ v = self._blocking_request(method, args)
+ if not v:
+ # EOF
+ raise OSError('EOF')
+ err, rv = v
+ if err:
+ pass # replaces next logging statement
+ #info("'Received error: %s", err)
+ raise self.error_wrapper(err)
+ return rv
+
+ def run(self, request_cb, notification_cb, setup_cb=None):
+ """Run the event loop to receive requests and notifications from Nvim.
+
+ Like `AsyncSession.run()`, but `request_cb` and `notification_cb` are
+ inside greenlets.
+ """
+ self._request_cb = request_cb
+ self._notification_cb = notification_cb
+ self._is_running = True
+ self._setup_exception = None
+ self._loop_thread = threading.current_thread()
+
+ def on_setup():
+ try:
+ setup_cb()
+ except Exception as e:
+ self._setup_exception = e
+ self.stop()
+
+ if setup_cb:
+ # Create a new greenlet to handle the setup function
+ gr = greenlet.greenlet(on_setup)
+ gr.switch()
+
+ if self._setup_exception:
+ pass # replaces next logging statement
+ #error('Setup error: {}'.format(self._setup_exception))
+ raise self._setup_exception
+
+ # Process all pending requests and notifications
+ while self._pending_messages:
+ msg = self._pending_messages.popleft()
+ getattr(self, '_on_{}'.format(msg[0]))(*msg[1:])
+ self._async_session.run(self._on_request, self._on_notification)
+ self._is_running = False
+ self._request_cb = None
+ self._notification_cb = None
+ self._loop_thread = None
+
+ if self._setup_exception:
+ raise self._setup_exception
+
+ def stop(self):
+ """Stop the event loop."""
+ self._async_session.stop()
+
+ def close(self):
+ """Close the event loop."""
+ self._async_session.close()
+
+ def _yielding_request(self, method, args):
+ gr = greenlet.getcurrent()
+ parent = gr.parent
+
+ def response_cb(err, rv):
+ pass # replaces next logging statement
+ #debug('response is available for greenlet %s, switching back', gr)
+ gr.switch(err, rv)
+
+ self._async_session.request(method, args, response_cb)
+ pass # replaces next logging statement
+ #debug('yielding from greenlet %s to wait for response', gr)
+ return parent.switch()
+
+ def _blocking_request(self, method, args):
+ result = []
+
+ def response_cb(err, rv):
+ result.extend([err, rv])
+ self.stop()
+
+ self._async_session.request(method, args, response_cb)
+ self._async_session.run(self._enqueue_request,
+ self._enqueue_notification)
+ return result
+
+ def _enqueue_request_and_stop(self, name, args, response):
+ self._enqueue_request(name, args, response)
+ self.stop()
+
+ def _enqueue_notification_and_stop(self, name, args):
+ self._enqueue_notification(name, args)
+ self.stop()
+
+ def _enqueue_request(self, name, args, response):
+ self._pending_messages.append(('request', name, args, response,))
+
+ def _enqueue_notification(self, name, args):
+ self._pending_messages.append(('notification', name, args,))
+
+ def _on_request(self, name, args, response):
+ def handler():
+ try:
+ rv = self._request_cb(name, args)
+ pass # replaces next logging statement
+ #debug('greenlet %s finished executing, '
+ #+ 'sending %s as response', gr, rv)
+ response.send(rv)
+ except ErrorResponse as err:
+ pass # replaces next logging statement
+ #warn("error response from request '%s %s': %s", name,
+ #args, format_exc())
+ response.send(err.args[0], error=True)
+ except Exception as err:
+ pass # replaces next logging statement
+ #warn("error caught while processing request '%s %s': %s", name,
+ #args, format_exc())
+ response.send(repr(err) + "\n" + format_exc(5), error=True)
+ pass # replaces next logging statement
+ #debug('greenlet %s is now dying...', gr)
+
+ # Create a new greenlet to handle the request
+ gr = greenlet.greenlet(handler)
+ pass # replaces next logging statement
+ #debug('received rpc request, greenlet %s will handle it', gr)
+ gr.switch()
+
+ def _on_notification(self, name, args):
+ def handler():
+ try:
+ self._notification_cb(name, args)
+ pass # replaces next logging statement
+ #debug('greenlet %s finished executing', gr)
+ except Exception:
+ pass # replaces next logging statement
+ #warn("error caught while processing notification '%s %s': %s",
+ #name, args, format_exc())
+
+ pass # replaces next logging statement
+ #debug('greenlet %s is now dying...', gr)
+
+ gr = greenlet.greenlet(handler)
+ pass # replaces next logging statement
+ #debug('received rpc notification, greenlet %s will handle it', gr)
+ gr.switch()
+
+
+class ErrorResponse(BaseException):
+
+ """Raise this in a request handler to respond with a given error message.
+
+ Unlike when other exceptions are caught, this gives full control off the
+ error response sent. When "ErrorResponse(msg)" is caught "msg" will be
+ sent verbatim as the error response.No traceback will be appended.
+ """
+
+ pass