1 # Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
2 # Copyright (C) 2013 YAMAMOTO Takashi <yamamoto at valinux co jp>
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
19 # https://github.com/msgpack/msgpack/blob/master/spec.md
21 # https://github.com/msgpack-rpc/msgpack-rpc/blob/master/spec.md
23 from collections import deque
30 class MessageType(object):
36 class MessageEncoder(object):
37 """msgpack-rpc encoder/decoder.
38 intended to be transport-agnostic.
42 super(MessageEncoder, self).__init__()
43 self._packer = msgpack.Packer(encoding='utf-8', use_bin_type=True)
44 self._unpacker = msgpack.Unpacker(encoding='utf-8')
47 def _create_msgid(self):
48 this_id = self._next_msgid
49 self._next_msgid = (self._next_msgid + 1) % 0xffffffff
def create_request(self, method, params):
    """Build a msgpack-rpc request message.

    *method* is the method name (str or bytes) and *params* is the list
    of arguments.  A fresh msgid is allocated for the request.

    Returns a (packed message, msgid) tuple.
    """
    assert isinstance(method, (str, six.binary_type))
    assert isinstance(params, list)
    request_id = self._create_msgid()
    packed = self._packer.pack(
        [MessageType.REQUEST, request_id, method, params])
    return packed, request_id
def create_response(self, msgid, error=None, result=None):
    """Build a msgpack-rpc response message for the request *msgid*.

    *msgid* must be an int in the range 0..0xffffffff.  At most one of
    *error* and *result* may be something other than None.

    Returns the packed message.
    """
    assert isinstance(msgid, int)
    assert 0 <= msgid <= 0xffffffff
    # a response carries an error or a result, never both
    assert error is None or result is None
    response = [MessageType.RESPONSE, msgid, error, result]
    return self._packer.pack(response)
def create_notification(self, method, params):
    """Build a msgpack-rpc notification message.

    *method* is the method name (str or bytes) and *params* is the list
    of arguments.  Notifications carry no msgid.

    Returns the packed message.
    """
    assert isinstance(method, (str, six.binary_type))
    assert isinstance(params, list)
    notification = [MessageType.NOTIFY, method, params]
    return self._packer.pack(notification)
70 def get_and_dispatch_messages(self, data, disp_table):
71 """dissect messages from a raw stream data.
72 disp_table[type] should be a callable for the corresponding
75 self._unpacker.feed(data)
76 for m in self._unpacker:
77 self._dispatch_message(m, disp_table)
80 def _dispatch_message(m, disp_table):
86 # ignore messages with unknown type
91 class EndPoint(object):
93 *sock* is a socket-like. it can be either blocking or non-blocking.
96 def __init__(self, sock, encoder=None, disp_table=None):
98 encoder = MessageEncoder()
99 self._encoder = encoder
101 if disp_table is None:
103 MessageType.REQUEST: self._enqueue_incoming_request,
104 MessageType.RESPONSE: self._enqueue_incoming_response,
105 MessageType.NOTIFY: self._enqueue_incoming_notification
108 self._table = disp_table
109 self._send_buffer = bytearray()
110 # msgids for which we sent a request but have not received a response
111 self._pending_requests = set()
112 # queues for incoming messages
113 self._requests = deque()
114 self._notifications = deque()
116 self._incoming = 0 # number of incoming messages in our queues
117 self._closed_by_peer = False
119 def selectable(self):
122 if self._send_buffer:
123 wlist.append(self._sock)
126 def process_outgoing(self):
128 sent_bytes = self._sock.send(self._send_buffer)
131 del self._send_buffer[:sent_bytes]
133 def process_incoming(self):
134 self.receive_messages(all=True)
137 self.process_outgoing()
138 self.process_incoming()
141 rlist, wlist = self.selectable()
142 select.select(rlist, wlist, rlist + wlist)
145 while not self._closed_by_peer:
149 def _send_message(self, msg):
150 self._send_buffer += msg
151 self.process_outgoing()
153 def send_request(self, method, params):
156 msg, msgid = self._encoder.create_request(method, params)
157 self._send_message(msg)
158 self._pending_requests.add(msgid)
161 def send_response(self, msgid, error=None, result=None):
164 msg = self._encoder.create_response(msgid, error, result)
165 self._send_message(msg)
167 def send_notification(self, method, params):
168 """Send a notification
170 msg = self._encoder.create_notification(method, params)
171 self._send_message(msg)
173 def receive_messages(self, all=False):
174 """Try to receive some messages.
175 Received messages are put on the internal queues.
176 They can be retrieved using get_xxx() methods.
177 Returns True if there's something queued for get_xxx() methods.
179 while all or self._incoming == 0:
181 packet = self._sock.recv(4096) # XXX the size is arbitrary
185 if packet is not None:
186 # socket closed by peer
187 self._closed_by_peer = True
189 self._encoder.get_and_dispatch_messages(packet, self._table)
190 return self._incoming > 0
192 def _enqueue_incoming_request(self, m):
193 self._requests.append(m)
196 def _enqueue_incoming_response(self, m):
197 msgid, error, result = m
199 self._pending_requests.remove(msgid)
204 assert msgid not in self._responses
205 self._responses[msgid] = (error, result)
208 def _enqueue_incoming_notification(self, m):
209 self._notifications.append(m)
212 def _get_message(self, q):
215 assert self._incoming > 0
221 def get_request(self):
222 return self._get_message(self._requests)
224 def get_response(self, msgid):
226 m = self._responses.pop(msgid)
227 assert self._incoming > 0
234 def get_notification(self):
235 return self._get_message(self._notifications)
238 class RPCError(Exception):
239 """an error from server
242 def __init__(self, error):
243 super(RPCError, self).__init__()
250 return str(self._error)
253 class Client(object):
254 """a convenient class for a pure rpc client
255 *sock* is a socket-like. it should be blocking.
def __init__(self, sock, encoder=None, notification_callback=None):
    """Create a client that talks over *sock*.

    *encoder* is passed through to the underlying EndPoint.
    *notification_callback*, when given, is the callable invoked with
    notifications received from the peer; without it they are ignored.
    """
    self._endpoint = EndPoint(sock, encoder)
    if notification_callback is None:
        # ignore notifications by default
        self._notification_callback = lambda n: None
    else:
        # without this branch a missing callback would end up stored as
        # None and replace the no-op default
        self._notification_callback = notification_callback
266 def _process_input_notification(self):
267 n = self._endpoint.get_notification()
269 self._notification_callback(n)
271 def _process_input_request(self):
272 # ignore requests as we are a pure client
274 self._endpoint.get_request()
276 def call(self, method, params):
278 send a request and wait for a response.
279 return a result. or raise RPCError exception if the peer
282 msgid = self._endpoint.send_request(method, params)
284 if not self._endpoint.receive_messages():
285 raise EOFError("EOF")
286 res = self._endpoint.get_response(msgid)
291 raise RPCError(error)
292 self._process_input_notification()
293 self._process_input_request()
295 def send_notification(self, method, params):
296 """send a notification to the peer.
298 self._endpoint.send_notification(method, params)
300 def receive_notification(self):
301 """wait for the next incoming message.
302 intended to be used when we have nothing to send but want to receive
305 if not self._endpoint.receive_messages():
306 raise EOFError("EOF")
307 self._process_input_notification()
308 self._process_input_request()
310 def peek_notification(self):
312 rlist, _wlist = self._endpoint.selectable()
313 rlist, _wlist, _xlist = select.select(rlist, [], [], 0)
316 self.receive_notification()