1 # Copyright (C) 2014 Nippon Telegraph and Telephone Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
16 Defines some base class related to managing green threads.
18 from __future__ import absolute_import
21 from collections import OrderedDict
31 from ryu.lib import hub
32 from ryu.lib import sockopt
33 from ryu.lib import ip
34 from ryu.lib.hub import Timeout
35 from ryu.lib.packet.bgp import RF_IPv4_UC
36 from ryu.lib.packet.bgp import RF_IPv6_UC
37 from ryu.lib.packet.bgp import RF_IPv4_VPN
38 from ryu.lib.packet.bgp import RF_IPv6_VPN
39 from ryu.lib.packet.bgp import RF_L2_EVPN
40 from ryu.lib.packet.bgp import RF_IPv4_FLOWSPEC
41 from ryu.lib.packet.bgp import RF_IPv6_FLOWSPEC
42 from ryu.lib.packet.bgp import RF_VPNv4_FLOWSPEC
43 from ryu.lib.packet.bgp import RF_VPNv6_FLOWSPEC
44 from ryu.lib.packet.bgp import RF_L2VPN_FLOWSPEC
45 from ryu.lib.packet.bgp import RF_RTC_UC
46 from ryu.services.protocols.bgp.utils.circlist import CircularListType
47 from ryu.services.protocols.bgp.utils.evtlet import LoopingCall
50 # Logger instance for this module.
51 LOG = logging.getLogger('bgpspeaker.base')
53 # Pointer to active/available OrderedDict.
54 OrderedDict = OrderedDict
57 # Currently supported address families.
58 SUPPORTED_GLOBAL_RF = {
74 ACTIVITY_ERROR_CODE = 100
75 RUNTIME_CONF_ERROR_CODE = 200
77 NET_CTRL_ERROR_CODE = 400
79 PREFIX_ERROR_CODE = 600
80 BGP_PROCESSOR_ERROR_CODE = 700
83 # Registry of custom exceptions
85 # Value: exception class
86 _EXCEPTION_REGISTRY = {}
89 class BGPSException(Exception):
90 """Base exception class for all BGPS related exceptions.
95 DEF_DESC = 'Unknown exception.'
97 def __init__(self, desc=None):
98 super(BGPSException, self).__init__()
100 desc = self.__class__.DEF_DESC
102 self.message = '%d.%d - %s' % (kls.CODE, kls.SUB_CODE, desc)
106 return '<%s(desc=%s)>' % (kls, self.message)
108 def __str__(self, *args, **kwargs):
112 def add_bgp_error_metadata(code, sub_code, def_desc='unknown'):
113 """Decorator for all exceptions that want to set exception class meta-data.
115 # Check registry if we already have an exception with same code/sub-code
116 if _EXCEPTION_REGISTRY.get((code, sub_code)) is not None:
117 raise ValueError('BGPSException with code %d and sub-code %d '
118 'already defined.' % (code, sub_code))
120 def decorator(subclass):
121 """Sets class constants for exception code and sub-code.
123 If given class is sub-class of BGPSException we sets class constants.
125 if issubclass(subclass, BGPSException):
126 _EXCEPTION_REGISTRY[(code, sub_code)] = subclass
128 subclass.SUB_CODE = sub_code
129 subclass.DEF_DESC = def_desc
134 @add_bgp_error_metadata(code=ACTIVITY_ERROR_CODE,
136 def_desc='Unknown activity exception.')
137 class ActivityException(BGPSException):
138 """Base class for exceptions related to Activity.
143 @six.add_metaclass(abc.ABCMeta)
144 class Activity(object):
145 """Base class for a thread of execution that provides some custom settings.
147 Activity is also a container of other activities or threads that it has
148 started. Inside a Activity you should always use one of the spawn method
149 to start another activity or greenthread. Activity is also holds pointers
150 to sockets that it or its child activities of threads have create.
153 def __init__(self, name=None):
155 if self._name is None:
156 self._name = 'UnknownActivity: ' + str(time.time())
157 self._child_thread_map = weakref.WeakValueDictionary()
158 self._child_activity_map = weakref.WeakValueDictionary()
159 self._asso_socket_map = weakref.WeakValueDictionary()
160 self._timers = weakref.WeakValueDictionary()
161 self._started = False
171 def _validate_activity(self, activity):
172 """Checks the validity of the given activity before it can be started.
174 if not self._started:
175 raise ActivityException(desc='Tried to spawn a child activity'
176 ' before Activity was started.')
179 raise ActivityException(desc='Tried to start an Activity that was '
182 def _spawn_activity(self, activity, *args, **kwargs):
183 """Starts *activity* in a new thread and passes *args* and *kwargs*.
185 Maintains pointer to this activity and stops *activity* when this
188 self._validate_activity(activity)
190 # Spawn a new greenthread for given activity
191 greenthread = hub.spawn(activity.start, *args, **kwargs)
192 self._child_thread_map[activity.name] = greenthread
193 self._child_activity_map[activity.name] = activity
196 def _spawn_activity_after(self, seconds, activity, *args, **kwargs):
197 self._validate_activity(activity)
199 # Schedule to spawn a new greenthread after requested delay
200 greenthread = hub.spawn_after(seconds, activity.start, *args,
202 self._child_thread_map[activity.name] = greenthread
203 self._child_activity_map[activity.name] = activity
206 def _validate_callable(self, callable_):
207 if callable_ is None:
208 raise ActivityException(desc='Callable cannot be None')
210 if not hasattr(callable_, '__call__'):
211 raise ActivityException(desc='Currently only supports instances'
212 ' that have __call__ as callable which'
213 ' is missing in given arg.')
214 if not self._started:
215 raise ActivityException(desc='Tried to spawn a child thread '
216 'before this Activity was started.')
218 def _spawn(self, name, callable_, *args, **kwargs):
219 self._validate_callable(callable_)
220 greenthread = hub.spawn(callable_, *args, **kwargs)
221 self._child_thread_map[name] = greenthread
224 def _spawn_after(self, name, seconds, callable_, *args, **kwargs):
225 self._validate_callable(callable_)
226 greenthread = hub.spawn_after(seconds, callable_, *args, **kwargs)
227 self._child_thread_map[name] = greenthread
230 def _create_timer(self, name, func, *arg, **kwarg):
231 timer = LoopingCall(func, *arg, **kwarg)
232 self._timers[name] = timer
236 def _run(self, *args, **kwargs):
237 """Main activity of this class.
239 Can launch other activity/callables here.
240 Sub-classes should override this method.
242 raise NotImplementedError()
244 def start(self, *args, **kwargs):
245 """Starts the main activity of this class.
247 Calls *_run* and calls *stop* when *_run* is finished.
248 This method should be run in a new greenthread as it may not return
252 raise ActivityException(desc='Activity already started')
256 self._run(*args, **kwargs)
257 except BGPSException:
258 LOG.error(traceback.format_exc())
260 if self.started: # could have been stopped somewhere else
263 def pause(self, seconds=0):
264 """Relinquishes hub for given number of seconds.
266 In other words is puts to sleep to give other greenthread a chance to
271 def _stop_child_activities(self, name=None):
272 """Stop all child activities spawn by this activity.
274 # Makes a list copy of items() to avoid dictionary size changed
276 for child_name, child in list(self._child_activity_map.items()):
277 if name is not None and name != child_name:
279 LOG.debug('%s: Stopping child activity %s ', self.name, child_name)
282 self._child_activity_map.pop(child_name, None)
284 def _stop_child_threads(self, name=None):
285 """Stops all threads spawn by this activity.
287 for thread_name, thread in list(self._child_thread_map.items()):
288 if name is not None and thread_name is name:
289 LOG.debug('%s: Stopping child thread %s',
290 self.name, thread_name)
292 self._child_thread_map.pop(thread_name, None)
294 def _close_asso_sockets(self):
295 """Closes all the sockets linked to this activity.
297 for sock_name, sock in list(self._asso_socket_map.items()):
298 LOG.debug('%s: Closing socket %s - %s', self.name, sock_name, sock)
301 def _stop_timers(self):
302 for timer_name, timer in list(self._timers.items()):
303 LOG.debug('%s: Stopping timer %s', self.name, timer_name)
307 """Stops all child threads and activities and closes associated
310 Re-initializes this activity to be able to start again.
311 Raise `ActivityException` if activity is not currently started.
314 raise ActivityException(desc='Cannot call stop when activity is '
315 'not started or has been stopped already.')
317 LOG.debug('Stopping activity %s.', self.name)
319 self._stop_child_activities()
320 self._stop_child_threads()
321 self._close_asso_sockets()
323 # Setup activity for start again.
324 self._started = False
325 self._asso_socket_map = weakref.WeakValueDictionary()
326 self._child_activity_map = weakref.WeakValueDictionary()
327 self._child_thread_map = weakref.WeakValueDictionary()
328 self._timers = weakref.WeakValueDictionary()
329 LOG.debug('Stopping activity %s finished.', self.name)
331 def _canonicalize_ip(self, ip):
332 addr = netaddr.IPAddress(ip)
333 if addr.is_ipv4_mapped():
334 ip = str(addr.ipv4())
337 def get_remotename(self, sock):
338 addr, port = sock.getpeername()[:2]
339 return self._canonicalize_ip(addr), str(port)
341 def get_localname(self, sock):
342 addr, port = sock.getsockname()[:2]
343 return self._canonicalize_ip(addr), str(port)
345 def _create_listen_socket(self, family, loc_addr):
346 s = socket.socket(family)
347 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
352 def _listen_socket_loop(self, s, conn_handle):
354 sock, client_address = s.accept()
355 client_address, port = self.get_remotename(sock)
356 LOG.debug('Connect request received from client for port'
357 ' %s:%s', client_address, port)
358 client_name = self.name + '_client@' + client_address
359 self._asso_socket_map[client_name] = sock
360 self._spawn(client_name, conn_handle, sock)
362 def _listen_tcp(self, loc_addr, conn_handle):
363 """Creates a TCP server socket which listens on `port` number.
365 For each connection `server_factory` starts a new protocol.
367 info = socket.getaddrinfo(loc_addr[0], loc_addr[1], socket.AF_UNSPEC,
368 socket.SOCK_STREAM, 0, socket.AI_PASSIVE)
371 af, socktype, proto, _, sa = res
374 sock = socket.socket(af, socktype, proto)
375 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
376 if af == socket.AF_INET6:
377 sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
381 listen_sockets[sa] = sock
382 except socket.error as e:
383 LOG.error('Error creating socket: %s', e)
390 for sa in listen_sockets:
391 name = self.name + '_server@' + str(sa[0])
392 self._asso_socket_map[name] = listen_sockets[sa]
395 server = eventlet.spawn(self._listen_socket_loop,
396 listen_sockets[sa], conn_handle)
398 self._child_thread_map[name] = server
401 server = self._spawn(name, self._listen_socket_loop,
402 listen_sockets[sa], conn_handle)
403 return server, listen_sockets
405 def _connect_tcp(self, peer_addr, conn_handler, time_out=None,
406 bind_address=None, password=None):
407 """Creates a TCP connection to given peer address.
409 Tries to create a socket for `timeout` number of seconds. If
410 successful, uses the socket instance to start `client_factory`.
411 The socket is bound to `bind_address` if specified.
413 LOG.debug('Connect TCP called for %s:%s', peer_addr[0], peer_addr[1])
414 if ip.valid_ipv4(peer_addr[0]):
415 family = socket.AF_INET
417 family = socket.AF_INET6
418 with Timeout(time_out, socket.error):
419 sock = socket.socket(family)
421 sock.bind(bind_address)
423 sockopt.set_tcp_md5sig(sock, peer_addr[0], password)
424 sock.connect(peer_addr)
425 # socket.error exception is raised in case of timeout and
426 # the following code is executed only when the connection
429 # Connection name for pro-active connection is made up of
430 # local end address + remote end address
431 local = self.get_localname(sock)[0]
432 remote = self.get_remotename(sock)[0]
433 conn_name = ('L: ' + local + ', R: ' + remote)
434 self._asso_socket_map[conn_name] = sock
435 # If connection is established, we call connection handler
437 self._spawn(conn_name, conn_handler, sock)
445 """An entity to which we send out messages (eg. BGP routes)."""
450 # A circular list type in which objects are linked to each
451 # other using the 'next_sink_out_route' and 'prev_sink_out_route'
454 OutgoingMsgList = CircularListType(next_attr_name='next_sink_out_route',
455 prev_attr_name='prev_sink_out_route')
457 # Next available index that can identify an instance uniquely.
462 """Increments the sink index and returns the value."""
467 # A small integer that represents this sink.
468 self.index = Sink.next_index()
470 # Create an event for signal enqueuing.
471 from .utils.evtlet import EventletIOFactory
472 self.outgoing_msg_event = EventletIOFactory.create_custom_event()
474 self.messages_queued = 0
475 # List of msgs. that are to be sent to this peer. Each item
476 # in the list is an instance of OutgoingRoute.
477 self.outgoing_msg_list = Sink.OutgoingMsgList()
479 def clear_outgoing_msg_list(self):
480 self.outgoing_msg_list = Sink.OutgoingMsgList()
482 def enque_outgoing_msg(self, msg):
483 self.outgoing_msg_list.append(msg)
484 self.outgoing_msg_event.set()
486 self.messages_queued += 1
488 def enque_first_outgoing_msg(self, msg):
489 self.outgoing_msg_list.prepend(msg)
490 self.outgoing_msg_event.set()
496 """Pops and returns the first outgoing message from the list.
498 If message list currently has no messages, the calling thread will
499 be put to sleep until we have at-least one message in the list that
500 can be popped and returned.
502 # We pick the first outgoing available and send it.
503 outgoing_msg = self.outgoing_msg_list.pop_first()
504 # If we do not have any outgoing msg., we wait.
505 if outgoing_msg is None:
506 self.outgoing_msg_event.clear()
507 self.outgoing_msg_event.wait()
508 outgoing_msg = self.outgoing_msg_list.pop_first()
512 # For Python 3 compatibility
519 class Source(object):
520 """An entity that gives us BGP routes. A BGP peer, for example."""
522 def __init__(self, version_num):
523 # Number that is currently being used to stamp information
524 # received from this source. We will bump this number up when
525 # the information that is now expected from the source belongs
526 # to a different logical batch. This mechanism can be used to
527 # identify stale information.
528 self.version_num = version_num
531 class FlexinetPeer(Source, Sink):
533 # Initialize source and sink
534 Source.__init__(self, 1)
538 # Registry of validators for configuration/settings.
542 def validate(**kwargs):
543 """Defines a decorator to register a validator with a name for look-up.
545 If name is not provided we use function name as name of the validator.
548 _VALIDATORS[kwargs.pop('name', func.__name__)] = func
554 def get_validator(name):
555 """Returns a validator registered for given name.
557 return _VALIDATORS.get(name)