1 """Asynchronous msgpack-rpc handling in the event loop pipeline."""
3 from traceback import format_exc
6 logger = logging.getLogger(__name__)
7 debug, info, warn = (logger.debug, logger.info, logger.warning,)
10 class AsyncSession(object):
12 """Asynchronous msgpack-rpc layer that wraps a msgpack stream.
14 This wraps the msgpack stream interface for reading/writing msgpack
15 documents and exposes an interface for sending and receiving msgpack-rpc
16 requests and notifications.
19 def __init__(self, msgpack_stream):
20 """Wrap `msgpack_stream` on a msgpack-rpc interface."""
21 self._msgpack_stream = msgpack_stream
22 self._next_request_id = 1
23 self._pending_requests = {}
24 self._request_cb = self._notification_cb = None
28 2: self._on_notification
30 self.loop = msgpack_stream.loop
32 def threadsafe_call(self, fn):
33 """Wrapper around `MsgpackStream.threadsafe_call`."""
34 self._msgpack_stream.threadsafe_call(fn)
36 def request(self, method, args, response_cb):
37 """Send a msgpack-rpc request to Nvim.
39 A msgpack-rpc with method `method` and argument `args` is sent to
40 Nvim. The `response_cb` function is called with when the response
43 request_id = self._next_request_id
44 self._next_request_id = request_id + 1
45 self._msgpack_stream.send([0, request_id, method, args])
46 self._pending_requests[request_id] = response_cb
48 def notify(self, method, args):
49 """Send a msgpack-rpc notification to Nvim.
51 A msgpack-rpc with method `method` and argument `args` is sent to
52 Nvim. This will have the same effect as a request, but no response
55 self._msgpack_stream.send([2, method, args])
57 def run(self, request_cb, notification_cb):
58 """Run the event loop to receive requests and notifications from Nvim.
60 While the event loop is running, `request_cb` and `notification_cb`
61 will be called whenever requests or notifications are respectively
64 self._request_cb = request_cb
65 self._notification_cb = notification_cb
66 self._msgpack_stream.run(self._on_message)
67 self._request_cb = None
68 self._notification_cb = None
71 """Stop the event loop."""
72 self._msgpack_stream.stop()
75 """Close the event loop."""
76 self._msgpack_stream.close()
78 def _on_message(self, msg):
80 self._handlers.get(msg[0], self._on_invalid_message)(msg)
82 err_str = format_exc(5)
83 pass # replaces next logging statement
85 self._msgpack_stream.send([1, 0, err_str, None])
87 def _on_request(self, msg):
90 # - msg[2]: method name
92 pass # replaces next logging statement
93 #debug('received request: %s, %s', msg[2], msg[3])
94 self._request_cb(msg[2], msg[3], Response(self._msgpack_stream,
97 def _on_response(self, msg):
98 # response to a previous request:
100 # - msg[2]: error(if any)
101 # - msg[3]: result(if not errored)
102 pass # replaces next logging statement
103 #debug('received response: %s, %s', msg[2], msg[3])
104 self._pending_requests.pop(msg[1])(msg[2], msg[3])
106 def _on_notification(self, msg):
108 # - msg[1]: event name
109 # - msg[2]: arguments
110 pass # replaces next logging statement
111 #debug('received notification: %s, %s', msg[1], msg[2])
112 self._notification_cb(msg[1], msg[2])
114 def _on_invalid_message(self, msg):
115 error = 'Received invalid message %s' % msg
116 pass # replaces next logging statement
118 self._msgpack_stream.send([1, 0, error, None])
121 class Response(object):
123 """Response to a msgpack-rpc request that came from Nvim.
125 When Nvim sends a msgpack-rpc request, an instance of this class is
126 created for remembering state required to send a response.
129 def __init__(self, msgpack_stream, request_id):
130 """Initialize the Response instance."""
131 self._msgpack_stream = msgpack_stream
132 self._request_id = request_id
134 def send(self, value, error=False):
135 """Send the response.
137 If `error` is True, it will be sent as an error.
140 resp = [1, self._request_id, value, None]
142 resp = [1, self._request_id, None, value]
143 pass # replaces next logging statement
144 #debug('sending response to request %d: %s', self._request_id, resp)
145 self._msgpack_stream.send(resp)