1 """Synchronous msgpack-rpc session layer."""
4 from collections import deque
5 from traceback import format_exc
9 from pynvim.compat import check_async
# Module-level logger plus short aliases for its level methods, so the rest
# of the module can call `warn(...)`, `debug(...)` etc. directly.
logger = logging.getLogger(__name__)
error, debug, info, warn = (logger.error, logger.debug, logger.info,
                            logger.warning,)
class Session(object):

    """Msgpack-rpc session layer that uses coroutines for a synchronous API.

    This class provides the public msgpack-rpc API required by this library.
    It uses the greenlet module to handle requests and notifications coming
    from Nvim with a synchronous API.

    Note: `request()` wraps error responses with `self.error_wrapper`, which
    is not assigned in `__init__`; it has to be set on the instance by the
    code that owns the session before an error response is received.
    """

    def __init__(self, async_session):
        """Wrap `async_session` on a synchronous msgpack-rpc interface."""
        self._async_session = async_session
        self._request_cb = self._notification_cb = None
        # Messages received while nobody was ready to handle them; each entry
        # is ('request', name, args, response) or ('notification', name, args).
        self._pending_messages = deque()
        self._is_running = False
        self._setup_exception = None
        self.loop = async_session.loop
        self._loop_thread = None

    def threadsafe_call(self, fn, *args, **kwargs):
        """Wrapper around `AsyncSession.threadsafe_call`.

        `fn(*args, **kwargs)` is scheduled through the async session and
        executed inside a fresh greenlet. Exceptions raised by `fn` are logged
        and never propagate into the event loop.
        """
        def handler():
            try:
                fn(*args, **kwargs)
            except Exception:
                # Best effort: a failing callback must not take the loop
                # down, but it must not vanish silently either.
                warn("error caught while executing async callback\n%s\n",
                     format_exc())

        def greenlet_wrapper():
            gr = greenlet.greenlet(handler)
            gr.switch()

        self._async_session.threadsafe_call(greenlet_wrapper)

    def next_message(self):
        """Block until a message (request or notification) is available.

        If any messages were previously enqueued, return the first in queue.
        If not, run the event loop until one is received. Returns None when
        the loop ends without having received a message.

        Raises `Exception` if the event loop is already running.
        """
        if self._is_running:
            raise Exception('Event loop already running')
        if self._pending_messages:
            return self._pending_messages.popleft()
        # The `*_and_stop` callbacks enqueue the message and stop the loop,
        # so `run` returns as soon as one message has arrived.
        self._async_session.run(self._enqueue_request_and_stop,
                                self._enqueue_notification_and_stop)
        if self._pending_messages:
            return self._pending_messages.popleft()

    def request(self, method, *args, **kwargs):
        """Send a msgpack-rpc request and block until a response is received.

        If the event loop is running, this method must have been called by a
        request or notification handler running on a greenlet. In that case,
        send the request and yield to the parent greenlet until a response is
        available.

        When the event loop is not running, it will perform a blocking request
        like this:
        - Send the request
        - Run the loop until the response is available
        - Put requests/notifications received while waiting into a queue

        If the `async_` flag is present and True, an asynchronous notification
        is sent instead. This will never block, and the return value or error
        is ignored.

        Raises `ValueError` for unsupported keyword arguments, `OSError` when
        no response was obtained, and `self.error_wrapper(err)` when the peer
        answered with an error.
        """
        # NOTE(review): `check_async` comes from pynvim.compat; presumably it
        # also resolves the legacy `async` keyword out of `kwargs` -- confirm.
        async_ = check_async(kwargs.pop('async_', None), kwargs, False)
        if async_:
            self._async_session.notify(method, args)
            return

        if kwargs:
            raise ValueError("request got unsupported keyword argument(s): {}"
                             .format(', '.join(kwargs.keys())))

        if self._is_running:
            v = self._yielding_request(method, args)
        else:
            v = self._blocking_request(method, args)
        if not v:
            # The loop ended before any response was delivered (EOF).
            raise OSError('EOF')
        err, rv = v
        if err:
            raise self.error_wrapper(err)
        return rv

    def run(self, request_cb, notification_cb, setup_cb=None):
        """Run the event loop to receive requests and notifications from Nvim.

        Like `AsyncSession.run()`, but `request_cb` and `notification_cb` are
        invoked inside greenlets. `setup_cb`, when given, is also executed in
        its own greenlet before the loop starts; an exception raised by it
        stops the session and is re-raised from this method.

        Messages queued by earlier `next_message()`/`request()` calls are
        dispatched before the loop is entered.
        """
        self._request_cb = request_cb
        self._notification_cb = notification_cb
        self._is_running = True
        self._setup_exception = None
        self._loop_thread = threading.current_thread()

        def on_setup():
            try:
                setup_cb()
            except Exception as e:
                self._setup_exception = e
                self.stop()

        try:
            if setup_cb:
                # Create a new greenlet to handle the setup function
                gr = greenlet.greenlet(on_setup)
                gr.switch()

            if self._setup_exception:
                raise self._setup_exception

            # Process all pending requests and notifications
            while self._pending_messages:
                msg = self._pending_messages.popleft()
                getattr(self, '_on_{}'.format(msg[0]))(*msg[1:])
            self._async_session.run(self._on_request, self._on_notification)
        finally:
            # Always leave the session in the "not running" state, even when
            # setup or the loop itself fails; otherwise later calls to
            # `next_message()`/`request()` would treat it as still running.
            self._is_running = False
            self._request_cb = None
            self._notification_cb = None
            self._loop_thread = None

        # The setup greenlet may have failed after yielding back to us, i.e.
        # while the loop was already running.
        if self._setup_exception:
            raise self._setup_exception

    def stop(self):
        """Stop the event loop."""
        self._async_session.stop()

    def close(self):
        """Close the event loop."""
        self._async_session.close()

    def _yielding_request(self, method, args):
        """Send a request from a handler greenlet and yield until answered.

        Returns whatever the response callback switches back with, i.e. the
        `(err, rv)` pair.
        """
        gr = greenlet.getcurrent()
        parent = gr.parent

        def response_cb(err, rv):
            # Resume the requesting greenlet with the response.
            gr.switch(err, rv)

        self._async_session.request(method, args, response_cb)
        # Hand control back to the parent greenlet while waiting.
        return parent.switch()

    def _blocking_request(self, method, args):
        """Send a request and run the loop until its response arrives.

        Requests/notifications received meanwhile are queued. Returns
        `[err, rv]`, or an empty list if the loop ended without a response.
        """
        result = []

        def response_cb(err, rv):
            result.extend([err, rv])
            self.stop()

        self._async_session.request(method, args, response_cb)
        self._async_session.run(self._enqueue_request,
                                self._enqueue_notification)
        return result

    def _enqueue_request_and_stop(self, name, args, response):
        """Queue an incoming request and stop the event loop."""
        self._enqueue_request(name, args, response)
        self.stop()

    def _enqueue_notification_and_stop(self, name, args):
        """Queue an incoming notification and stop the event loop."""
        self._enqueue_notification(name, args)
        self.stop()

    def _enqueue_request(self, name, args, response):
        """Queue an incoming request for later dispatch."""
        self._pending_messages.append(('request', name, args, response,))

    def _enqueue_notification(self, name, args):
        """Queue an incoming notification for later dispatch."""
        self._pending_messages.append(('notification', name, args,))

    def _on_request(self, name, args, response):
        """Dispatch a request to `_request_cb` inside a new greenlet.

        The callback's return value is sent as the response. An
        `ErrorResponse` sends its first argument verbatim as the error; any
        other exception sends its repr plus a short traceback.
        """
        def handler():
            try:
                rv = self._request_cb(name, args)
                response.send(rv)
            except ErrorResponse as err:
                response.send(err.args[0], error=True)
            except Exception as err:
                response.send(repr(err) + "\n" + format_exc(5), error=True)

        # Create a new greenlet to handle the request
        gr = greenlet.greenlet(handler)
        gr.switch()

    def _on_notification(self, name, args):
        """Dispatch a notification to `_notification_cb` in a new greenlet.

        Exceptions raised by the callback are logged; there is no peer
        waiting for an answer, so they are not propagated.
        """
        def handler():
            try:
                self._notification_cb(name, args)
            except Exception:
                warn("error caught while processing notification '%s %s': %s",
                     name, args, format_exc())

        gr = greenlet.greenlet(handler)
        gr.switch()
244 class ErrorResponse(BaseException):
246 """Raise this in a request handler to respond with a given error message.
248 Unlike when other exceptions are caught, this gives full control of the
249 error response sent. When "ErrorResponse(msg)" is caught, "msg" will be
250 sent verbatim as the error response. No traceback will be appended.