1 # Copyright (C) 2014 Nippon Telegraph and Telephone Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
19 Provides CoreService which is responsible for establishing bgp sessions with
20 peers and maintains VRFs and Global tables.
26 from ryu.lib.packet.bgp import BGP_ERROR_CEASE
27 from ryu.lib.packet.bgp import BGP_ERROR_SUB_CONNECTION_RESET
28 from ryu.lib.packet.bgp import BGP_ERROR_SUB_CONNECTION_COLLISION_RESOLUTION
29 from ryu.lib.packet.bgp import RF_RTC_UC
30 from ryu.lib.packet.bgp import BGP_ATTR_ORIGIN_INCOMPLETE
32 from ryu.services.protocols.bgp.base import Activity
33 from ryu.services.protocols.bgp.base import add_bgp_error_metadata
34 from ryu.services.protocols.bgp.base import BGPSException
35 from ryu.services.protocols.bgp.base import CORE_ERROR_CODE
36 from ryu.services.protocols.bgp.constants import STD_BGP_SERVER_PORT_NUM
37 from ryu.services.protocols.bgp import core_managers
38 from ryu.services.protocols.bgp.model import FlexinetOutgoingRoute
39 from ryu.services.protocols.bgp.protocol import Factory
40 from ryu.services.protocols.bgp.signals.emit import BgpSignalBus
41 from ryu.services.protocols.bgp.speaker import BgpProtocol
42 from ryu.services.protocols.bgp.utils.rtfilter import RouteTargetManager
43 from ryu.services.protocols.bgp.rtconf.neighbors import CONNECT_MODE_ACTIVE
44 from ryu.services.protocols.bgp.utils import stats
45 from ryu.services.protocols.bgp.bmp import BMPClient
46 from ryu.lib import sockopt
47 from ryu.lib import ip
50 LOG = logging.getLogger('bgpspeaker.core')
52 # Interface IP address on which to run bgp server. Core service listens on all
53 # interfaces of the host on port 179 - standard bgp port.
56 # Required dictates that Origin attribute be incomplete
57 EXPECTED_ORIGIN = BGP_ATTR_ORIGIN_INCOMPLETE
60 @add_bgp_error_metadata(code=CORE_ERROR_CODE, sub_code=1,
61 def_desc='Unknown error occurred related to core.')
62 class BgpCoreError(BGPSException):
63 """Base exception related to all tables and peer management.
68 class CoreService(Factory, Activity):
69 """A service that maintains eBGP/iBGP sessions with BGP peers.
71 Two instances of this class don't share any BGP state with each
72 other. Manages peers, tables for various address-families, etc.
75 protocol = BgpProtocol
77 def __init__(self, common_conf, neighbors_conf, vrfs_conf):
78 self._common_config = common_conf
79 self._neighbors_conf = neighbors_conf
80 self._vrfs_conf = vrfs_conf
82 Activity.__init__(self, name='core_service')
84 self._signal_bus = BgpSignalBus()
85 self._init_signal_listeners()
87 self._rt_mgr = RouteTargetManager(self, neighbors_conf, vrfs_conf)
89 self._table_manager = core_managers.TableCoreManager(
93 self._importmap_manager = core_managers.ImportMapManager()
95 # Autonomous system number of this BGP speaker.
96 self._asn = self._common_config.local_as
98 self._peer_manager = core_managers.PeerManager(
100 self._neighbors_conf,
103 # Initialize sink for flexinet-peers
106 self._conf_manager = core_managers.ConfigurationManager(
107 self, common_conf, vrfs_conf, neighbors_conf
110 # Register Flexinet peer sink
111 from ryu.services.protocols.bgp.net_ctrl import NET_CONTROLLER
113 self.register_flexinet_sink(NET_CONTROLLER)
115 # State per route family
117 # Value: BgpInstanceRf
120 # Protocol factories for pro-active and re-active bgp-sessions.
121 self.client_factory = None
122 self.server_factory = None
126 self._next_hop_label = {}
128 # BgpProcessor instance (initialized during start)
129 self._bgp_processor = None
131 # BMP clients key: (host, port) value: BMPClient instance
134 def _init_signal_listeners(self):
135 self._signal_bus.register_listener(
136 BgpSignalBus.BGP_DEST_CHANGED,
137 lambda _, dest: self.enqueue_for_bgp_processing(dest)
139 self._signal_bus.register_listener(
140 BgpSignalBus.BGP_VRF_REMOVED,
141 lambda _, route_dist: self.on_vrf_removed(route_dist)
143 self._signal_bus.register_listener(
144 BgpSignalBus.BGP_VRF_ADDED,
145 lambda _, vrf_conf: self.on_vrf_added(vrf_conf)
147 self._signal_bus.register_listener(
148 BgpSignalBus.BGP_VRF_STATS_CONFIG_CHANGED,
149 lambda _, vrf_conf: self.on_stats_config_change(vrf_conf)
154 return self._common_config.router_id
157 def global_interested_rts(self):
158 return self._rt_mgr.global_interested_rts
165 def table_manager(self):
166 return self._table_manager
169 def importmap_manager(self):
170 return self._importmap_manager
173 def peer_manager(self):
174 return self._peer_manager
177 def rt_manager(self):
181 def signal_bus(self):
182 return self._signal_bus
184 def enqueue_for_bgp_processing(self, dest):
185 return self._bgp_processor.enqueue(dest)
187 def on_vrf_removed(self, route_dist):
188 # Remove stats timer linked with this vrf.
189 vrf_stats_timer = self._timers.get(route_dist)
191 vrf_stats_timer.stop()
192 del self._timers[route_dist]
194 def on_vrf_added(self, vrf_conf):
195 # Setup statistics timer.
196 rd = vrf_conf.route_dist
197 rf = vrf_conf.route_family
198 vrf_table = self._table_manager.get_vrf_table(rd, rf)
199 vrf_stats_timer = self._create_timer(
202 stats_source=vrf_table.get_stats_summary_dict
205 # Start statistics timer if applicable.
206 if vrf_conf.stats_log_enabled:
207 vrf_stats_timer.start(vrf_conf.stats_time)
209 def on_stats_config_change(self, vrf_conf):
210 vrf_stats_timer = self._timers.get(
213 vrf_stats_timer.stop()
214 vrf_stats_timer.start(vrf_conf.stats_time)
216 def _run(self, *args, **kwargs):
217 from ryu.services.protocols.bgp.processor import BgpProcessor
218 # Initialize bgp processor.
219 self._bgp_processor = BgpProcessor(self)
220 # Start BgpProcessor in a separate thread.
221 processor_thread = self._spawn_activity(self._bgp_processor)
223 # Pro-actively try to establish bgp-session with peers.
224 for peer in self._peer_manager.iterpeers:
225 self._spawn_activity(peer, self.start_protocol)
227 # Reactively establish bgp-session with peer by listening on
228 # the given server hosts and port for connection requests.
229 waiter = kwargs.pop('waiter')
231 self.listen_sockets = {}
232 if self._common_config.bgp_server_port != 0:
233 for host in self._common_config.bgp_server_hosts:
234 server_thread, sockets = self._listen_tcp(
235 (host, self._common_config.bgp_server_port),
237 self.listen_sockets.update(sockets)
239 processor_thread.wait()
241 # ========================================================================
242 # RTC address family related utilities
243 # ========================================================================
245 def update_rtfilters(self):
246 """Updates RT filters for each peer.
248 Should be called if a new RT Nlri's have changed based on the setting.
249 Currently only used by `Processor` to update the RT filters after it
250 has processed a RT destination. If RT filter has changed for a peer we
251 call RT filter change handler.
253 # Update RT filter for all peers
254 # TODO(PH): Check if getting this map can be optimized (if expensive)
255 new_peer_to_rtfilter_map = self._compute_rtfilter_map()
257 # If we have new best path for RT NLRI, we have to update peer RT
258 # filters and take appropriate action of sending them NLRIs for other
259 # address-families as per new RT filter if necessary.
260 for peer in self._peer_manager.iterpeers:
261 pre_rt_filter = self._rt_mgr.peer_to_rtfilter_map.get(peer, set())
262 curr_rt_filter = new_peer_to_rtfilter_map.get(peer, set())
264 old_rts = pre_rt_filter - curr_rt_filter
265 new_rts = curr_rt_filter - pre_rt_filter
266 # If interested RTs for a peer changes
267 if new_rts or old_rts:
268 LOG.debug('RT Filter for peer %s updated: '
269 'Added RTs %s, Removed Rts %s',
270 peer.ip_address, new_rts, old_rts)
271 self._on_update_rt_filter(peer, new_rts, old_rts)
272 # Update to new RT filters
273 self._peer_manager.set_peer_to_rtfilter_map(new_peer_to_rtfilter_map)
274 self._rt_mgr.peer_to_rtfilter_map = new_peer_to_rtfilter_map
275 LOG.debug('Updated RT filters: %s', self._rt_mgr.peer_to_rtfilter_map)
276 # Update interested RTs i.e. RTs on the path that will be installed
278 self._rt_mgr.update_interested_rts()
280 def _on_update_rt_filter(self, peer, new_rts, old_rts):
281 """Handles update of peer RT filter.
284 - `peer`: (Peer) whose RT filter has changed.
285 - `new_rts`: (set) of new RTs that peer is interested in.
286 - `old_rts`: (set) of RTs that peers is no longer interested in.
288 for table in self._table_manager._global_tables.values():
289 if table.route_family == RF_RTC_UC:
291 self._spawn('rt_filter_chg_%s' % peer,
292 self._rt_mgr.on_rt_filter_chg_sync_peer,
293 peer, new_rts, old_rts, table)
294 LOG.debug('RT Filter change handler launched for route_family %s',
297 def _compute_rtfilter_map(self):
298 """Returns neighbor's RT filter (permit/allow filter based on RT).
300 Walks RT filter tree and computes current RT filters for each peer that
301 have advertised RT NLRIs.
303 dict of peer, and `set` of rts that a particular neighbor is
308 def get_neigh_filter(neigh):
309 neigh_filter = rtfilter_map.get(neigh)
310 # Lazy creation of neighbor RT filter
311 if neigh_filter is None:
313 rtfilter_map[neigh] = neigh_filter
316 # Check if we have to use all paths or just best path
317 if self._common_config.max_path_ext_rtfilter_all:
318 # We have to look at all paths for a RtDest
319 for rtcdest in self._table_manager.get_rtc_table().values():
320 known_path_list = rtcdest.known_path_list
321 for path in known_path_list:
328 neigh_filter = get_neigh_filter(neigh)
329 neigh_filter.add(path.nlri.route_target)
331 # We iterate over all destination of the RTC table and for iBGP
332 # peers we use all known paths' RTs for RT filter and for eBGP
333 # peers we only consider best-paths' RTs for RT filter
334 for rtcdest in self._table_manager.get_rtc_table().values():
335 path = rtcdest.best_path
336 # If this destination does not have any path, we continue
341 # Consider only eBGP peers and ignore NC
342 if neigh and neigh.is_ebgp_peer():
343 # For eBGP peers we use only best-path to learn RT filter
344 neigh_filter = get_neigh_filter(neigh)
345 neigh_filter.add(path.nlri.route_target)
347 # For iBGP peers we use all known paths to learn RT filter
348 known_path_list = rtcdest.known_path_list
349 for path in known_path_list:
351 # We ignore NC, and eBGP peers
352 if neigh and not neigh.is_ebgp_peer():
353 neigh_filter = get_neigh_filter(neigh)
354 neigh_filter.add(path.nlri.route_target)
358 # ========================================================================
359 # Peer or Neighbor related handles/utilities.
360 # ========================================================================
361 def register_flexinet_sink(self, sink):
362 self._sinks.add(sink)
364 def unregister_flexinet_sink(self, sink):
365 self._sinks.remove(sink)
367 def update_flexinet_peers(self, path, route_dist):
368 for sink in self._sinks:
369 out_route = FlexinetOutgoingRoute(path, route_dist)
370 sink.enque_outgoing_msg(out_route)
372 def _set_password(self, address, password):
373 if ip.valid_ipv4(address):
374 family = socket.AF_INET
376 family = socket.AF_INET6
378 for sock in self.listen_sockets.values():
379 if sock.family == family:
380 sockopt.set_tcp_md5sig(sock, address, password)
382 def on_peer_added(self, peer):
383 if peer._neigh_conf.password:
384 self._set_password(peer._neigh_conf.ip_address,
385 peer._neigh_conf.password)
388 self._spawn_activity(
389 peer, self.start_protocol
392 # We need to handle new RTC_AS
393 if peer.rtc_as != self.asn:
395 'NEW_RTC_AS_HANDLER %s' % peer.rtc_as,
396 self._rt_mgr.update_rtc_as_set
399 def on_peer_removed(self, peer):
400 if peer._neigh_conf.password:
401 # setting zero length key means deleting the key
402 self._set_password(peer._neigh_conf.ip_address, '')
404 if peer.rtc_as != self.asn:
406 'OLD_RTC_AS_HANDLER %s' % peer.rtc_as,
407 self._rt_mgr.update_rtc_as_set
410 def build_protocol(self, socket):
412 # Check if its a reactive connection or pro-active connection
413 _, remote_port = self.get_remotename(socket)
414 remote_port = int(remote_port)
415 is_reactive_conn = True
416 if remote_port == STD_BGP_SERVER_PORT_NUM:
417 is_reactive_conn = False
419 bgp_protocol = self.protocol(
422 is_reactive_conn=is_reactive_conn
426 def start_protocol(self, socket):
427 """Handler of new connection requests on bgp server port.
429 Checks if new connection request is valid and starts new instance of
433 peer_addr, peer_port = self.get_remotename(socket)
434 peer = self._peer_manager.get_by_addr(peer_addr)
435 bgp_proto = self.build_protocol(socket)
437 # We reject this connection request from peer:
438 # 1) If we have connection initiated by a peer that is not in our
440 # 2) If this neighbor is not enabled according to configuration.
441 if not peer or not peer.enabled:
442 LOG.debug('Closed connection %s %s:%s as it is not a recognized'
443 ' peer.', 'from' if bgp_proto.is_reactive else 'to',
444 peer_addr, peer_port)
445 # Send connection rejected notification as per RFC
446 code = BGP_ERROR_CEASE
447 subcode = BGP_ERROR_SUB_CONNECTION_RESET
448 bgp_proto.send_notification(code, subcode)
449 elif bgp_proto.is_reactive and \
450 peer.connect_mode is CONNECT_MODE_ACTIVE:
451 LOG.debug('Closed connection from %s:%s as connect_mode is'
452 ' configured ACTIVE.', peer_addr, peer_port)
453 # Send connection rejected notification as per RFC
454 code = BGP_ERROR_CEASE
455 subcode = BGP_ERROR_SUB_CONNECTION_RESET
456 bgp_proto.send_notification(code, subcode)
457 elif not (peer.in_idle() or peer.in_active() or peer.in_connect()):
458 LOG.debug('Closing connection to %s:%s as we have connection'
459 ' in state other than IDLE or ACTIVE,'
460 ' i.e. connection resolution',
461 peer_addr, peer_port)
462 # Send Connection Collision Resolution notification as per RFC.
463 code = BGP_ERROR_CEASE
464 subcode = BGP_ERROR_SUB_CONNECTION_COLLISION_RESOLUTION
465 bgp_proto.send_notification(code, subcode)
467 bind_ip, bind_port = self.get_localname(socket)
468 peer._host_bind_ip = bind_ip
469 peer._host_bind_port = bind_port
470 self._spawn_activity(bgp_proto, peer)
472 def start_bmp(self, host, port):
473 if (host, port) in self.bmpclients:
474 bmpclient = self.bmpclients[(host, port)]
475 if bmpclient.started:
476 LOG.warning("bmpclient is already running for %s:%s",
479 bmpclient = BMPClient(self, host, port)
480 self.bmpclients[(host, port)] = bmpclient
481 self._spawn_activity(bmpclient)
484 def stop_bmp(self, host, port):
485 if (host, port) not in self.bmpclients:
486 LOG.warning("no bmpclient is running for %s:%s", host, port)
489 bmpclient = self.bmpclients[(host, port)]