1 # Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
2 # Copyright (C) 2013 YAMAMOTO Takashi <yamamoto at valinux co jp>
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
19 from nose.tools import raises
21 from ryu.lib import hub
25 class MyException(BaseException):
29 class Test_hub(unittest.TestCase):
30 """ Test case for ryu.lib.hub
39 # we want to test timeout first because the rest of tests rely on it.
40 # thus test_0_ prefix.
43 def test_0_timeout1(self):
44 with hub.Timeout(0.1):
48 def test_0_timeout2(self):
49 with hub.Timeout(0.1, MyException):
52 def test_0_timeout3(self):
55 # sleep some more to ensure timer cancelation
58 def test_spawn_event1(self):
59 def _child(ev, result):
67 hub.spawn(_child, ev, result)
69 assert len(result) == 1
71 def test_spawn_event2(self):
72 def _child(ev, result):
80 t = hub.spawn(_child, ev, result)
82 assert len(result) == 0
84 assert len(result) == 1
86 def test_spawn_event3(self):
87 def _child(ev, ev2, result):
97 hub.spawn(_child, ev, ev2, result)
98 hub.spawn(_child, ev, ev2, result)
100 ev2.set() # this should wake up the above created two threads
102 assert len(result) == 2
104 def test_spawn_select1(self):
112 s1, s2 = socket.socketpair()
114 hub.spawn(_child, s1)
115 select.select([s2.fileno()], [], [])
116 select.select([s2.fileno()], [], []) # return immediately
119 def test_select1(self):
123 s1, s2 = socket.socketpair()
124 with hub.Timeout(1, MyException):
125 select.select([s2.fileno()], [], [])
127 def test_select2(self):
130 with hub.Timeout(1, MyException):
131 select.select([], [], [], 0) # timeout immediately
133 def test_select3(self):
137 s1, s2 = socket.socketpair()
138 with hub.Timeout(1, MyException):
139 list = [s1.fileno(), s2.fileno()]
140 rlist, wlist, xlist = select.select(list, list, list)
141 assert not s1.fileno() in rlist
142 assert not s2.fileno() in rlist
143 # the following two assertions are commented out because one of
144 # them fails with eventlet-patched select.
145 # assert s1.fileno() in wlist
146 # assert s2.fileno() in wlist
147 # note: eventlet-patched select returns at most one file.
148 assert (s1.fileno() in wlist) or (s2.fileno() in wlist)
149 assert not s1.fileno() in xlist
150 assert not s2.fileno() in xlist
152 def test_spawn_joinall(self):
153 def _child(ev2, result):
157 raise BaseException("this exception should not be propagated")
163 threads.append(hub.spawn(_child, ev2, result))
164 threads.append(hub.spawn(_child, ev2, result))
166 ev2.set() # this should wake up the above created two threads
168 assert len(result) == 2
170 def test_spawn_kill_joinall(self):
171 def _child(ev2, result):
179 threads.append(hub.spawn(_child, ev2, result))
180 threads.append(hub.spawn(_child, ev2, result))
185 assert len(result) == 0
187 def test_spawn_kill_nowait_joinall(self):
188 # XXX this test relies on the scheduling behaviour.
189 # the intention here is, killing threads before they get active.
197 threads.append(hub.spawn(_child, result))
201 assert len(result) == 0
203 def test_spawn_kill_die_joinall(self):
210 threads.append(hub.spawn(_child, result))
211 threads.append(hub.spawn(_child, result))
216 assert len(result) == 2
218 def test_spawn_exception_joinall(self):
220 raise Exception("hoge")
224 threads.append(hub.spawn(_child))
225 threads.append(hub.spawn(_child))
229 def test_event1(self):
233 ev.wait() # should return immediately
235 def test_event2(self):
237 # allow multiple sets unlike eventlet Event