backing up
[vsorcdistro/.git] / ryu / build / lib.linux-armv7l-2.7 / ryu / services / protocols / bgp / base.py
1 # Copyright (C) 2014 Nippon Telegraph and Telephone Corporation.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #    http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 """
16   Defines some base class related to managing green threads.
17 """
18 from __future__ import absolute_import
19
20 import abc
21 from collections import OrderedDict
22 import logging
23 import socket
24 import time
25 import traceback
26 import weakref
27
28 import netaddr
29 import six
30
31 from ryu.lib import hub
32 from ryu.lib import sockopt
33 from ryu.lib import ip
34 from ryu.lib.hub import Timeout
35 from ryu.lib.packet.bgp import RF_IPv4_UC
36 from ryu.lib.packet.bgp import RF_IPv6_UC
37 from ryu.lib.packet.bgp import RF_IPv4_VPN
38 from ryu.lib.packet.bgp import RF_IPv6_VPN
39 from ryu.lib.packet.bgp import RF_L2_EVPN
40 from ryu.lib.packet.bgp import RF_IPv4_FLOWSPEC
41 from ryu.lib.packet.bgp import RF_IPv6_FLOWSPEC
42 from ryu.lib.packet.bgp import RF_VPNv4_FLOWSPEC
43 from ryu.lib.packet.bgp import RF_VPNv6_FLOWSPEC
44 from ryu.lib.packet.bgp import RF_L2VPN_FLOWSPEC
45 from ryu.lib.packet.bgp import RF_RTC_UC
46 from ryu.services.protocols.bgp.utils.circlist import CircularListType
47 from ryu.services.protocols.bgp.utils.evtlet import LoopingCall
48
49
50 # Logger instance for this module.
51 LOG = logging.getLogger('bgpspeaker.base')
52
53 # Pointer to active/available OrderedDict.
54 OrderedDict = OrderedDict
55
56
57 # Currently supported address families.
58 SUPPORTED_GLOBAL_RF = {
59     RF_IPv4_UC,
60     RF_IPv6_UC,
61     RF_IPv4_VPN,
62     RF_RTC_UC,
63     RF_IPv6_VPN,
64     RF_L2_EVPN,
65     RF_IPv4_FLOWSPEC,
66     RF_IPv6_FLOWSPEC,
67     RF_VPNv4_FLOWSPEC,
68     RF_VPNv6_FLOWSPEC,
69     RF_L2VPN_FLOWSPEC,
70 }
71
72
73 # Various error codes
74 ACTIVITY_ERROR_CODE = 100
75 RUNTIME_CONF_ERROR_CODE = 200
76 BIN_ERROR = 300
77 NET_CTRL_ERROR_CODE = 400
78 API_ERROR_CODE = 500
79 PREFIX_ERROR_CODE = 600
80 BGP_PROCESSOR_ERROR_CODE = 700
81 CORE_ERROR_CODE = 800
82
83 # Registry of custom exceptions
84 # Key: code:sub-code
85 # Value: exception class
86 _EXCEPTION_REGISTRY = {}
87
88
89 class BGPSException(Exception):
90     """Base exception class for all BGPS related exceptions.
91     """
92
93     CODE = 1
94     SUB_CODE = 1
95     DEF_DESC = 'Unknown exception.'
96
97     def __init__(self, desc=None):
98         super(BGPSException, self).__init__()
99         if not desc:
100             desc = self.__class__.DEF_DESC
101         kls = self.__class__
102         self.message = '%d.%d - %s' % (kls.CODE, kls.SUB_CODE, desc)
103
104     def __repr__(self):
105         kls = self.__class__
106         return '<%s(desc=%s)>' % (kls, self.message)
107
108     def __str__(self, *args, **kwargs):
109         return self.message
110
111
112 def add_bgp_error_metadata(code, sub_code, def_desc='unknown'):
113     """Decorator for all exceptions that want to set exception class meta-data.
114     """
115     # Check registry if we already have an exception with same code/sub-code
116     if _EXCEPTION_REGISTRY.get((code, sub_code)) is not None:
117         raise ValueError('BGPSException with code %d and sub-code %d '
118                          'already defined.' % (code, sub_code))
119
120     def decorator(subclass):
121         """Sets class constants for exception code and sub-code.
122
123         If given class is sub-class of BGPSException we sets class constants.
124         """
125         if issubclass(subclass, BGPSException):
126             _EXCEPTION_REGISTRY[(code, sub_code)] = subclass
127             subclass.CODE = code
128             subclass.SUB_CODE = sub_code
129             subclass.DEF_DESC = def_desc
130         return subclass
131     return decorator
132
133
134 @add_bgp_error_metadata(code=ACTIVITY_ERROR_CODE,
135                         sub_code=1,
136                         def_desc='Unknown activity exception.')
137 class ActivityException(BGPSException):
138     """Base class for exceptions related to Activity.
139     """
140     pass
141
142
143 @six.add_metaclass(abc.ABCMeta)
144 class Activity(object):
145     """Base class for a thread of execution that provides some custom settings.
146
147     Activity is also a container of other activities or threads that it has
148     started. Inside a Activity you should always use one of the spawn method
149     to start another activity or greenthread. Activity is also holds pointers
150     to sockets that it or its child activities of threads have create.
151     """
152
153     def __init__(self, name=None):
154         self._name = name
155         if self._name is None:
156             self._name = 'UnknownActivity: ' + str(time.time())
157         self._child_thread_map = weakref.WeakValueDictionary()
158         self._child_activity_map = weakref.WeakValueDictionary()
159         self._asso_socket_map = weakref.WeakValueDictionary()
160         self._timers = weakref.WeakValueDictionary()
161         self._started = False
162
163     @property
164     def name(self):
165         return self._name
166
167     @property
168     def started(self):
169         return self._started
170
171     def _validate_activity(self, activity):
172         """Checks the validity of the given activity before it can be started.
173         """
174         if not self._started:
175             raise ActivityException(desc='Tried to spawn a child activity'
176                                     ' before Activity was started.')
177
178         if activity.started:
179             raise ActivityException(desc='Tried to start an Activity that was '
180                                     'already started.')
181
182     def _spawn_activity(self, activity, *args, **kwargs):
183         """Starts *activity* in a new thread and passes *args* and *kwargs*.
184
185         Maintains pointer to this activity and stops *activity* when this
186         activity is stopped.
187         """
188         self._validate_activity(activity)
189
190         # Spawn a new greenthread for given activity
191         greenthread = hub.spawn(activity.start, *args, **kwargs)
192         self._child_thread_map[activity.name] = greenthread
193         self._child_activity_map[activity.name] = activity
194         return greenthread
195
196     def _spawn_activity_after(self, seconds, activity, *args, **kwargs):
197         self._validate_activity(activity)
198
199         # Schedule to spawn a new greenthread after requested delay
200         greenthread = hub.spawn_after(seconds, activity.start, *args,
201                                       **kwargs)
202         self._child_thread_map[activity.name] = greenthread
203         self._child_activity_map[activity.name] = activity
204         return greenthread
205
206     def _validate_callable(self, callable_):
207         if callable_ is None:
208             raise ActivityException(desc='Callable cannot be None')
209
210         if not hasattr(callable_, '__call__'):
211             raise ActivityException(desc='Currently only supports instances'
212                                     ' that have __call__ as callable which'
213                                     ' is missing in given arg.')
214         if not self._started:
215             raise ActivityException(desc='Tried to spawn a child thread '
216                                     'before this Activity was started.')
217
218     def _spawn(self, name, callable_, *args, **kwargs):
219         self._validate_callable(callable_)
220         greenthread = hub.spawn(callable_, *args, **kwargs)
221         self._child_thread_map[name] = greenthread
222         return greenthread
223
224     def _spawn_after(self, name, seconds, callable_, *args, **kwargs):
225         self._validate_callable(callable_)
226         greenthread = hub.spawn_after(seconds, callable_, *args, **kwargs)
227         self._child_thread_map[name] = greenthread
228         return greenthread
229
230     def _create_timer(self, name, func, *arg, **kwarg):
231         timer = LoopingCall(func, *arg, **kwarg)
232         self._timers[name] = timer
233         return timer
234
235     @abc.abstractmethod
236     def _run(self, *args, **kwargs):
237         """Main activity of this class.
238
239         Can launch other activity/callables here.
240         Sub-classes should override this method.
241         """
242         raise NotImplementedError()
243
244     def start(self, *args, **kwargs):
245         """Starts the main activity of this class.
246
247         Calls *_run* and calls *stop* when *_run* is finished.
248         This method should be run in a new greenthread as it may not return
249         immediately.
250         """
251         if self.started:
252             raise ActivityException(desc='Activity already started')
253
254         self._started = True
255         try:
256             self._run(*args, **kwargs)
257         except BGPSException:
258             LOG.error(traceback.format_exc())
259         finally:
260             if self.started:  # could have been stopped somewhere else
261                 self.stop()
262
263     def pause(self, seconds=0):
264         """Relinquishes hub for given number of seconds.
265
266         In other words is puts to sleep to give other greenthread a chance to
267         run.
268         """
269         hub.sleep(seconds)
270
271     def _stop_child_activities(self, name=None):
272         """Stop all child activities spawn by this activity.
273         """
274         # Makes a list copy of items() to avoid dictionary size changed
275         # during iteration
276         for child_name, child in list(self._child_activity_map.items()):
277             if name is not None and name != child_name:
278                 continue
279             LOG.debug('%s: Stopping child activity %s ', self.name, child_name)
280             if child.started:
281                 child.stop()
282             self._child_activity_map.pop(child_name, None)
283
284     def _stop_child_threads(self, name=None):
285         """Stops all threads spawn by this activity.
286         """
287         for thread_name, thread in list(self._child_thread_map.items()):
288             if name is not None and thread_name is name:
289                 LOG.debug('%s: Stopping child thread %s',
290                           self.name, thread_name)
291                 thread.kill()
292                 self._child_thread_map.pop(thread_name, None)
293
294     def _close_asso_sockets(self):
295         """Closes all the sockets linked to this activity.
296         """
297         for sock_name, sock in list(self._asso_socket_map.items()):
298             LOG.debug('%s: Closing socket %s - %s', self.name, sock_name, sock)
299             sock.close()
300
301     def _stop_timers(self):
302         for timer_name, timer in list(self._timers.items()):
303             LOG.debug('%s: Stopping timer %s', self.name, timer_name)
304             timer.stop()
305
306     def stop(self):
307         """Stops all child threads and activities and closes associated
308         sockets.
309
310         Re-initializes this activity to be able to start again.
311         Raise `ActivityException` if activity is not currently started.
312         """
313         if not self.started:
314             raise ActivityException(desc='Cannot call stop when activity is '
315                                     'not started or has been stopped already.')
316
317         LOG.debug('Stopping activity %s.', self.name)
318         self._stop_timers()
319         self._stop_child_activities()
320         self._stop_child_threads()
321         self._close_asso_sockets()
322
323         # Setup activity for start again.
324         self._started = False
325         self._asso_socket_map = weakref.WeakValueDictionary()
326         self._child_activity_map = weakref.WeakValueDictionary()
327         self._child_thread_map = weakref.WeakValueDictionary()
328         self._timers = weakref.WeakValueDictionary()
329         LOG.debug('Stopping activity %s finished.', self.name)
330
331     def _canonicalize_ip(self, ip):
332         addr = netaddr.IPAddress(ip)
333         if addr.is_ipv4_mapped():
334             ip = str(addr.ipv4())
335         return ip
336
337     def get_remotename(self, sock):
338         addr, port = sock.getpeername()[:2]
339         return self._canonicalize_ip(addr), str(port)
340
341     def get_localname(self, sock):
342         addr, port = sock.getsockname()[:2]
343         return self._canonicalize_ip(addr), str(port)
344
345     def _create_listen_socket(self, family, loc_addr):
346         s = socket.socket(family)
347         s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
348         s.bind(loc_addr)
349         s.listen(1)
350         return s
351
352     def _listen_socket_loop(self, s, conn_handle):
353         while True:
354             sock, client_address = s.accept()
355             client_address, port = self.get_remotename(sock)
356             LOG.debug('Connect request received from client for port'
357                       ' %s:%s', client_address, port)
358             client_name = self.name + '_client@' + client_address
359             self._asso_socket_map[client_name] = sock
360             self._spawn(client_name, conn_handle, sock)
361
362     def _listen_tcp(self, loc_addr, conn_handle):
363         """Creates a TCP server socket which listens on `port` number.
364
365         For each connection `server_factory` starts a new protocol.
366         """
367         info = socket.getaddrinfo(loc_addr[0], loc_addr[1], socket.AF_UNSPEC,
368                                   socket.SOCK_STREAM, 0, socket.AI_PASSIVE)
369         listen_sockets = {}
370         for res in info:
371             af, socktype, proto, _, sa = res
372             sock = None
373             try:
374                 sock = socket.socket(af, socktype, proto)
375                 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
376                 if af == socket.AF_INET6:
377                     sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
378
379                 sock.bind(sa)
380                 sock.listen(50)
381                 listen_sockets[sa] = sock
382             except socket.error as e:
383                 LOG.error('Error creating socket: %s', e)
384
385                 if sock:
386                     sock.close()
387
388         count = 0
389         server = None
390         for sa in listen_sockets:
391             name = self.name + '_server@' + str(sa[0])
392             self._asso_socket_map[name] = listen_sockets[sa]
393             if count == 0:
394                 import eventlet
395                 server = eventlet.spawn(self._listen_socket_loop,
396                                         listen_sockets[sa], conn_handle)
397
398                 self._child_thread_map[name] = server
399                 count += 1
400             else:
401                 server = self._spawn(name, self._listen_socket_loop,
402                                      listen_sockets[sa], conn_handle)
403         return server, listen_sockets
404
405     def _connect_tcp(self, peer_addr, conn_handler, time_out=None,
406                      bind_address=None, password=None):
407         """Creates a TCP connection to given peer address.
408
409         Tries to create a socket for `timeout` number of seconds. If
410         successful, uses the socket instance to start `client_factory`.
411         The socket is bound to `bind_address` if specified.
412         """
413         LOG.debug('Connect TCP called for %s:%s', peer_addr[0], peer_addr[1])
414         if ip.valid_ipv4(peer_addr[0]):
415             family = socket.AF_INET
416         else:
417             family = socket.AF_INET6
418         with Timeout(time_out, socket.error):
419             sock = socket.socket(family)
420             if bind_address:
421                 sock.bind(bind_address)
422             if password:
423                 sockopt.set_tcp_md5sig(sock, peer_addr[0], password)
424             sock.connect(peer_addr)
425             # socket.error exception is raised in case of timeout and
426             # the following code is executed only when the connection
427             # is established.
428
429         # Connection name for pro-active connection is made up of
430         # local end address + remote end address
431         local = self.get_localname(sock)[0]
432         remote = self.get_remotename(sock)[0]
433         conn_name = ('L: ' + local + ', R: ' + remote)
434         self._asso_socket_map[conn_name] = sock
435         # If connection is established, we call connection handler
436         # in a new thread.
437         self._spawn(conn_name, conn_handler, sock)
438         return sock
439
440
441 #
442 # Sink
443 #
444 class Sink(object):
445     """An entity to which we send out messages (eg. BGP routes)."""
446
447     #
448     # OutgoingMsgList
449     #
450     # A circular list type in which objects are linked to each
451     # other using the 'next_sink_out_route' and 'prev_sink_out_route'
452     # attributes.
453     #
454     OutgoingMsgList = CircularListType(next_attr_name='next_sink_out_route',
455                                        prev_attr_name='prev_sink_out_route')
456
457     # Next available index that can identify an instance uniquely.
458     idx = 0
459
460     @staticmethod
461     def next_index():
462         """Increments the sink index and returns the value."""
463         Sink.idx += 1
464         return Sink.idx
465
466     def __init__(self):
467         # A small integer that represents this sink.
468         self.index = Sink.next_index()
469
470         # Create an event for signal enqueuing.
471         from .utils.evtlet import EventletIOFactory
472         self.outgoing_msg_event = EventletIOFactory.create_custom_event()
473
474         self.messages_queued = 0
475         # List of msgs. that are to be sent to this peer. Each item
476         # in the list is an instance of OutgoingRoute.
477         self.outgoing_msg_list = Sink.OutgoingMsgList()
478
479     def clear_outgoing_msg_list(self):
480         self.outgoing_msg_list = Sink.OutgoingMsgList()
481
482     def enque_outgoing_msg(self, msg):
483         self.outgoing_msg_list.append(msg)
484         self.outgoing_msg_event.set()
485
486         self.messages_queued += 1
487
488     def enque_first_outgoing_msg(self, msg):
489         self.outgoing_msg_list.prepend(msg)
490         self.outgoing_msg_event.set()
491
492     def __iter__(self):
493         return self
494
495     def next(self):
496         """Pops and returns the first outgoing message from the list.
497
498         If message list currently has no messages, the calling thread will
499         be put to sleep until we have at-least one message in the list that
500         can be popped and returned.
501         """
502         # We pick the first outgoing available and send it.
503         outgoing_msg = self.outgoing_msg_list.pop_first()
504         # If we do not have any outgoing msg., we wait.
505         if outgoing_msg is None:
506             self.outgoing_msg_event.clear()
507             self.outgoing_msg_event.wait()
508             outgoing_msg = self.outgoing_msg_list.pop_first()
509
510         return outgoing_msg
511
512     # For Python 3 compatibility
513     __next__ = next
514
515
516 #
517 # Source
518 #
519 class Source(object):
520     """An entity that gives us BGP routes. A BGP peer, for example."""
521
522     def __init__(self, version_num):
523         # Number that is currently being used to stamp information
524         # received from this source. We will bump this number up when
525         # the information that is now expected from the source belongs
526         # to a different logical batch. This mechanism can be used to
527         # identify stale information.
528         self.version_num = version_num
529
530
531 class FlexinetPeer(Source, Sink):
532     def __init__(self):
533         # Initialize source and sink
534         Source.__init__(self, 1)
535         Sink.__init__(self)
536
537
538 # Registry of validators for configuration/settings.
539 _VALIDATORS = {}
540
541
542 def validate(**kwargs):
543     """Defines a decorator to register a validator with a name for look-up.
544
545     If name is not provided we use function name as name of the validator.
546     """
547     def decorator(func):
548         _VALIDATORS[kwargs.pop('name', func.__name__)] = func
549         return func
550
551     return decorator
552
553
554 def get_validator(name):
555     """Returns a validator registered for given name.
556     """
557     return _VALIDATORS.get(name)