backing up
[vsorcdistro/.git] / ryu / build / lib.linux-armv7l-2.7 / ryu / base / app_manager.py
1 # Copyright (C) 2011-2014 Nippon Telegraph and Telephone Corporation.
2 # Copyright (C) 2011 Isaku Yamahata <yamahata at valinux co jp>
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #    http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16
17 """
18 The central management of Ryu applications.
19
20 - Load Ryu applications
21 - Provide `contexts` to Ryu applications
22 - Route messages among Ryu applications
23
24 """
25
26 import inspect
27 import itertools
28 import logging
29 import sys
30 import os
31 import gc
32
33 from ryu import cfg
34 from ryu import utils
35 from ryu.app import wsgi
36 from ryu.controller.handler import register_instance, get_dependent_services
37 from ryu.controller.controller import Datapath
38 from ryu.controller import event
39 from ryu.controller.event import EventRequestBase, EventReplyBase
40 from ryu.lib import hub
41 from ryu.ofproto import ofproto_protocol
42
# Logger used by the module-level helpers, RyuApp and AppManager below.
LOG = logging.getLogger('ryu.base.app_manager')

# Registry of application instances keyed by their ``name`` attribute.
# Entries are added by register_app() and removed by unregister_app().
SERVICE_BRICKS = {}
46
47
def lookup_service_brick(name):
    """Return the application registered under ``name``, or None if absent."""
    try:
        return SERVICE_BRICKS[name]
    except KeyError:
        return None
50
51
def _lookup_service_brick_by_ev_cls(ev_cls):
    """Look up the application matching the module that defines ``ev_cls``.

    The lookup key is derived from ``ev_cls.__module__``; None is returned
    when no application is registered under that key.
    """
    defining_module = ev_cls.__module__
    return _lookup_service_brick_by_mod_name(defining_module)
54
55
def _lookup_service_brick_by_mod_name(mod_name):
    """Look up an application by the last component of a dotted module name.

    For example ``'pkg.sub.leaf'`` is looked up as ``'leaf'``.
    """
    _, _, leaf = mod_name.rpartition('.')
    return lookup_service_brick(leaf)
58
59
def register_app(app):
    """Add ``app`` to SERVICE_BRICKS under ``app.name`` and register it.

    ``app`` must be a RyuApp instance whose name is not registered yet;
    both conditions are enforced with assertions.
    """
    assert isinstance(app, RyuApp)
    app_name = app.name
    assert app_name not in SERVICE_BRICKS
    SERVICE_BRICKS[app_name] = app
    register_instance(app)
65
66
def unregister_app(app):
    """Remove ``app`` from SERVICE_BRICKS.

    Raises KeyError if no application is registered under ``app.name``.
    """
    del SERVICE_BRICKS[app.name]
69
70
def require_app(app_name, api_style=False):
    """
    Request the application to be automatically loaded.

    The request is recorded by appending ``app_name`` to the
    ``_REQUIRED_APP`` list of the requesting module, which is found by
    walking the call stack.

    If this is used for "api" style modules, which is imported by a client
    application, set api_style=True.

    If this is used for client application module, set api_style=False.
    """
    # Collect the module of every stack frame, dropping frames whose
    # module cannot be determined.
    caller_modules = []
    for frame_record in inspect.stack():
        module = inspect.getmodule(frame_record[0])
        if module is not None:
            caller_modules.append(module)

    # Entry 1 is the direct caller; for "api" style modules one more entry
    # is skipped so that the client of the api module is picked instead.
    depth = 2 if api_style else 1
    requester = caller_modules[depth]

    required = getattr(requester, '_REQUIRED_APP', [])
    requester._REQUIRED_APP = required
    required.append(app_name)
    LOG.debug('require_app: %s is required by %s', app_name,
              requester.__name__)
89
90
class RyuApp(object):
    """
    The base class for Ryu applications.

    RyuApp subclasses are instantiated after ryu-manager loaded
    all requested Ryu application modules.
    __init__ should call RyuApp.__init__ with the same arguments.
    It's illegal to send any events in __init__.

    The instance attribute 'name' is the name of the class used for
    message routing among Ryu applications.  (Cf. send_event)
    It's set to __class__.__name__ by RyuApp.__init__.
    It's discouraged for subclasses to override this.
    """

    _CONTEXTS = {}
    """
    A dictionary to specify contexts which this Ryu application wants to use.
    Its key is a name of context and its value is an ordinary class
    which implements the context.  The class is instantiated by app_manager
    and the instance is shared among RyuApp subclasses which has _CONTEXTS
    member with the same key.  A RyuApp subclass can obtain a reference to
    the instance via its __init__'s kwargs as the following.

    Example::

        _CONTEXTS = {
            'network': network.Network
        }

        def __init__(self, *args, **kwargs):
            self.network = kwargs['network']
    """

    _EVENTS = []
    """
    A list of event classes which this RyuApp subclass would generate.
    This should be specified if and only if event classes are defined in
    a different python module from the RyuApp subclass is.
    """

    OFP_VERSIONS = None
    """
    A list of supported OpenFlow versions for this RyuApp.
    The default is all versions supported by the framework.

    Examples::

        OFP_VERSIONS = [ofproto_v1_0.OFP_VERSION,
                        ofproto_v1_2.OFP_VERSION]

    If multiple Ryu applications are loaded in the system,
    the intersection of their OFP_VERSIONS is used.
    """

    @classmethod
    def context_iteritems(cls):
        """
        Return iterator over the (key, context class) of application context
        """
        return iter(cls._CONTEXTS.items())

    def __init__(self, *_args, **_kwargs):
        """Set up the event queue, handler/observer tables and logger.

        Positional and keyword arguments are accepted and ignored so that
        subclasses can forward whatever they received.
        """
        super(RyuApp, self).__init__()
        self.name = self.__class__.__name__
        self.event_handlers = {}        # ev_cls -> handlers:list
        self.observers = {}     # ev_cls -> observer-name -> states:set
        self.threads = []
        self.main_thread = None
        self.events = hub.Queue(128)
        # Bounded by the queue size: _send_event() acquires it before
        # queueing and _event_loop() releases it after dequeueing.
        self._events_sem = hub.BoundedSemaphore(self.events.maxsize)
        if hasattr(self.__class__, 'LOGGER_NAME'):
            self.logger = logging.getLogger(self.__class__.LOGGER_NAME)
        else:
            self.logger = logging.getLogger(self.name)
        self.CONF = cfg.CONF

        # prevent accidental creation of instances of this class outside RyuApp
        class _EventThreadStop(event.EventBase):
            pass
        # Sentinel queued by stop() to wake up the event loop.
        self._event_stop = _EventThreadStop()
        self.is_active = True

    def start(self):
        """
        Hook that is called after startup initialization is done.

        The base implementation spawns the event loop thread and returns
        None.
        """
        self.threads.append(hub.spawn(self._event_loop))

    def stop(self):
        """Kill the main thread (if any), stop the event loop and join it."""
        if self.main_thread:
            hub.kill(self.main_thread)
        self.is_active = False
        # Wake up the event loop so that it re-checks is_active.
        self._send_event(self._event_stop, None)
        hub.joinall(self.threads)

    def set_main_thread(self, thread):
        """
        Set self.main_thread so that stop() can terminate it.

        Only AppManager.instantiate_apps should call this function.
        """
        self.main_thread = thread

    def register_handler(self, ev_cls, handler):
        """Append ``handler`` to the handlers called for ``ev_cls`` events."""
        assert callable(handler)
        self.event_handlers.setdefault(ev_cls, [])
        self.event_handlers[ev_cls].append(handler)

    def unregister_handler(self, ev_cls, handler):
        """Remove ``handler`` from the handlers of ``ev_cls``.

        The ``ev_cls`` entry is dropped once its handler list is empty.
        Raises KeyError/ValueError if the handler was not registered.
        """
        assert callable(handler)
        self.event_handlers[ev_cls].remove(handler)
        if not self.event_handlers[ev_cls]:
            del self.event_handlers[ev_cls]

    def register_observer(self, ev_cls, name, states=None):
        """Record that application ``name`` observes ``ev_cls`` events.

        ``states`` is merged into the set of states already registered for
        that observer; None or empty adds nothing.
        """
        states = states or set()
        ev_cls_observers = self.observers.setdefault(ev_cls, {})
        ev_cls_observers.setdefault(name, set()).update(states)

    def unregister_observer(self, ev_cls, name):
        """Remove observer ``name`` for ``ev_cls``.

        Raises KeyError if ``name`` is not an observer of ``ev_cls``.
        """
        observers = self.observers.get(ev_cls, {})
        observers.pop(name)

    def unregister_observer_all_event(self, name):
        """Remove observer ``name`` from every event class, if present."""
        for observers in self.observers.values():
            observers.pop(name, None)

    def observe_event(self, ev_cls, states=None):
        """Register this application as an observer of ``ev_cls``.

        Does nothing when no application is registered for the module that
        defines ``ev_cls``.
        """
        brick = _lookup_service_brick_by_ev_cls(ev_cls)
        if brick is not None:
            brick.register_observer(ev_cls, self.name, states)

    def unobserve_event(self, ev_cls):
        """Undo observe_event() for ``ev_cls``.

        Does nothing when no application is registered for the module that
        defines ``ev_cls``.
        """
        brick = _lookup_service_brick_by_ev_cls(ev_cls)
        if brick is not None:
            brick.unregister_observer(ev_cls, self.name)

    def get_handlers(self, ev, state=None):
        """Returns a list of handlers for the specific event.

        :param ev: The event to handle.
        :param state: The current state. ("dispatcher")
                      If None is given, returns all handlers for the event.
                      Otherwise, returns only handlers that are interested
                      in the specified state.
                      The default is None.
        """
        ev_cls = ev.__class__
        handlers = self.event_handlers.get(ev_cls, [])
        if state is None:
            return handlers

        def test(h):
            if not hasattr(h, 'callers') or ev_cls not in h.callers:
                # dynamically registered handlers does not have
                # h.callers element for the event.
                return True
            states = h.callers[ev_cls].dispatchers
            if not states:
                # empty states means all states
                return True
            return state in states

        # Build a real list: filter() is a lazy iterator on Python 3, which
        # would break the documented "returns a list" contract.
        return [h for h in handlers if test(h)]

    def get_observers(self, ev, state):
        """Return the names of observers interested in ``ev`` for ``state``.

        An observer matches when ``state`` is falsy, when it registered no
        states at all, or when ``state`` is among its registered states.
        """
        observers = []
        for k, v in self.observers.get(ev.__class__, {}).items():
            if not state or not v or state in v:
                observers.append(k)

        return observers

    def send_request(self, req):
        """
        Make a synchronous request.
        Set req.sync to True, send it to a Ryu application specified by
        req.dst, and block until receiving a reply.
        Returns the received reply.
        The argument should be an instance of EventRequestBase.
        """

        assert isinstance(req, EventRequestBase)
        req.sync = True
        req.reply_q = hub.Queue()
        self.send_event(req.dst, req)
        # going to sleep for the reply
        return req.reply_q.get()

    def _event_loop(self):
        """Dispatch queued events to their handlers until stopped.

        The loop runs while the application is active or events remain in
        the queue.  Exceptions raised by a handler are logged and do not
        stop the loop; hub.TaskExit is re-raised to leave the loop.
        """
        while self.is_active or not self.events.empty():
            ev, state = self.events.get()
            self._events_sem.release()
            # Identity check: the sentinel is a unique instance, and this
            # avoids relying on an event class's own __eq__.
            if ev is self._event_stop:
                continue
            handlers = self.get_handlers(ev, state)
            for handler in handlers:
                try:
                    handler(ev)
                except hub.TaskExit:
                    # Normal exit.
                    # Propagate upwards, so we leave the event loop.
                    raise
                except:
                    # Deliberately broad: a failing handler must not take
                    # down the event loop.  Callables such as
                    # functools.partial have no __name__, so fall back to
                    # repr() instead of raising while logging.
                    handler_name = getattr(handler, '__name__',
                                           repr(handler))
                    LOG.exception('%s: Exception occurred during handler processing. '
                                  'Backtrace from offending handler '
                                  '[%s] servicing event [%s] follows.',
                                  self.name, handler_name, ev.__class__.__name__)

    def _send_event(self, ev, state):
        """Queue ``(ev, state)`` on this application's event queue."""
        self._events_sem.acquire()
        self.events.put((ev, state))

    def send_event(self, name, ev, state=None):
        """
        Send the specified event to the RyuApp instance specified by name.

        If no application is registered under ``name`` the event is dropped
        and an "EVENT LOST" debug message is logged.
        """

        if name in SERVICE_BRICKS:
            if isinstance(ev, EventRequestBase):
                ev.src = self.name
            LOG.debug("EVENT %s->%s %s",
                      self.name, name, ev.__class__.__name__)
            SERVICE_BRICKS[name]._send_event(ev, state)
        else:
            LOG.debug("EVENT LOST %s->%s %s",
                      self.name, name, ev.__class__.__name__)

    def send_event_to_observers(self, ev, state=None):
        """
        Send the specified event to all observers of this RyuApp.
        """

        for observer in self.get_observers(ev, state):
            self.send_event(observer, ev, state)

    def reply_to_request(self, req, rep):
        """
        Send a reply for a synchronous request sent by send_request.
        The first argument should be an instance of EventRequestBase.
        The second argument should be an instance of EventReplyBase.
        """

        assert isinstance(req, EventRequestBase)
        assert isinstance(rep, EventReplyBase)
        rep.dst = req.src
        if req.sync:
            # The requester is blocked on reply_q in send_request().
            req.reply_q.put(rep)
        else:
            self.send_event(rep.dst, rep)

    def close(self):
        """
        teardown method.
        The method name, close, is chosen for python context manager
        """
        pass
349
350
class AppManager(object):
    """Loads, instantiates, wires up and tears down Ryu applications.

    Intended to be used as a singleton obtained via get_instance().
    """

    # singleton
    _instance = None

    @staticmethod
    def run_apps(app_lists):
        """Run a set of Ryu applications

        A convenient method to load and instantiate apps.
        This blocks until all relevant apps stop.
        """
        manager = AppManager.get_instance()
        manager.load_apps(app_lists)
        shared_contexts = manager.create_contexts()
        running = manager.instantiate_apps(**shared_contexts)
        wsgi_app = wsgi.start_service(manager)
        if wsgi_app:
            running.append(hub.spawn(wsgi_app))
        try:
            hub.joinall(running)
        finally:
            # Tear everything down even if joinall() was interrupted.
            manager.close()
            for thread in running:
                thread.kill()
            hub.joinall(running)
            gc.collect()

    @staticmethod
    def get_instance():
        """Return the shared AppManager, creating it on first use."""
        current = AppManager._instance
        if not current:
            current = AppManager._instance = AppManager()
        return current

    def __init__(self):
        """Create empty application/context tables and the close lock."""
        self.applications_cls = {}
        self.applications = {}
        self.contexts_cls = {}
        self.contexts = {}
        self.close_sem = hub.Semaphore()

    def load_app(self, name):
        """Import module ``name`` and return a RyuApp subclass defined in it.

        Only classes whose ``__module__`` is the imported module itself are
        considered; the first one reported by inspect.getmembers() is
        returned, or None when there is none.
        """
        mod = utils.import_module(name)

        def is_local_app_class(obj):
            return (inspect.isclass(obj) and
                    issubclass(obj, RyuApp) and
                    mod.__name__ == obj.__module__)

        candidates = inspect.getmembers(mod, is_local_app_class)
        if not candidates:
            return None
        return candidates[0][1]

    def load_apps(self, app_lists):
        """Load the given application modules and everything they need.

        Each entry of ``app_lists`` may hold several comma separated module
        names.  Context classes declared by the loaded applications are
        recorded, and services they depend on are queued for loading too.
        """
        pending = []
        for entry in app_lists:
            pending.extend(entry.split(','))

        while pending:
            app_cls_name = pending.pop(0)

            context_modules = [ctx_cls.__module__
                               for ctx_cls in self.contexts_cls.values()]
            if app_cls_name in context_modules:
                continue

            LOG.info('loading app %s', app_cls_name)

            app_cls = self.load_app(app_cls_name)
            if app_cls is None:
                continue

            self.applications_cls[app_cls_name] = app_cls

            services = []
            for key, context_cls in app_cls.context_iteritems():
                registered = self.contexts_cls.setdefault(key, context_cls)
                assert registered == context_cls
                context_modules.append(context_cls.__module__)

                if issubclass(context_cls, RyuApp):
                    services.extend(get_dependent_services(context_cls))

            # we can't load an app that will be initiataed for
            # contexts.
            services.extend(dep for dep in get_dependent_services(app_cls)
                            if dep not in context_modules)
            pending.extend([dep for dep in set(services)
                            if dep not in pending])

    def create_contexts(self):
        """Instantiate every recorded context class and return the mapping.

        Context classes that are RyuApp subclasses go through
        _instantiate() so that they are registered like applications.
        """
        for key, ctx_cls in self.contexts_cls.items():
            # hack for dpset
            context = (self._instantiate(None, ctx_cls)
                       if issubclass(ctx_cls, RyuApp) else ctx_cls())
            LOG.info('creating context %s', key)
            assert key not in self.contexts
            self.contexts[key] = context
        return self.contexts

    def _update_bricks(self):
        """Register every application as observer of the events it handles.

        For each method carrying a ``callers`` attribute, the application
        is registered with the brick named after the event source module
        and with every brick listing the event class in ``_EVENTS``.
        """
        for app in SERVICE_BRICKS.values():
            for _name, method in inspect.getmembers(app, inspect.ismethod):
                if not hasattr(method, 'callers'):
                    continue
                for ev_cls, caller in method.callers.items():
                    if not caller.ev_source:
                        continue

                    source = _lookup_service_brick_by_mod_name(
                        caller.ev_source)
                    if source:
                        source.register_observer(ev_cls, app.name,
                                                 caller.dispatchers)

                    # allow RyuApp and Event class are in different module
                    for provider in SERVICE_BRICKS.values():
                        if ev_cls in provider._EVENTS:
                            provider.register_observer(ev_cls, app.name,
                                                       caller.dispatchers)

    @staticmethod
    def _report_brick(name, app):
        """Log, at debug level, what ``app`` provides and consumes."""
        LOG.debug("BRICK %s", name)
        for provided_cls, observer_map in app.observers.items():
            LOG.debug("  PROVIDES %s TO %s", provided_cls.__name__,
                      observer_map)
        for consumed_cls in app.event_handlers:
            LOG.debug("  CONSUMES %s", consumed_cls.__name__)

    @staticmethod
    def report_bricks():
        """Log a debug report for every registered application."""
        for brick_name, brick in SERVICE_BRICKS.items():
            AppManager._report_brick(brick_name, brick)

    def _instantiate(self, app_name, cls, *args, **kwargs):
        """Create, register and record one instance of ``cls``.

        ``app_name`` is only used for logging and a duplicate check; the
        instance is stored under its own ``name`` attribute.
        """
        # for now, only single instance of a given module
        # Do we need to support multiple instances?
        # Yes, maybe for slicing.
        LOG.info('instantiating app %s of %s', app_name, cls.__name__)

        supported = getattr(cls, 'OFP_VERSIONS', None)
        if supported is not None:
            ofproto_protocol.set_app_supported_versions(supported)

        assert app_name is None or app_name not in self.applications
        instance = cls(*args, **kwargs)
        register_app(instance)
        assert instance.name not in self.applications
        self.applications[instance.name] = instance
        return instance

    def instantiate(self, cls, *args, **kwargs):
        """Instantiate a single application class and wire up observers."""
        instance = self._instantiate(None, cls, *args, **kwargs)
        self._update_bricks()
        self._report_brick(instance.name, instance)
        return instance

    def instantiate_apps(self, *args, **kwargs):
        """Instantiate and start all loaded application classes.

        Returns the list of non-None values returned by each
        application's start(); each is also set as that application's
        main thread.
        """
        for app_name, app_cls in self.applications_cls.items():
            self._instantiate(app_name, app_cls, *args, **kwargs)

        self._update_bricks()
        self.report_bricks()

        main_threads = []
        for instance in self.applications.values():
            thread = instance.start()
            if thread is None:
                continue
            instance.set_main_thread(thread)
            main_threads.append(thread)
        return main_threads

    @staticmethod
    def _close(app):
        """Call ``app.close()`` when ``app`` has a callable ``close``."""
        closer = getattr(app, 'close', None)
        if not callable(closer):
            return
        closer()

    def uninstantiate(self, name):
        """Unregister, stop and close the application called ``name``.

        Raises KeyError if no such application is instantiated.
        """
        instance = self.applications.pop(name)
        unregister_app(instance)
        for other in SERVICE_BRICKS.values():
            other.unregister_observer_all_event(name)
        instance.stop()
        self._close(instance)
        leftover = instance.events
        if leftover.empty():
            return
        instance.logger.debug('%s events remains %d', instance.name,
                              leftover.qsize())

    def close(self):
        """Uninstantiate all applications, then close and drop all contexts."""
        # This semaphore prevents parallel execution of this function,
        # as run_apps's finally clause starts another close() call.
        with self.close_sem:
            for app_name in list(self.applications):
                self.uninstantiate(app_name)
            assert not self.applications
            for context in self.contexts.values():
                self._close(context)
            self.contexts.clear()