1 # Copyright (C) 2013-2015 Nippon Telegraph and Telephone Corporation.
2 # Copyright (C) 2013-2015 YAMAMOTO Takashi <yamamoto at valinux co jp>
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
22 from nose.tools import raises
25 from ryu.lib import hub
26 from ryu.lib import rpc
# Exception type derived directly from BaseException (not Exception).
# NOTE(review): the class body and any use of this type are not visible in
# this excerpt -- confirm its purpose against the full file.
29 class MyException(BaseException):
33 class Test_rpc(unittest.TestCase):
34 """ Test case for ryu.lib.rpc
# Server-side request handler.  Unpacks (msgid, method, params) from m and
# dispatches on the method name, answering through self._server_endpoint.
# NOTE(review): the opening branches of the dispatch chain and the
# assignments of n, cb and v are not visible in this excerpt.
37 def _handle_request(self, m):
38 e = self._server_endpoint
39 msgid, method, params = m
# Reply with the first parameter as the result.
41 e.send_response(msgid, result=params[0])
# Reply with the first parameter as the error value.
43 e.send_response(msgid, error=params[0])
44 elif method == 'callback':
# Send a request of our own to the peer, carrying the original msgid, and
# record what send_request returns in self._requests.
47 self._requests.add(e.send_request(cb, [msgid, n, cb, v]))
48 elif method == 'notify1':
# Emit the notification named by params[1] before answering the request.
49 e.send_notification(params[1], params[2])
50 e.send_response(msgid, result=params[0])
51 elif method == 'shutdown':
# params[0] is the name of a socket module attribute; the test in this
# file passes 'SHUT_WR'.
52 how = getattr(socket, params[0])
53 self._server_sock.shutdown(how)
54 e.send_response(msgid, result=method)
# Reject a method name this handler does not know.
56 raise Exception("unknown method %s" % method)
# Server-side notification handler.  On a 'notify2' notification the server
# sends a notification back, named params[0] with arguments params[1].
# NOTE(review): the line that unpacks method and params from m is not
# visible in this excerpt.
58 def _handle_notification(self, m):
59 e = self._server_endpoint
61 if method == 'notify2':
62 e.send_notification(params[0], params[1])
# Server-side handler for responses to requests the server itself issued.
# The result is unpacked as (omsgid, n, cb, v).
# NOTE(review): the conditions that choose between the two replies below
# are not visible in this excerpt.
64 def _handle_response(self, m):
65 e = self._server_endpoint
66 msgid, error, result = m
# This response settles one tracked request.
68 self._requests.remove(msgid)
69 omsgid, n, cb, v = result
# Answer the original request (omsgid) with v ...
72 e.send_response(omsgid, result=v)
# ... or send another request to cb with the same payload and track it.
74 self._requests.add(e.send_request(cb, [omsgid, n, cb, v]))
# Fixture set-up: creates a connected socket pair and starts a server
# EndPoint on one end in a hub-spawned thread.
# NOTE(review): the enclosing def line, the delimiters of the dispatch
# table literal and the tail of the rpc.EndPoint(...) call are not visible
# in this excerpt; presumably this is unittest's setUp -- confirm.
77 self._server_sock, self._client_sock = socket.socketpair()
# Dispatch table entries: one handler method per rpc.MessageType.
79 rpc.MessageType.REQUEST: self._handle_request,
80 rpc.MessageType.RESPONSE: self._handle_response,
81 rpc.MessageType.NOTIFY: self._handle_notification
# Ids of requests sent by the server; entries are removed again in
# _handle_response.
83 self._requests = set()
84 self._server_sock.setblocking(0)
85 self._server_endpoint = rpc.EndPoint(self._server_sock,
87 self._server_thread = hub.spawn(self._server_endpoint.serve)
# Fixture tear-down: kill the server thread, then join it.
# NOTE(review): the enclosing def line is not visible in this excerpt;
# presumably this is unittest's tearDown -- confirm.
90 hub.kill(self._server_thread)
91 hub.joinall([self._server_thread])
# Calls the server's 'resp' method with obj and checks the reply is a str.
# NOTE(review): the assignment of obj is not visible in this excerpt.
93 def test_0_call_str(self):
94 c = rpc.Client(self._client_sock)
96 result = c.call('resp', [obj])
98 assert isinstance(result, str)
# Calls 'resp' with an int and checks the reply is an integral number.
# NOTE(review): the assignment of obj is not visible in this excerpt.
100 def test_0_call_int(self):
101 c = rpc.Client(self._client_sock)
# Sanity check on the input before it is sent.
103 assert isinstance(obj, int)
104 result = c.call('resp', [obj])
106 assert isinstance(result, numbers.Integral)
# Calls 'resp' with an int and checks the reply is an integral number.
# NOTE(review): the assignment of obj is not visible in this excerpt, so
# the value that distinguishes this case from test_0_call_int is unknown.
108 def test_0_call_int2(self):
109 c = rpc.Client(self._client_sock)
# Sanity check on the input before it is sent.
111 assert isinstance(obj, int)
112 result = c.call('resp', [obj])
114 assert isinstance(result, numbers.Integral)
# Calls 'resp' with -six.MAXSIZE - 1 and checks the reply is an integral
# number.
116 def test_0_call_int3(self):
117 c = rpc.Client(self._client_sock)
118 obj = - six.MAXSIZE - 1
# Sanity check on the input before it is sent.
119 assert isinstance(obj, int)
120 result = c.call('resp', [obj])
122 assert isinstance(result, numbers.Integral)
# Calls 'resp' with the largest unsigned 64-bit value and checks the reply
# is an integral number.
124 def test_0_call_long(self):
125 c = rpc.Client(self._client_sock)
126 obj = 0xffffffffffffffff # max value for msgpack
127 assert isinstance(obj, numbers.Integral)
128 result = c.call('resp', [obj])
130 assert isinstance(result, numbers.Integral)
# Calls 'resp' with the smallest signed 64-bit value and checks the reply
# is an integral number.
132 def test_0_call_long2(self):
133 c = rpc.Client(self._client_sock)
134 # Note: the Python type of this value is int on 64-bit architectures
135 obj = -0x8000000000000000 # min value for msgpack
136 assert isinstance(obj, numbers.Integral)
137 result = c.call('resp', [obj])
139 assert isinstance(result, numbers.Integral)
# Calls 'resp' with a bytearray and checks the reply equals it and is of
# type six.binary_type.
# NOTE(review): the control-flow lines around the call (the TypeError
# handling that the comments below describe) are not visible in this
# excerpt.
141 def test_0_call_bytearray(self):
142 c = rpc.Client(self._client_sock)
143 obj = bytearray(b'foo')
144 # Note: msgpack-python version 0.50 or later supports bytearray
145 # objects; TypeError is ignored here for backward compatibility.
147 result = c.call('resp', [obj])
149 # Case with msgpack-python version 0.4.x or earlier.
151 self.assertEqual(obj, result)
152 self.assertIsInstance(result, six.binary_type)
def test_1_shutdown_wr(self):
    """Check that the server shuts down when the client disconnects.

    The client closes the write side of its socket; the test then joins
    the server thread.
    """
    client_sock = self._client_sock
    client_sock.shutdown(socket.SHUT_WR)
    server_thread = self._server_thread
    hub.joinall([server_thread])
def test_1_client_shutdown_wr(self):
    """Call the server's 'shutdown' method with 'SHUT_WR'.

    NOTE(review): the expected outcome is not asserted inside this body;
    check any decorator on this method in the full file.
    """
    client = rpc.Client(self._client_sock)
    how_name = 'SHUT_WR'
    client.call('shutdown', [how_name])
# Calls 'resp' with obj and checks the reply compares equal to it.
# NOTE(review): the assignment of obj is not visible in this excerpt; the
# test name suggests it is True.
164 def test_1_call_True(self):
165 c = rpc.Client(self._client_sock)
167 assert c.call('resp', [obj]) == obj
# Calls 'resp' with obj and checks the reply is None (identity check).
# NOTE(review): the assignment of obj is not visible in this excerpt.
169 def test_2_call_None(self):
170 c = rpc.Client(self._client_sock)
172 assert c.call('resp', [obj]) is None
# Calls 'resp' with obj and checks the reply compares equal to it.
# NOTE(review): the assignment of obj is not visible in this excerpt; the
# test name suggests it is False.
174 def test_2_call_False(self):
175 c = rpc.Client(self._client_sock)
177 assert c.call('resp', [obj]) == obj
def test_2_call_dict(self):
    """Send a two-entry dict to 'resp' and expect an equal dict back."""
    client = rpc.Client(self._client_sock)
    payload = {'hoge': 1, 'fuga': 2}
    reply = client.call('resp', [payload])
    assert reply == payload
# Calls 'resp' with obj and checks the reply compares equal to it.
# NOTE(review): the assignment of obj is not visible in this excerpt; the
# test name suggests an empty dict.
184 def test_2_call_empty_dict(self):
185 c = rpc.Client(self._client_sock)
187 assert c.call('resp', [obj]) == obj
# Calls 'resp' with obj and checks the reply compares equal to it.
# NOTE(review): the assignment of obj is not visible in this excerpt; the
# test name suggests a list.
189 def test_2_call_array(self):
190 c = rpc.Client(self._client_sock)
192 assert c.call('resp', [obj]) == obj
# Calls 'resp' with obj and checks the reply compares equal to it.
# NOTE(review): the assignment of obj is not visible in this excerpt; the
# test name suggests an empty list.
194 def test_2_call_empty_array(self):
195 c = rpc.Client(self._client_sock)
197 assert c.call('resp', [obj]) == obj
# Calls 'resp' with obj and checks the reply equals list(obj).
# NOTE(review): the assignment of obj is not visible in this excerpt; the
# test name suggests a tuple.
199 def test_2_call_tuple(self):
200 c = rpc.Client(self._client_sock)
201 # Note: the msgpack library implicitly converts a tuple into a list
203 assert c.call('resp', [obj]) == list(obj)
# Calls 'resp' with obj and checks the reply is of type six.text_type.
# NOTE(review): the assignment of obj is not visible in this excerpt.
205 def test_2_call_unicode(self):
206 c = rpc.Client(self._client_sock)
207 # Note: We use the encoding='utf-8' option in msgpack.Packer/Unpacker
208 # in order to support Python 3.
209 # With this option, utf-8 encoded bytes will be decoded into the unicode
210 # type in Python 2 and the str type in Python 3.
212 result = c.call('resp', [obj])
214 assert isinstance(result, six.text_type)
# Calls 'resp' with a 100-byte binary value and checks the reply is of type
# six.binary_type.
216 def test_2_call_small_binary(self):
217 c = rpc.Client(self._client_sock)
# The struct format "100x" packs 100 pad bytes.
218 obj = struct.pack("100x")
219 result = c.call('resp', [obj])
221 assert isinstance(result, six.binary_type)
def test_3_call_complex(self):
    """Send a list holding an int, a str and a dict to 'resp'.

    The dict mixes a str key and an int key; the reply must compare equal
    to what was sent.
    """
    client = rpc.Client(self._client_sock)
    payload = [1, 'hoge', {'foo': 1, 3: 'bar'}]
    reply = client.call('resp', [payload])
    assert reply == payload
# Calls 'resp' with a 10,000,000-byte binary value and checks the reply is
# of type six.binary_type.  The test is skipped unconditionally; the skip
# message gives the reason.
228 @unittest.skip("doesn't work with eventlet 0.18 and later")
229 def test_4_call_large_binary(self):
230 c = rpc.Client(self._client_sock)
# The struct format "10000000x" packs 10,000,000 pad bytes.
231 obj = struct.pack("10000000x")
232 result = c.call('resp', [obj])
234 assert isinstance(result, six.binary_type)
# Calls 'notify1' so that the server sends a 'notify_hoge' notification
# with [obj] as arguments, then checks what the client received.
# NOTE(review): the definitions of callback, robj, obj, method and params
# are not visible in this excerpt.
236 def test_0_notification1(self):
241 c = rpc.Client(self._client_sock, notification_callback=callback)
# The call's own reply must be robj.
244 assert c.call('notify1', [robj, 'notify_hoge', [obj]]) == robj
245 c.receive_notification()
250 assert method == 'notify_hoge'
251 assert params[0] == obj
# Sends a 'notify2' notification asking the server for a 'notify_hoge'
# notification with [obj] as arguments, then checks what the client
# received.
# NOTE(review): the definitions of callback, obj, method and params are
# not visible in this excerpt.
253 def test_0_notification2(self):
258 c = rpc.Client(self._client_sock, notification_callback=callback)
260 c.send_notification('notify2', ['notify_hoge', [obj]])
261 c.receive_notification()
266 assert method == 'notify_hoge'
267 assert params[0] == obj
# Expects an rpc.RPCError whose get_value() equals obj.
# NOTE(review): the assignment of obj and the call that should raise are
# not visible in this excerpt.
269 def test_0_call_error(self):
270 c = rpc.Client(self._client_sock)
# Reaching this line means the expected RPCError was not raised.
274 raise Exception("unexpected")
275 except rpc.RPCError as e:
276 assert e.get_value() == obj
# Sends a 'notify2' notification asking for 'notify_foo' back, then expects
# an rpc.RPCError whose get_value() equals obj, and finally checks that the
# 'notify_foo' notification was seen.
# NOTE(review): the definitions of callback, obj and method and the call
# that should raise are not visible in this excerpt.
278 def test_0_call_error_notification(self):
283 c = rpc.Client(self._client_sock, notification_callback=callback)
284 c.send_notification('notify2', ['notify_foo', []])
285 hub.sleep(0.5) # give the peer a chance to run
# Reaching this line means the expected RPCError was not raised.
289 raise Exception("unexpected")
290 except rpc.RPCError as e:
291 assert e.get_value() == obj
295 assert method == 'notify_foo'
# Sends num_calls 'resp' requests through a client-side rpc.EndPoint
# without waiting in between, then collects the responses.
# NOTE(review): the definitions of num_calls, s, done, x and sum and the
# loop that collects the responses are not visible in this excerpt.
298 def test_4_async_call(self):
299 """send a bunch of requests and then wait for responses
# NOTE(review): the standard library's socket.setblocking() returns None,
# so old_blocking may not hold the previous mode -- confirm for the socket
# type in use here.
302 old_blocking = self._client_sock.setblocking(0)
304 e = rpc.EndPoint(self._client_sock)
# Request i carries the single parameter i; s collects what send_request
# returns.
306 for i in range(1, num_calls + 1):
307 s.add(e.send_request('resp', [i]))
314 r = e.get_response(x)
321 assert done.issubset(s)
# (1 + num_calls) * num_calls / 2 is the sum of 1..num_calls, i.e. of all
# parameters sent.  Note that the local name sum shadows the builtin.
323 assert sum == (1 + num_calls) * num_calls / 2
325 self._client_sock.setblocking(old_blocking)
# Sends num_calls 'callback' requests through a client-side rpc.EndPoint
# and also answers the 'ourcallback' requests the server sends back.
# NOTE(review): the definitions of num_calls, s, done, x, r and sum and
# the surrounding loops and conditions are not visible in this excerpt.
327 def test_4_async_call2(self):
328 """both sides act as rpc client and server
# The server must have no tracked requests before the test starts.
330 assert not self._requests
# NOTE(review): the standard library's socket.setblocking() returns None,
# so old_blocking may not hold the previous mode -- confirm for the socket
# type in use here.
332 old_blocking = self._client_sock.setblocking(0)
334 e = rpc.EndPoint(self._client_sock)
# Request i carries [i, 'ourcallback', 0]; s collects what send_request
# returns.
336 for i in range(1, num_calls + 1):
337 s.add(e.send_request('callback', [i, 'ourcallback', 0]))
344 r = e.get_response(x)
351 assert done.issubset(s)
# Here r is an incoming request from the server.
355 msgid, method, params = r
356 assert method == 'ourcallback'
357 omsgid, n, cb, v = params
359 assert cb == 'ourcallback'
# Answer the callback with n decremented and v incremented.
361 e.send_response(msgid, result=[omsgid, n - 1, cb, v + 1])
# (1 + num_calls) * num_calls / 2 is the sum of 1..num_calls.  Note that
# the local name sum shadows the builtin.
362 assert sum == (1 + num_calls) * num_calls / 2
364 self._client_sock.setblocking(old_blocking)
# The server must have no tracked requests left when the test ends.
365 assert not self._requests