backing up
[vsorcdistro/.git] / ryu / build / lib.linux-armv7l-2.7 / ryu / tests / unit / lib / test_hub.py
1 # Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
2 # Copyright (C) 2013 YAMAMOTO Takashi <yamamoto at valinux co jp>
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #    http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16
17 import time
18 import unittest
19 from nose.tools import raises
20
21 from ryu.lib import hub
22 hub.patch()
23
24
25 class MyException(BaseException):
26     pass
27
28
29 class Test_hub(unittest.TestCase):
30     """ Test case for ryu.lib.hub
31     """
32
33     def setUp(self):
34         pass
35
36     def tearDown(self):
37         pass
38
39     # we want to test timeout first because the rest of tests rely on it.
40     # thus test_0_ prefix.
41
42     @raises(hub.Timeout)
43     def test_0_timeout1(self):
44         with hub.Timeout(0.1):
45             hub.sleep(1)
46
47     @raises(MyException)
48     def test_0_timeout2(self):
49         with hub.Timeout(0.1, MyException):
50             hub.sleep(1)
51
52     def test_0_timeout3(self):
53         with hub.Timeout(1):
54             hub.sleep(0.1)
55         # sleep some more to ensure timer cancelation
56         hub.sleep(2)
57
58     def test_spawn_event1(self):
59         def _child(ev, result):
60             hub.sleep(1)
61             result.append(1)
62             ev.set()
63
64         ev = hub.Event()
65         result = []
66         with hub.Timeout(2):
67             hub.spawn(_child, ev, result)
68             ev.wait()
69         assert len(result) == 1
70
71     def test_spawn_event2(self):
72         def _child(ev, result):
73             hub.sleep(1)
74             result.append(1)
75             ev.set()
76
77         ev = hub.Event()
78         result = []
79         with hub.Timeout(2):
80             t = hub.spawn(_child, ev, result)
81             ev.wait(timeout=0.5)
82         assert len(result) == 0
83         ev.wait()
84         assert len(result) == 1
85
86     def test_spawn_event3(self):
87         def _child(ev, ev2, result):
88             ev2.wait()
89             hub.sleep(0.5)
90             result.append(1)
91             ev.set()
92
93         ev = hub.Event()
94         ev2 = hub.Event()
95         result = []
96         with hub.Timeout(2):
97             hub.spawn(_child, ev, ev2, result)
98             hub.spawn(_child, ev, ev2, result)
99             hub.sleep(0.5)
100             ev2.set()  # this should wake up the above created two threads
101             ev.wait(timeout=1)
102         assert len(result) == 2
103
104     def test_spawn_select1(self):
105         import select
106         import socket
107
108         def _child(s1):
109             hub.sleep(0.5)
110             s1.send(b"hoge")
111
112         s1, s2 = socket.socketpair()
113         with hub.Timeout(1):
114             hub.spawn(_child, s1)
115             select.select([s2.fileno()], [], [])
116             select.select([s2.fileno()], [], [])  # return immediately
117
118     @raises(MyException)
119     def test_select1(self):
120         import select
121         import socket
122
123         s1, s2 = socket.socketpair()
124         with hub.Timeout(1, MyException):
125             select.select([s2.fileno()], [], [])
126
127     def test_select2(self):
128         import select
129
130         with hub.Timeout(1, MyException):
131             select.select([], [], [], 0)  # timeout immediately
132
133     def test_select3(self):
134         import select
135         import socket
136
137         s1, s2 = socket.socketpair()
138         with hub.Timeout(1, MyException):
139             list = [s1.fileno(), s2.fileno()]
140             rlist, wlist, xlist = select.select(list, list, list)
141             assert not s1.fileno() in rlist
142             assert not s2.fileno() in rlist
143             # the following two assertions are commented out because one of
144             # them fails with eventlet-patched select.
145             #       assert s1.fileno() in wlist
146             #       assert s2.fileno() in wlist
147             # note: eventlet-patched select returns at most one file.
148             assert (s1.fileno() in wlist) or (s2.fileno() in wlist)
149             assert not s1.fileno() in xlist
150             assert not s2.fileno() in xlist
151
152     def test_spawn_joinall(self):
153         def _child(ev2, result):
154             ev2.wait()
155             hub.sleep(0.5)
156             result.append(1)
157             raise BaseException("this exception should not be propagated")
158
159         ev2 = hub.Event()
160         threads = []
161         result = []
162         with hub.Timeout(2):
163             threads.append(hub.spawn(_child, ev2, result))
164             threads.append(hub.spawn(_child, ev2, result))
165             hub.sleep(0.5)
166             ev2.set()  # this should wake up the above created two threads
167             hub.joinall(threads)
168         assert len(result) == 2
169
170     def test_spawn_kill_joinall(self):
171         def _child(ev2, result):
172             ev2.wait()
173             result.append(1)
174
175         ev2 = hub.Event()
176         threads = []
177         result = []
178         with hub.Timeout(2):
179             threads.append(hub.spawn(_child, ev2, result))
180             threads.append(hub.spawn(_child, ev2, result))
181             hub.sleep(0.5)
182             for t in threads:
183                 hub.kill(t)
184             hub.joinall(threads)
185         assert len(result) == 0
186
187     def test_spawn_kill_nowait_joinall(self):
188         # XXX this test relies on the scheduling behaviour.
189         # the intention here is, killing threads before they get active.
190
191         def _child(result):
192             result.append(1)
193
194         threads = []
195         result = []
196         with hub.Timeout(2):
197             threads.append(hub.spawn(_child, result))
198             for t in threads:
199                 hub.kill(t)
200             hub.joinall(threads)
201         assert len(result) == 0
202
203     def test_spawn_kill_die_joinall(self):
204         def _child(result):
205             result.append(1)
206
207         threads = []
208         result = []
209         with hub.Timeout(2):
210             threads.append(hub.spawn(_child, result))
211             threads.append(hub.spawn(_child, result))
212             hub.sleep(0.5)
213             for t in threads:
214                 hub.kill(t)
215             hub.joinall(threads)
216         assert len(result) == 2
217
218     def test_spawn_exception_joinall(self):
219         def _child():
220             raise Exception("hoge")
221
222         threads = []
223         with hub.Timeout(2):
224             threads.append(hub.spawn(_child))
225             threads.append(hub.spawn(_child))
226             hub.sleep(0.5)
227             hub.joinall(threads)
228
229     def test_event1(self):
230         ev = hub.Event()
231         ev.set()
232         with hub.Timeout(1):
233             ev.wait()  # should return immediately
234
235     def test_event2(self):
236         ev = hub.Event()
237         # allow multiple sets unlike eventlet Event
238         ev.set()
239         ev.set()