1 # Copyright (C) 2011-2014 Nippon Telegraph and Telephone Corporation.
2 # Copyright (C) 2011 Isaku Yamahata <yamahata at valinux co jp>
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
18 The central management of Ryu applications.
20 - Load Ryu applications
21 - Provide `contexts` to Ryu applications
22 - Route messages among Ryu applications
35 from ryu.app import wsgi
36 from ryu.controller.handler import register_instance, get_dependent_services
37 from ryu.controller.controller import Datapath
38 from ryu.controller import event
39 from ryu.controller.event import EventRequestBase, EventReplyBase
40 from ryu.lib import hub
41 from ryu.ofproto import ofproto_protocol
43 LOG = logging.getLogger('ryu.base.app_manager')
48 def lookup_service_brick(name):
49 return SERVICE_BRICKS.get(name)
52 def _lookup_service_brick_by_ev_cls(ev_cls):
53 return _lookup_service_brick_by_mod_name(ev_cls.__module__)
56 def _lookup_service_brick_by_mod_name(mod_name):
57 return lookup_service_brick(mod_name.split('.')[-1])
60 def register_app(app):
61 assert isinstance(app, RyuApp)
62 assert app.name not in SERVICE_BRICKS
63 SERVICE_BRICKS[app.name] = app
64 register_instance(app)
67 def unregister_app(app):
68 SERVICE_BRICKS.pop(app.name)
71 def require_app(app_name, api_style=False):
73 Request the application to be automatically loaded.
75 If this is used for "api" style modules, which is imported by a client
76 application, set api_style=True.
78 If this is used for client application module, set api_style=False.
80 iterable = (inspect.getmodule(frame[0]) for frame in inspect.stack())
81 modules = [module for module in iterable if module is not None]
83 m = modules[2] # skip a frame for "api" module
86 m._REQUIRED_APP = getattr(m, '_REQUIRED_APP', [])
87 m._REQUIRED_APP.append(app_name)
88 LOG.debug('require_app: %s is required by %s', app_name, m.__name__)
93 The base class for Ryu applications.
95 RyuApp subclasses are instantiated after ryu-manager loaded
96 all requested Ryu application modules.
97 __init__ should call RyuApp.__init__ with the same arguments.
98 It's illegal to send any events in __init__.
100 The instance attribute 'name' is the name of the class used for
101 message routing among Ryu applications. (Cf. send_event)
102 It's set to __class__.__name__ by RyuApp.__init__.
103 It's discouraged for subclasses to override this.
108 A dictionary to specify contexts which this Ryu application wants to use.
109 Its key is a name of context and its value is an ordinary class
110 which implements the context. The class is instantiated by app_manager
111 and the instance is shared among RyuApp subclasses which has _CONTEXTS
112 member with the same key. A RyuApp subclass can obtain a reference to
113 the instance via its __init__'s kwargs as the following.
118 'network': network.Network
121 def __init__(self, *args, *kwargs):
122 self.network = kwargs['network']
127 A list of event classes which this RyuApp subclass would generate.
128 This should be specified if and only if event classes are defined in
129 a different python module from the RyuApp subclass is.
134 A list of supported OpenFlow versions for this RyuApp.
135 The default is all versions supported by the framework.
139 OFP_VERSIONS = [ofproto_v1_0.OFP_VERSION,
140 ofproto_v1_2.OFP_VERSION]
142 If multiple Ryu applications are loaded in the system,
143 the intersection of their OFP_VERSIONS is used.
147 def context_iteritems(cls):
149 Return iterator over the (key, contxt class) of application context
151 return iter(cls._CONTEXTS.items())
153 def __init__(self, *_args, **_kwargs):
154 super(RyuApp, self).__init__()
155 self.name = self.__class__.__name__
156 self.event_handlers = {} # ev_cls -> handlers:list
157 self.observers = {} # ev_cls -> observer-name -> states:set
159 self.main_thread = None
160 self.events = hub.Queue(128)
161 self._events_sem = hub.BoundedSemaphore(self.events.maxsize)
162 if hasattr(self.__class__, 'LOGGER_NAME'):
163 self.logger = logging.getLogger(self.__class__.LOGGER_NAME)
165 self.logger = logging.getLogger(self.name)
168 # prevent accidental creation of instances of this class outside RyuApp
169 class _EventThreadStop(event.EventBase):
171 self._event_stop = _EventThreadStop()
172 self.is_active = True
176 Hook that is called after startup initialization is done.
178 self.threads.append(hub.spawn(self._event_loop))
182 hub.kill(self.main_thread)
183 self.is_active = False
184 self._send_event(self._event_stop, None)
185 hub.joinall(self.threads)
187 def set_main_thread(self, thread):
189 Set self.main_thread so that stop() can terminate it.
191 Only AppManager.instantiate_apps should call this function.
193 self.main_thread = thread
195 def register_handler(self, ev_cls, handler):
196 assert callable(handler)
197 self.event_handlers.setdefault(ev_cls, [])
198 self.event_handlers[ev_cls].append(handler)
200 def unregister_handler(self, ev_cls, handler):
201 assert callable(handler)
202 self.event_handlers[ev_cls].remove(handler)
203 if not self.event_handlers[ev_cls]:
204 del self.event_handlers[ev_cls]
206 def register_observer(self, ev_cls, name, states=None):
207 states = states or set()
208 ev_cls_observers = self.observers.setdefault(ev_cls, {})
209 ev_cls_observers.setdefault(name, set()).update(states)
211 def unregister_observer(self, ev_cls, name):
212 observers = self.observers.get(ev_cls, {})
215 def unregister_observer_all_event(self, name):
216 for observers in self.observers.values():
217 observers.pop(name, None)
219 def observe_event(self, ev_cls, states=None):
220 brick = _lookup_service_brick_by_ev_cls(ev_cls)
221 if brick is not None:
222 brick.register_observer(ev_cls, self.name, states)
224 def unobserve_event(self, ev_cls):
225 brick = _lookup_service_brick_by_ev_cls(ev_cls)
226 if brick is not None:
227 brick.unregister_observer(ev_cls, self.name)
229 def get_handlers(self, ev, state=None):
230 """Returns a list of handlers for the specific event.
232 :param ev: The event to handle.
233 :param state: The current state. ("dispatcher")
234 If None is given, returns all handlers for the event.
235 Otherwise, returns only handlers that are interested
236 in the specified state.
239 ev_cls = ev.__class__
240 handlers = self.event_handlers.get(ev_cls, [])
245 if not hasattr(h, 'callers') or ev_cls not in h.callers:
246 # dynamically registered handlers does not have
247 # h.callers element for the event.
249 states = h.callers[ev_cls].dispatchers
251 # empty states means all states
253 return state in states
255 return filter(test, handlers)
257 def get_observers(self, ev, state):
259 for k, v in self.observers.get(ev.__class__, {}).items():
260 if not state or not v or state in v:
265 def send_request(self, req):
267 Make a synchronous request.
268 Set req.sync to True, send it to a Ryu application specified by
269 req.dst, and block until receiving a reply.
270 Returns the received reply.
271 The argument should be an instance of EventRequestBase.
274 assert isinstance(req, EventRequestBase)
276 req.reply_q = hub.Queue()
277 self.send_event(req.dst, req)
278 # going to sleep for the reply
279 return req.reply_q.get()
281 def _event_loop(self):
282 while self.is_active or not self.events.empty():
283 ev, state = self.events.get()
284 self._events_sem.release()
285 if ev == self._event_stop:
287 handlers = self.get_handlers(ev, state)
288 for handler in handlers:
293 # Propagate upwards, so we leave the event loop.
296 LOG.exception('%s: Exception occurred during handler processing. '
297 'Backtrace from offending handler '
298 '[%s] servicing event [%s] follows.',
299 self.name, handler.__name__, ev.__class__.__name__)
301 def _send_event(self, ev, state):
302 self._events_sem.acquire()
303 self.events.put((ev, state))
305 def send_event(self, name, ev, state=None):
307 Send the specified event to the RyuApp instance specified by name.
310 if name in SERVICE_BRICKS:
311 if isinstance(ev, EventRequestBase):
313 LOG.debug("EVENT %s->%s %s",
314 self.name, name, ev.__class__.__name__)
315 SERVICE_BRICKS[name]._send_event(ev, state)
317 LOG.debug("EVENT LOST %s->%s %s",
318 self.name, name, ev.__class__.__name__)
320 def send_event_to_observers(self, ev, state=None):
322 Send the specified event to all observers of this RyuApp.
325 for observer in self.get_observers(ev, state):
326 self.send_event(observer, ev, state)
328 def reply_to_request(self, req, rep):
330 Send a reply for a synchronous request sent by send_request.
331 The first argument should be an instance of EventRequestBase.
332 The second argument should be an instance of EventReplyBase.
335 assert isinstance(req, EventRequestBase)
336 assert isinstance(rep, EventReplyBase)
341 self.send_event(rep.dst, rep)
346 The method name, close, is chosen for python context manager
351 class AppManager(object):
356 def run_apps(app_lists):
357 """Run a set of Ryu applications
359 A convenient method to load and instantiate apps.
360 This blocks until all relevant apps stop.
362 app_mgr = AppManager.get_instance()
363 app_mgr.load_apps(app_lists)
364 contexts = app_mgr.create_contexts()
365 services = app_mgr.instantiate_apps(**contexts)
366 webapp = wsgi.start_service(app_mgr)
368 services.append(hub.spawn(webapp))
370 hub.joinall(services)
375 hub.joinall(services)
380 if not AppManager._instance:
381 AppManager._instance = AppManager()
382 return AppManager._instance
385 self.applications_cls = {}
386 self.applications = {}
387 self.contexts_cls = {}
389 self.close_sem = hub.Semaphore()
391 def load_app(self, name):
392 mod = utils.import_module(name)
393 clses = inspect.getmembers(mod,
394 lambda cls: (inspect.isclass(cls) and
395 issubclass(cls, RyuApp) and
402 def load_apps(self, app_lists):
403 app_lists = [app for app
404 in itertools.chain.from_iterable(app.split(',')
405 for app in app_lists)]
406 while len(app_lists) > 0:
407 app_cls_name = app_lists.pop(0)
409 context_modules = [x.__module__ for x in self.contexts_cls.values()]
410 if app_cls_name in context_modules:
413 LOG.info('loading app %s', app_cls_name)
415 cls = self.load_app(app_cls_name)
419 self.applications_cls[app_cls_name] = cls
422 for key, context_cls in cls.context_iteritems():
423 v = self.contexts_cls.setdefault(key, context_cls)
424 assert v == context_cls
425 context_modules.append(context_cls.__module__)
427 if issubclass(context_cls, RyuApp):
428 services.extend(get_dependent_services(context_cls))
430 # we can't load an app that will be initiataed for
432 for i in get_dependent_services(cls):
433 if i not in context_modules:
436 app_lists.extend([s for s in set(services)
437 if s not in app_lists])
439 def create_contexts(self):
440 for key, cls in self.contexts_cls.items():
441 if issubclass(cls, RyuApp):
443 context = self._instantiate(None, cls)
446 LOG.info('creating context %s', key)
447 assert key not in self.contexts
448 self.contexts[key] = context
451 def _update_bricks(self):
452 for i in SERVICE_BRICKS.values():
453 for _k, m in inspect.getmembers(i, inspect.ismethod):
454 if not hasattr(m, 'callers'):
456 for ev_cls, c in m.callers.items():
460 brick = _lookup_service_brick_by_mod_name(c.ev_source)
462 brick.register_observer(ev_cls, i.name,
465 # allow RyuApp and Event class are in different module
466 for brick in SERVICE_BRICKS.values():
467 if ev_cls in brick._EVENTS:
468 brick.register_observer(ev_cls, i.name,
472 def _report_brick(name, app):
473 LOG.debug("BRICK %s", name)
474 for ev_cls, list_ in app.observers.items():
475 LOG.debug(" PROVIDES %s TO %s", ev_cls.__name__, list_)
476 for ev_cls in app.event_handlers.keys():
477 LOG.debug(" CONSUMES %s", ev_cls.__name__)
481 for brick, i in SERVICE_BRICKS.items():
482 AppManager._report_brick(brick, i)
484 def _instantiate(self, app_name, cls, *args, **kwargs):
485 # for now, only single instance of a given module
486 # Do we need to support multiple instances?
487 # Yes, maybe for slicing.
488 LOG.info('instantiating app %s of %s', app_name, cls.__name__)
490 if hasattr(cls, 'OFP_VERSIONS') and cls.OFP_VERSIONS is not None:
491 ofproto_protocol.set_app_supported_versions(cls.OFP_VERSIONS)
493 if app_name is not None:
494 assert app_name not in self.applications
495 app = cls(*args, **kwargs)
497 assert app.name not in self.applications
498 self.applications[app.name] = app
501 def instantiate(self, cls, *args, **kwargs):
502 app = self._instantiate(None, cls, *args, **kwargs)
503 self._update_bricks()
504 self._report_brick(app.name, app)
507 def instantiate_apps(self, *args, **kwargs):
508 for app_name, cls in self.applications_cls.items():
509 self._instantiate(app_name, cls, *args, **kwargs)
511 self._update_bricks()
515 for app in self.applications.values():
518 app.set_main_thread(t)
524 close_method = getattr(app, 'close', None)
525 if callable(close_method):
528 def uninstantiate(self, name):
529 app = self.applications.pop(name)
531 for app_ in SERVICE_BRICKS.values():
532 app_.unregister_observer_all_event(name)
536 if not events.empty():
537 app.logger.debug('%s events remains %d', app.name, events.qsize())
540 def close_all(close_dict):
541 for app in close_dict.values():
545 # This semaphore prevents parallel execution of this function,
546 # as run_apps's finally clause starts another close() call.
548 for app_name in list(self.applications.keys()):
549 self.uninstantiate(app_name)
550 assert not self.applications
551 close_all(self.contexts)