backing up
[vsorcdistro/.git] / ryu / build / lib.linux-armv7l-2.7 / ryu / lib / hub.py
1 # Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
2 # Copyright (C) 2013 YAMAMOTO Takashi <yamamoto at valinux co jp>
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #    http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16
17 import logging
18 import os
19 from ryu.lib import ip
20
21
22 # We don't bother to use cfg.py because monkey patch needs to be
23 # called very early. Instead, we use an environment variable to
24 # select the type of hub.
25 HUB_TYPE = os.getenv('RYU_HUB_TYPE', 'eventlet')
26
27 LOG = logging.getLogger('ryu.lib.hub')
28
29 if HUB_TYPE == 'eventlet':
30     import eventlet
31     # HACK:
32     # sleep() is the workaround for the following issue.
33     # https://github.com/eventlet/eventlet/issues/401
34     eventlet.sleep()
35     import eventlet.event
36     import eventlet.queue
37     import eventlet.semaphore
38     import eventlet.timeout
39     import eventlet.wsgi
40     from eventlet import websocket
41     import greenlet
42     import ssl
43     import socket
44     import traceback
45     import sys
46
47     getcurrent = eventlet.getcurrent
48     patch = eventlet.monkey_patch
49     sleep = eventlet.sleep
50     listen = eventlet.listen
51     connect = eventlet.connect
52
53     def spawn(*args, **kwargs):
54         raise_error = kwargs.pop('raise_error', False)
55
56         def _launch(func, *args, **kwargs):
57             # Mimic gevent's default raise_error=False behaviour
58             # by not propagating an exception to the joiner.
59             try:
60                 return func(*args, **kwargs)
61             except TaskExit:
62                 pass
63             except BaseException as e:
64                 if raise_error:
65                     raise e
66                 # Log uncaught exception.
67                 # Note: this is an intentional divergence from gevent
68                 # behaviour; gevent silently ignores such exceptions.
69                 LOG.error('hub: uncaught exception: %s',
70                           traceback.format_exc())
71
72         return eventlet.spawn(_launch, *args, **kwargs)
73
74     def spawn_after(seconds, *args, **kwargs):
75         raise_error = kwargs.pop('raise_error', False)
76
77         def _launch(func, *args, **kwargs):
78             # Mimic gevent's default raise_error=False behaviour
79             # by not propagating an exception to the joiner.
80             try:
81                 return func(*args, **kwargs)
82             except TaskExit:
83                 pass
84             except BaseException as e:
85                 if raise_error:
86                     raise e
87                 # Log uncaught exception.
88                 # Note: this is an intentional divergence from gevent
89                 # behaviour; gevent silently ignores such exceptions.
90                 LOG.error('hub: uncaught exception: %s',
91                           traceback.format_exc())
92
93         return eventlet.spawn_after(seconds, _launch, *args, **kwargs)
94
95     def kill(thread):
96         thread.kill()
97
98     def joinall(threads):
99         for t in threads:
100             # This try-except is necessary when killing an inactive
101             # greenthread.
102             try:
103                 t.wait()
104             except TaskExit:
105                 pass
106
107     Queue = eventlet.queue.LightQueue
108     QueueEmpty = eventlet.queue.Empty
109     Semaphore = eventlet.semaphore.Semaphore
110     BoundedSemaphore = eventlet.semaphore.BoundedSemaphore
111     TaskExit = greenlet.GreenletExit
112
113     class StreamServer(object):
114         def __init__(self, listen_info, handle=None, backlog=None,
115                      spawn='default', **ssl_args):
116             assert backlog is None
117             assert spawn == 'default'
118
119             if ip.valid_ipv6(listen_info[0]):
120                 self.server = eventlet.listen(listen_info,
121                                               family=socket.AF_INET6)
122             elif os.path.isdir(os.path.dirname(listen_info[0])):
123                 # Case for Unix domain socket
124                 self.server = eventlet.listen(listen_info[0],
125                                               family=socket.AF_UNIX)
126             else:
127                 self.server = eventlet.listen(listen_info)
128
129             if ssl_args:
130                 def wrap_and_handle(sock, addr):
131                     ssl_args.setdefault('server_side', True)
132                     if 'ssl_ctx' in ssl_args:
133                         ctx = ssl_args.pop('ssl_ctx')
134                         ctx.load_cert_chain(ssl_args.pop('certfile'),
135                                             ssl_args.pop('keyfile'))
136                         if 'cert_reqs' in ssl_args:
137                             ctx.verify_mode = ssl_args.pop('cert_reqs')
138                         if 'ca_certs' in ssl_args:
139                             ctx.load_verify_locations(ssl_args.pop('ca_certs'))
140                         handle(ctx.wrap_socket(sock, **ssl_args), addr)
141                     else:
142                         handle(ssl.wrap_socket(sock, **ssl_args), addr)
143
144                 self.handle = wrap_and_handle
145             else:
146                 self.handle = handle
147
148         def serve_forever(self):
149             while True:
150                 sock, addr = self.server.accept()
151                 spawn(self.handle, sock, addr)
152
153     class StreamClient(object):
154         def __init__(self, addr, timeout=None, **ssl_args):
155             assert ip.valid_ipv4(addr[0]) or ip.valid_ipv6(addr[0])
156             self.addr = addr
157             self.timeout = timeout
158             self.ssl_args = ssl_args
159             self._is_active = True
160
161         def connect(self):
162             try:
163                 if self.timeout is not None:
164                     client = socket.create_connection(self.addr,
165                                                       timeout=self.timeout)
166                 else:
167                     client = socket.create_connection(self.addr)
168             except socket.error:
169                 return None
170
171             if self.ssl_args:
172                 client = ssl.wrap_socket(client, **self.ssl_args)
173
174             return client
175
176         def connect_loop(self, handle, interval):
177             while self._is_active:
178                 sock = self.connect()
179                 if sock:
180                     handle(sock, self.addr)
181                 sleep(interval)
182
183         def stop(self):
184             self._is_active = False
185
186     class LoggingWrapper(object):
187         def write(self, message):
188             LOG.info(message.rstrip('\n'))
189
190     class WSGIServer(StreamServer):
191         def serve_forever(self):
192             self.logger = LoggingWrapper()
193             eventlet.wsgi.server(self.server, self.handle, self.logger)
194
195     WebSocketWSGI = websocket.WebSocketWSGI
196
197     Timeout = eventlet.timeout.Timeout
198
199     class Event(object):
200         def __init__(self):
201             self._ev = eventlet.event.Event()
202             self._cond = False
203
204         def _wait(self, timeout=None):
205             while not self._cond:
206                 self._ev.wait()
207
208         def _broadcast(self):
209             self._ev.send()
210             # Since eventlet Event doesn't allow multiple send() operations
211             # on an event, re-create the underlying event.
212             # Note: _ev.reset() is obsolete.
213             self._ev = eventlet.event.Event()
214
215         def is_set(self):
216             return self._cond
217
218         def set(self):
219             self._cond = True
220             self._broadcast()
221
222         def clear(self):
223             self._cond = False
224
225         def wait(self, timeout=None):
226             if timeout is None:
227                 self._wait()
228             else:
229                 try:
230                     with Timeout(timeout):
231                         self._wait()
232                 except Timeout:
233                     pass
234
235             return self._cond