1 # Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
2 # Copyright (C) 2013 YAMAMOTO Takashi <yamamoto at valinux co jp>
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
19 from ryu.lib import ip
# The hub type is selected through the RYU_HUB_TYPE environment variable
# instead of cfg.py, because the monkey patch has to be called very early.
HUB_TYPE = os.environ.get('RYU_HUB_TYPE', 'eventlet')

# Logger shared by the hub helpers defined in this module.
LOG = logging.getLogger('ryu.lib.hub')
29 if HUB_TYPE == 'eventlet':
32 # sleep() is the workaround for the following issue.
33 # https://github.com/eventlet/eventlet/issues/401
37 import eventlet.semaphore
38 import eventlet.timeout
40 from eventlet import websocket
# Hub-level names that simply re-export the corresponding eventlet callables.
connect = eventlet.connect
listen = eventlet.listen
sleep = eventlet.sleep
patch = eventlet.monkey_patch
getcurrent = eventlet.getcurrent
# Start a greenthread through eventlet.spawn(), running the target via the
# local _launch wrapper defined below.
#
# The 'raise_error' keyword (default False) is popped from kwargs here, so it
# is never forwarded to the target callable.
# NOTE(review): the embedded line numbers skip 59, 61-62 and 64-65, so the
# `try:` line, any handler placed before `except BaseException`, and the code
# that consumes raise_error are not visible in this listing -- confirm
# against the complete file.
53 def spawn(*args, **kwargs):
54 raise_error = kwargs.pop('raise_error', False)
# Wrapper run inside the greenthread: calls func(*args, **kwargs) and returns
# its result.
56 def _launch(func, *args, **kwargs):
57 # Mimic gevent's default raise_error=False behaviour
58 # by not propagating an exception to the joiner.
60 return func(*args, **kwargs)
63 except BaseException as e:
66 # Log uncaught exception.
67 # Note: this is an intentional divergence from gevent
68 # behaviour; gevent silently ignores such exceptions.
# The formatted traceback of the caught exception is logged at ERROR level.
69 LOG.error('hub: uncaught exception: %s',
70 traceback.format_exc())
# _launch is the callable given to eventlet; the remaining positional and
# keyword arguments are forwarded to it.
72 return eventlet.spawn(_launch, *args, **kwargs)
# Schedule a greenthread through eventlet.spawn_after(seconds, ...), running
# the target via the local _launch wrapper defined below.
#
# The 'raise_error' keyword (default False) is popped from kwargs here, so it
# is never forwarded to the target callable.
# NOTE(review): the embedded line numbers skip 80, 82-83 and 85-86, so the
# `try:` line, any handler placed before `except BaseException`, and the code
# that consumes raise_error are not visible in this listing -- confirm
# against the complete file.
74 def spawn_after(seconds, *args, **kwargs):
75 raise_error = kwargs.pop('raise_error', False)
# Wrapper run inside the greenthread: calls func(*args, **kwargs) and returns
# its result.
77 def _launch(func, *args, **kwargs):
78 # Mimic gevent's default raise_error=False behaviour
79 # by not propagating an exception to the joiner.
81 return func(*args, **kwargs)
84 except BaseException as e:
87 # Log uncaught exception.
88 # Note: this is an intentional divergence from gevent
89 # behaviour; gevent silently ignores such exceptions.
# The formatted traceback of the caught exception is logged at ERROR level.
90 LOG.error('hub: uncaught exception: %s',
91 traceback.format_exc())
# `seconds` goes to eventlet.spawn_after() as its first argument; _launch and
# the remaining arguments follow it.
93 return eventlet.spawn_after(seconds, _launch, *args, **kwargs)
100 # This try-except is necessary when killing an inactive greenthread.
# Hub-level names for the eventlet queue and semaphore classes and for the
# greenlet exit exception.
TaskExit = greenlet.GreenletExit
Semaphore = eventlet.semaphore.Semaphore
BoundedSemaphore = eventlet.semaphore.BoundedSemaphore
Queue = eventlet.queue.LightQueue
QueueEmpty = eventlet.queue.Empty
# Server that listens with eventlet.listen() and hands each accepted
# connection to spawn().
# NOTE(review): the embedded line numbers skip 126, 128-129, 141, 145-147
# and 149, so some branch/guard/loop lines of this class are not visible in
# this listing -- confirm against the complete file.
113 class StreamServer(object):
# Parameters:
#   listen_info -- address handed to eventlet.listen(); listen_info[0] is
#                  tested with ip.valid_ipv6() and then as a filesystem path.
#   handle      -- callable invoked as handle(sock, addr).
#   backlog     -- must be None (asserted).
#   spawn       -- must be 'default' (asserted).
#   ssl_args    -- TLS options; the keys read below are 'server_side',
#                  'ssl_ctx', 'certfile', 'keyfile', 'cert_reqs', 'ca_certs'.
114 def __init__(self, listen_info, handle=None, backlog=None,
115 spawn='default', **ssl_args):
116 assert backlog is None
117 assert spawn == 'default'
# Host accepted by ip.valid_ipv6(): listen on an AF_INET6 socket.
119 if ip.valid_ipv6(listen_info[0]):
120 self.server = eventlet.listen(listen_info,
121 family=socket.AF_INET6)
# The directory part of listen_info[0] exists: treat it as a socket path.
122 elif os.path.isdir(os.path.dirname(listen_info[0])):
123 # Case for Unix domain socket
124 self.server = eventlet.listen(listen_info[0],
125 family=socket.AF_UNIX)
# Remaining case (presumably the `else:` branch -- its keyword line is not
# visible): eventlet.listen() is called without an explicit family.
127 self.server = eventlet.listen(listen_info)
# Wraps the accepted socket for TLS and passes the result to handle().
130 def wrap_and_handle(sock, addr):
131 ssl_args.setdefault('server_side', True)
# NOTE(review): the pop() calls below mutate the ssl_args dict captured from
# __init__, and this function runs once per accepted connection. After the
# first call 'ssl_ctx' is gone, so later calls would no longer take this
# branch. 'certfile' and 'keyfile' are popped without a default, which raises
# KeyError when they are absent. Consider configuring the context once,
# outside the per-connection path -- confirm.
132 if 'ssl_ctx' in ssl_args:
133 ctx = ssl_args.pop('ssl_ctx')
134 ctx.load_cert_chain(ssl_args.pop('certfile'),
135 ssl_args.pop('keyfile'))
136 if 'cert_reqs' in ssl_args:
137 ctx.verify_mode = ssl_args.pop('cert_reqs')
138 if 'ca_certs' in ssl_args:
139 ctx.load_verify_locations(ssl_args.pop('ca_certs'))
# Whatever is left in ssl_args is forwarded to ctx.wrap_socket().
140 handle(ctx.wrap_socket(sock, **ssl_args), addr)
# NOTE(review): ssl.wrap_socket() was deprecated in Python 3.7 and removed in
# Python 3.12; this path needs an SSLContext on newer interpreters.
142 handle(ssl.wrap_socket(sock, **ssl_args), addr)
144 self.handle = wrap_and_handle
# Accepts a connection from self.server and starts self.handle(sock, addr)
# through spawn(). The `spawn` name here is not the __init__ parameter; it is
# resolved from the enclosing module scope.
148 def serve_forever(self):
150 sock, addr = self.server.accept()
151 spawn(self.handle, sock, addr)
# Outbound stream client. The visible code opens a TCP connection with
# socket.create_connection(), wraps it with ssl.wrap_socket(), and runs a
# reconnect loop that calls handle(sock, self.addr).
# NOTE(review): the embedded line numbers skip 156, 160-162, 166, 168-171,
# 173-175, 179 and 181-183. Several statements and at least one method
# header (the one enclosing lines 163-172) are not visible in this listing
# -- confirm against the complete file.
153 class StreamClient(object):
# Parameters:
#   addr     -- sequence whose first item must satisfy ip.valid_ipv4() or
#               ip.valid_ipv6() (asserted); presumably stored as self.addr
#               on a line not visible here.
#   timeout  -- optional timeout, given to socket.create_connection() when
#               it is not None.
#   ssl_args -- keyword options later forwarded to ssl.wrap_socket().
154 def __init__(self, addr, timeout=None, **ssl_args):
155 assert ip.valid_ipv4(addr[0]) or ip.valid_ipv6(addr[0])
157 self.timeout = timeout
158 self.ssl_args = ssl_args
# Flag tested by connect_loop(); cleared further down.
159 self._is_active = True
# Only pass `timeout` to create_connection() when one was configured.
163 if self.timeout is not None:
164 client = socket.create_connection(self.addr,
165 timeout=self.timeout)
167 client = socket.create_connection(self.addr)
# NOTE(review): ssl.wrap_socket() was deprecated in Python 3.7 and removed in
# Python 3.12. The condition guarding this line is not visible.
172 client = ssl.wrap_socket(client, **self.ssl_args)
# Repeatedly calls self.connect() while self._is_active is true and passes
# the result to handle(sock, self.addr).
# NOTE(review): `interval` is not used on any visible line; presumably it is
# consumed on one of the missing lines 181-183 -- confirm.
176 def connect_loop(self, handle, interval):
177 while self._is_active:
178 sock = self.connect()
180 handle(sock, self.addr)
# Clearing the flag ends the `while self._is_active` loop at its next check.
# NOTE(review): the enclosing `def` line is not visible in this listing.
184 self._is_active = False
class LoggingWrapper(object):
    """Object with a ``write`` method that forwards text to the module logger."""

    def write(self, message):
        """Log *message* at INFO level after stripping trailing newlines."""
        stripped = message.rstrip('\n')
        LOG.info(stripped)
class WSGIServer(StreamServer):
    """StreamServer variant that serves its socket with eventlet's WSGI server."""

    def serve_forever(self):
        """Run ``eventlet.wsgi.server`` on the listening socket.

        ``self.handle`` is passed as the second argument and a fresh
        LoggingWrapper, also kept on ``self.logger``, as the third.
        """
        log_sink = LoggingWrapper()
        self.logger = log_sink
        eventlet.wsgi.server(self.server, self.handle, log_sink)
# Hub-level names for eventlet's timeout class and WebSocket WSGI wrapper.
Timeout = eventlet.timeout.Timeout
WebSocketWSGI = websocket.WebSocketWSGI
201 self._ev = eventlet.event.Event()
# Loops for as long as self._cond is falsy.
# NOTE(review): the loop body (embedded lines 206-207) is not visible in this
# listing, so how `timeout` is used and what is returned cannot be told from
# here -- confirm against the complete file.
204 def _wait(self, timeout=None):
205 while not self._cond:
# Replaces self._ev with a fresh eventlet Event so the object can be
# signalled again later.
# NOTE(review): embedded line 209 is not visible in this listing; presumably
# it is where the current event is sent to its waiters -- confirm against
# the complete file.
208 def _broadcast(self):
210 # Since eventlet Event doesn't allow multiple send() operations
211 # on an event, re-create the underlying event.
212 # Note: _ev.reset() is obsolete.
213 self._ev = eventlet.event.Event()
225 def wait(self, timeout=None):
230 with Timeout(timeout):