1 # Copyright (C) 2014 Nippon Telegraph and Telephone Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
16 BGP peer related classes and utils.
18 from collections import namedtuple
24 from six.moves import zip_longest
26 from ryu.services.protocols.bgp.base import Activity
27 from ryu.services.protocols.bgp.base import Sink
28 from ryu.services.protocols.bgp.base import Source
29 from ryu.services.protocols.bgp.base import SUPPORTED_GLOBAL_RF
30 from ryu.services.protocols.bgp import constants as const
31 from ryu.services.protocols.bgp.model import OutgoingRoute
32 from ryu.services.protocols.bgp.model import SentRoute
33 from ryu.services.protocols.bgp.info_base.base import PrefixFilter
34 from ryu.services.protocols.bgp.info_base.base import AttributeMap
35 from ryu.services.protocols.bgp.model import ReceivedRoute
36 from ryu.services.protocols.bgp.net_ctrl import NET_CONTROLLER
37 from ryu.services.protocols.bgp.rtconf.neighbors import NeighborConfListener
38 from ryu.services.protocols.bgp.rtconf.neighbors import CONNECT_MODE_PASSIVE
39 from ryu.services.protocols.bgp.signals.emit import BgpSignalBus
40 from ryu.services.protocols.bgp.speaker import BgpProtocol
41 from ryu.services.protocols.bgp.info_base.ipv4 import Ipv4Path
42 from ryu.services.protocols.bgp.info_base.vpnv4 import Vpnv4Path
43 from ryu.services.protocols.bgp.info_base.vpnv6 import Vpnv6Path
44 from ryu.services.protocols.bgp.rtconf.vrfs import VRF_RF_IPV4, VRF_RF_IPV6
45 from ryu.services.protocols.bgp.utils import bgp as bgp_utils
46 from ryu.services.protocols.bgp.utils.evtlet import EventletIOFactory
47 from ryu.services.protocols.bgp.utils import stats
48 from ryu.services.protocols.bgp.utils.validation import is_valid_old_asn
50 from ryu.lib.packet import bgp
52 from ryu.lib.packet.bgp import RouteFamily
53 from ryu.lib.packet.bgp import RF_IPv4_UC
54 from ryu.lib.packet.bgp import RF_IPv6_UC
55 from ryu.lib.packet.bgp import RF_IPv4_VPN
56 from ryu.lib.packet.bgp import RF_IPv6_VPN
57 from ryu.lib.packet.bgp import RF_IPv4_FLOWSPEC
58 from ryu.lib.packet.bgp import RF_VPNv4_FLOWSPEC
59 from ryu.lib.packet.bgp import RF_RTC_UC
60 from ryu.lib.packet.bgp import get_rf
62 from ryu.lib.packet.bgp import BGPOpen
63 from ryu.lib.packet.bgp import BGPUpdate
64 from ryu.lib.packet.bgp import BGPRouteRefresh
66 from ryu.lib.packet.bgp import BGP_ERROR_CEASE
67 from ryu.lib.packet.bgp import BGP_ERROR_SUB_ADMINISTRATIVE_SHUTDOWN
68 from ryu.lib.packet.bgp import BGP_ERROR_SUB_CONNECTION_COLLISION_RESOLUTION
71 from ryu.lib.packet.bgp import BGP_MSG_UPDATE
72 from ryu.lib.packet.bgp import BGP_MSG_KEEPALIVE
73 from ryu.lib.packet.bgp import BGP_MSG_ROUTE_REFRESH
75 from ryu.lib.packet.bgp import BGPPathAttributeNextHop
76 from ryu.lib.packet.bgp import BGPPathAttributeAsPath
77 from ryu.lib.packet.bgp import BGPPathAttributeAs4Path
78 from ryu.lib.packet.bgp import BGPPathAttributeLocalPref
79 from ryu.lib.packet.bgp import BGPPathAttributeExtendedCommunities
80 from ryu.lib.packet.bgp import BGPPathAttributeOriginatorId
81 from ryu.lib.packet.bgp import BGPPathAttributeClusterList
82 from ryu.lib.packet.bgp import BGPPathAttributeMpReachNLRI
83 from ryu.lib.packet.bgp import BGPPathAttributeMpUnreachNLRI
84 from ryu.lib.packet.bgp import BGPPathAttributeCommunities
85 from ryu.lib.packet.bgp import BGPPathAttributeMultiExitDisc
87 from ryu.lib.packet.bgp import BGP_ATTR_TYPE_ORIGIN
88 from ryu.lib.packet.bgp import BGP_ATTR_TYPE_AGGREGATOR
89 from ryu.lib.packet.bgp import BGP_ATTR_TYPE_AS4_AGGREGATOR
90 from ryu.lib.packet.bgp import BGP_ATTR_TYPE_AS_PATH
91 from ryu.lib.packet.bgp import BGP_ATTR_TYPE_AS4_PATH
92 from ryu.lib.packet.bgp import BGP_ATTR_TYPE_NEXT_HOP
93 from ryu.lib.packet.bgp import BGP_ATTR_TYPE_MP_REACH_NLRI
94 from ryu.lib.packet.bgp import BGP_ATTR_TYPE_MP_UNREACH_NLRI
95 from ryu.lib.packet.bgp import BGP_ATTR_TYPE_MULTI_EXIT_DISC
96 from ryu.lib.packet.bgp import BGP_ATTR_TYPE_COMMUNITIES
97 from ryu.lib.packet.bgp import BGP_ATTR_TYPE_ORIGINATOR_ID
98 from ryu.lib.packet.bgp import BGP_ATTR_TYPE_CLUSTER_LIST
99 from ryu.lib.packet.bgp import BGP_ATTR_TYPE_EXTENDED_COMMUNITIES
100 from ryu.lib.packet.bgp import BGP_ATTR_TYEP_PMSI_TUNNEL_ATTRIBUTE
102 from ryu.lib.packet.bgp import BGPTwoOctetAsSpecificExtendedCommunity
103 from ryu.lib.packet.bgp import BGPIPv4AddressSpecificExtendedCommunity
105 from ryu.lib.packet import safi as subaddr_family
# Logger used by the code in this module; it is registered under the
# name 'bgpspeaker.peer'.
LOG = logging.getLogger('bgpspeaker.peer')
def is_valid_state(state):
    """Tells whether `state` is a known BGP finite state machine state.

    Membership is checked against `const.BGP_FSM_VALID_STATES`.
    """
    valid_states = const.BGP_FSM_VALID_STATES
    return state in valid_states
116 class PeerRf(object):
117 """State maintained per-RouteFamily for a Peer."""
119 def __init__(self, peer, route_family, enabled=False):
120 assert peer and route_family
122 self.enabled = enabled
126 self.rf = route_family
129 PeerCounterNames = namedtuple(
138 'FSM_ESTB_TRANSITIONS')
147 'fms_established_transitions'
151 class PeerState(object):
152 """A BGP neighbor state. Think of this class as of information and stats
156 def __init__(self, peer, signal_bus):
157 # Back pointer to peer whose stats this instances represents.
159 # Current state of BGP finite state machine.
160 self._bgp_state = const.BGP_FSM_IDLE
161 self._established_time = 0
162 self._last_bgp_error = None
167 'recv_notification': 0,
168 'sent_notification': 0,
171 'fms_established_transitions': 0,
173 self._signal_bus = signal_bus
175 # TODO(JK): refactor other counters to use signals also
176 self._signal_bus.register_listener(
177 ('error', 'bgp', self.peer),
178 self._remember_last_bgp_error
181 self._signal_bus.register_listener(
182 BgpSignalBus.BGP_NOTIFICATION_RECEIVED + (self.peer,),
183 lambda _, msg: self.incr(PeerCounterNames.RECV_NOTIFICATION)
186 self._signal_bus.register_listener(
187 BgpSignalBus.BGP_NOTIFICATION_SENT + (self.peer,),
188 lambda _, msg: self.incr(PeerCounterNames.SENT_NOTIFICATION)
191 def _remember_last_bgp_error(self, identifier, data):
192 self._last_bgp_error = dict([(k, v)
193 for k, v in data.items()
def recv_prefix(self):
    """Returns the current value of the received-prefixes counter."""
    counter_key = PeerCounterNames.RECV_PREFIXES
    return self.counters[counter_key]
203 return self._bgp_state
206 def bgp_state(self, new_state):
207 old_state = self._bgp_state
208 if old_state == new_state:
211 self._bgp_state = new_state
212 NET_CONTROLLER.send_rpc_notification(
215 'ip_address': self.peer.ip_address,
220 # transition to Established from another state
221 if new_state == const.BGP_FSM_ESTABLISHED:
222 self.incr(PeerCounterNames.FSM_ESTB_TRANSITIONS)
223 self._established_time = time.time()
224 self._signal_bus.adj_up(self.peer)
225 NET_CONTROLLER.send_rpc_notification(
226 'neighbor.up', {'ip_address': self.peer.ip_address}
228 # transition from Established to another state
229 elif old_state == const.BGP_FSM_ESTABLISHED:
230 self._established_time = 0
231 self._signal_bus.adj_down(self.peer)
232 NET_CONTROLLER.send_rpc_notification(
233 'neighbor.down', {'ip_address': self.peer.ip_address}
236 LOG.debug('Peer %s BGP FSM went from %s to %s',
237 self.peer.ip_address, old_state, self.bgp_state)
239 def incr(self, counter_name, incr_by=1):
240 if counter_name not in self.counters:
241 raise ValueError('Un-recognized counter name: %s' % counter_name)
242 counter = self.counters.setdefault(counter_name, 0)
244 self.counters[counter_name] = counter
def get_count(self, counter_name):
    """Returns the current value of the counter called `counter_name`.

    Raises `ValueError` if `counter_name` is not a key of `self.counters`.
    """
    known_counters = self.counters
    if counter_name in known_counters:
        return known_counters.get(counter_name, 0)
    raise ValueError('Un-recognized counter name: %s' % counter_name)
def total_msg_sent(self):
    """Returns total number of UPDATE, NOTIFICATION and ROUTE_REFRESH
    messages sent to this peer.

    The total is the sum of the SENT_UPDATES, SENT_REFRESH and
    SENT_NOTIFICATION counters, mirroring how `total_msg_recv` sums the
    corresponding receive counters.
    """
    # SENT_NOTIFICATION has to be part of the sum: the counter is
    # maintained by this class and the documented total includes
    # NOTIFICATION messages.
    return (self.get_count(PeerCounterNames.SENT_UPDATES) +
            self.get_count(PeerCounterNames.SENT_REFRESH) +
            self.get_count(PeerCounterNames.SENT_NOTIFICATION))
def total_msg_recv(self):
    """Returns total number of UPDATE, ROUTE_REFRESH and NOTIFICATION
    messages received from this peer.

    The total is the sum of the RECV_UPDATES, RECV_REFRESH and
    RECV_NOTIFICATION counters.
    """
    counted = (PeerCounterNames.RECV_UPDATES,
               PeerCounterNames.RECV_REFRESH,
               PeerCounterNames.RECV_NOTIFICATION)
    return sum(self.get_count(name) for name in counted)
268 def get_stats_summary_dict(self):
269 """Returns basic stats.
271 Returns a `dict` with various counts and stats, see below.
273 uptime = time.time() - self._established_time \
274 if self._established_time != 0 else -1
276 stats.UPDATE_MSG_IN: self.get_count(PeerCounterNames.RECV_UPDATES),
277 stats.UPDATE_MSG_OUT: self.get_count(
278 PeerCounterNames.SENT_UPDATES
280 stats.TOTAL_MSG_IN: self.total_msg_recv,
281 stats.TOTAL_MSG_OUT: self.total_msg_sent,
282 stats.FMS_EST_TRANS: self.get_count(
283 PeerCounterNames.FSM_ESTB_TRANSITIONS
289 class Peer(Source, Sink, NeighborConfListener, Activity):
290 """A BGP neighbor/peer.
292 Listens on neighbor configuration changes and handles change events
293 appropriately. If peering is enabled tries 'actively'/'pro-actively' to
294 establish session with peer. Allows binding of `BgpProtocol` instances to
295 allow 'passive'/'reactive' establishment of bgp session with peer.
296 Maintains BGP state machine (may not be fully compliant with RFC). Handles
297 bgp UPDATE messages. Provides a queue to send update message to peer.
300 RTC_EOR_TIMER_NAME = 'RTC_EOR_Timer'
302 def __init__(self, common_conf, neigh_conf,
303 core_service, signal_bus, peer_manager):
304 peer_activity_name = 'Peer: %s' % neigh_conf.ip_address
305 Activity.__init__(self, name=peer_activity_name)
306 Source.__init__(self, version_num=1)
308 # Add listener for configuration changes.
309 NeighborConfListener.__init__(self, neigh_conf)
311 # Current configuration of this peer.
312 self._neigh_conf = neigh_conf
313 self._common_conf = common_conf
314 self._core_service = core_service
315 self._signal_bus = signal_bus
316 self._peer_manager = peer_manager
319 self._host_bind_ip = None
320 self._host_bind_port = None
322 # TODO(PH): revisit maintaining state/stats information.
324 self.state = PeerState(self, self._signal_bus)
325 self._periodic_stats_logger = \
326 self._create_timer('Peer State Summary Stats Timer',
328 stats_resource=self._neigh_conf,
329 stats_source=self.state.get_stats_summary_dict)
330 if self._neigh_conf.stats_log_enabled:
331 self._periodic_stats_logger.start(self._neigh_conf.stats_time)
333 # State per route family, {RouteFamily: PeerRf,}.
335 # Get vpnv4 route family settings.
336 prf = PeerRf(self, RF_IPv4_VPN,
337 enabled=self._neigh_conf.cap_mbgp_vpnv4)
338 self.rf_state[RF_IPv4_VPN] = prf
339 # Get vpnv6 route family settings.
340 prf = PeerRf(self, RF_IPv6_VPN, self._neigh_conf.cap_mbgp_vpnv6)
341 self.rf_state[RF_IPv6_VPN] = prf
343 # Bound protocol instance
344 self._protocol = None
346 # Setting this event starts the connect_loop loop again
347 # Clearing this event will stop the connect_loop loop
348 self._connect_retry_event = EventletIOFactory.create_custom_event()
350 # Reference to threads related to enhanced refresh timers.
351 self._refresh_stalepath_timer = None
352 self._refresh_max_eor_timer = None
354 # Latest valid Open Message
355 self.curr_open_msg = None
357 # RTC end-of-rib timer
358 self._rtc_eor_timer = None
359 self._sent_init_non_rtc_update = False
360 self._init_rtc_nlri_path = []
363 self._in_filters = self._neigh_conf.in_filter
366 self._out_filters = self._neigh_conf.out_filter
369 self._adj_rib_in = {}
372 self._adj_rib_out = {}
375 self._attribute_maps = {}
379 return self._neigh_conf.remote_as
383 return self._neigh_conf.rtc_as
def ip_address(self):
    """Returns the `ip_address` held by this peer's neighbor configuration."""
    neigh_conf = self._neigh_conf
    return neigh_conf.ip_address
391 return self._protocol
def host_bind_ip(self):
    """Returns the value currently stored in `_host_bind_ip`."""
    bound_ip = self._host_bind_ip
    return bound_ip
def host_bind_port(self):
    """Returns the value currently stored in `_host_bind_port`."""
    bound_port = self._host_bind_port
    return bound_port
403 return self._neigh_conf.enabled
407 return self._neigh_conf.multi_exit_disc
411 return self._neigh_conf.local_as
def cap_four_octet_as_number(self):
    """Returns the `cap_four_octet_as_number` setting of the neighbor
    configuration."""
    neigh_conf = self._neigh_conf
    return neigh_conf.cap_four_octet_as_number
def in_filters(self):
    """Returns the in-filters currently held by this peer."""
    current_filters = self._in_filters
    return current_filters
def in_filters(self, filters):
    """Replaces the in-filters with clones of the given `filters`.

    Each filter is cloned via its `clone()` method, then
    `on_update_in_filter` is called.
    """
    cloned_filters = []
    for filter_ in filters:
        cloned_filters.append(filter_.clone())
    self._in_filters = cloned_filters
    LOG.debug('set in-filter : %s', filters)
    self.on_update_in_filter()
def out_filters(self):
    """Returns the out-filters currently held by this peer."""
    current_filters = self._out_filters
    return current_filters
def out_filters(self, filters):
    """Replaces the out-filters with clones of the given `filters`.

    Each filter is cloned via its `clone()` method, then
    `on_update_out_filter` is called.
    """
    cloned_filters = []
    for filter_ in filters:
        cloned_filters.append(filter_.clone())
    self._out_filters = cloned_filters
    LOG.debug('set out-filter : %s', filters)
    self.on_update_out_filter()
def adj_rib_in(self):
    """Returns the container stored in `_adj_rib_in`."""
    rib_in = self._adj_rib_in
    return rib_in
def adj_rib_out(self):
    """Returns the container stored in `_adj_rib_out`."""
    rib_out = self._adj_rib_out
    return rib_out
def is_route_server_client(self):
    """Returns the `is_route_server_client` setting of the neighbor
    configuration."""
    neigh_conf = self._neigh_conf
    return neigh_conf.is_route_server_client
def is_route_reflector_client(self):
    """Returns the `is_route_reflector_client` setting of the neighbor
    configuration."""
    neigh_conf = self._neigh_conf
    return neigh_conf.is_route_reflector_client
def check_first_as(self):
    """Returns the `check_first_as` setting of the neighbor configuration."""
    neigh_conf = self._neigh_conf
    return neigh_conf.check_first_as
def connect_mode(self):
    """Returns the `connect_mode` setting of the neighbor configuration."""
    neigh_conf = self._neigh_conf
    return neigh_conf.connect_mode
def attribute_maps(self):
    """Returns the attribute maps currently held by this peer."""
    current_maps = self._attribute_maps
    return current_maps
465 @attribute_maps.setter
466 def attribute_maps(self, attribute_maps):
468 _attr_maps.setdefault(const.ATTR_MAPS_ORG_KEY, [])
470 # key is 'default' or rd_rf that represents RD and route_family
471 key = attribute_maps[const.ATTR_MAPS_LABEL_KEY]
472 at_maps = attribute_maps[const.ATTR_MAPS_VALUE]
476 LOG.debug("AttributeMap attr_type: %s, attr_value: %s",
477 cloned.attr_type, cloned.attr_value)
478 attr_list = _attr_maps.setdefault(cloned.attr_type, [])
479 attr_list.append(cloned)
481 # preserve original order of attribute_maps
482 _attr_maps[const.ATTR_MAPS_ORG_KEY].append(cloned)
484 self._attribute_maps[key] = _attr_maps
485 self.on_update_attribute_maps()
def is_mpbgp_cap_valid(self, route_family):
    """Checks the MP-BGP capability for `route_family` on this session.

    Parameters:
        - `route_family`: route family to check; passed unchanged to the
        bound protocol's `is_mbgp_cap_valid`.

    Returns whatever the bound protocol's `is_mbgp_cap_valid` returns.
    Raises `ValueError` if this peer is not in Established state.
    """
    # `in_established` is a method and has to be called: the bound method
    # object itself is always truthy, so testing it would never raise.
    if not self.in_established():
        raise ValueError('Invalid request: Peer not in established state')
    return self._protocol.is_mbgp_cap_valid(route_family)
def is_four_octet_as_number_cap_valid(self):
    """Checks the Four-Octet AS number capability on this session.

    Returns whatever the bound protocol's
    `is_four_octet_as_number_cap_valid` returns.
    Raises `ValueError` if this peer is not in Established state.
    """
    # `in_established` is a method and has to be called: the bound method
    # object itself is always truthy, so testing it would never raise.
    if not self.in_established():
        raise ValueError('Invalid request: Peer not in established state')
    return self._protocol.is_four_octet_as_number_cap_valid()
def is_ebgp_peer(self):
    """Returns *True* when the common configuration's local AS differs from
    the neighbor's remote AS (eBGP peer), else *False*."""
    local_as = self._common_conf.local_as
    remote_as = self._neigh_conf.remote_as
    return local_as != remote_as
def in_established(self):
    """Returns *True* if the peer's BGP FSM state is Established."""
    current_state = self.state.bgp_state
    return current_state == const.BGP_FSM_ESTABLISHED
505 return self.state.bgp_state == const.BGP_FSM_IDLE
508 return self.state.bgp_state == const.BGP_FSM_ACTIVE
def in_open_sent(self):
    """Returns *True* if the peer's BGP FSM state is OpenSent."""
    current_state = self.state.bgp_state
    return current_state == const.BGP_FSM_OPEN_SENT
def in_open_confirm(self):
    """Returns *True* if the peer's BGP FSM state is OpenConfirm."""
    current_state = self.state.bgp_state
    return current_state == const.BGP_FSM_OPEN_CONFIRM
def in_connect(self):
    """Returns *True* if the peer's BGP FSM state is Connect."""
    current_state = self.state.bgp_state
    return current_state == const.BGP_FSM_CONNECT
def curr_fms_state(self):
    """Returns the current BGP FSM state recorded in `self.state`."""
    peer_state = self.state
    return peer_state.bgp_state
522 def is_mbgp_cap_valid(self, route_family):
523 if not self.in_established():
526 return self._protocol.is_mbgp_cap_valid(route_family)
528 def on_chg_stats_time_conf_with_stats(self, evt):
529 # TODO(PH): provide implementation when updating neighbor is needed
532 def on_chg_stats_enabled_conf_with_stats(self, evt):
533 # TODO(PH): provide implementation when updating neighbor is needed
536 def on_update_enabled(self, conf_evt):
537 """Implements neighbor configuration change listener.
539 enabled = conf_evt.value
540 # If we do not have any protocol bound and configuration asks us to
541 # enable this peer, we try to establish connection again.
543 LOG.info('%s enabled', self)
544 if self._protocol and self._protocol.started:
545 LOG.error('Tried to enable neighbor that is already enabled')
547 self.state.bgp_state = const.BGP_FSM_CONNECT
548 # Restart connect loop if not already running.
549 if not self._connect_retry_event.is_set():
550 self._connect_retry_event.set()
551 LOG.debug('Starting connect loop as neighbor is enabled.')
553 LOG.info('%s disabled', self)
555 # Stopping protocol will eventually trigger connection_lost
556 # handler which will do some clean-up.
557 # But the greenlet that is in charge of the socket may be kill
558 # when we stop the protocol, hence we call connection_lost
559 # here as we triggered socket to close.
560 self._protocol.send_notification(
562 BGP_ERROR_SUB_ADMINISTRATIVE_SHUTDOWN
564 self._protocol.stop()
565 self._protocol = None
566 self.state.bgp_state = const.BGP_FSM_IDLE
567 # If this peer is not enabled any-more we stop trying to make any
569 LOG.debug('Disabling connect-retry as neighbor was disabled')
570 self._connect_retry_event.clear()
def on_update_med(self, conf_evt):
    """Handles a change of the MED configuration.

    If a protocol instance is bound and started, a route refresh is fired
    for every address family in its `negotiated_afs`. `conf_evt` is not
    inspected.
    """
    LOG.debug('on_update_med fired')
    protocol = self._protocol
    # Nothing to refresh without a bound, started protocol.
    if protocol is None or not protocol.started:
        return
    for route_family in protocol.negotiated_afs:
        self._fire_route_refresh(route_family)
def _on_update_connect_mode(self, mode):
    """Starts or stops the connect loop according to `mode`.

    Parameters:
        - `mode`: connect mode to apply.

    If `mode` equals `CONNECT_MODE_PASSIVE` the 'peer.connect_loop' child
    thread is stopped. For any other mode the connect loop is spawned,
    unless 'peer.connect_loop' is already present in the child thread map.
    """
    # Compare by value, not identity: a mode that is equal to
    # CONNECT_MODE_PASSIVE but is a different object must still be
    # treated as passive.
    if mode == CONNECT_MODE_PASSIVE:
        LOG.debug("stop connect loop. (mode: %s)", mode)
        self._stop_child_threads('peer.connect_loop')
    elif 'peer.connect_loop' not in self._child_thread_map:
        LOG.debug("start connect loop. (mode: %s)", mode)
        self._spawn('peer.connect_loop', self._connect_loop,
                    self._client_factory)
def on_update_connect_mode(self, conf_evt):
    """Applies the connect mode carried in `conf_evt.value`."""
    new_mode = conf_evt.value
    self._on_update_connect_mode(new_mode)
592 def _apply_filter(self, filters, path):
596 for filter_ in filters:
597 if filter_.ROUTE_FAMILY != path.ROUTE_FAMILY:
600 policy, is_matched = filter_.evaluate(path)
601 if policy == PrefixFilter.POLICY_PERMIT and is_matched:
604 elif policy == PrefixFilter.POLICY_DENY and is_matched:
606 blocked_cause = filter_.prefix + ' - DENY'
609 return block, blocked_cause
611 def _apply_in_filter(self, path):
612 return self._apply_filter(self._in_filters, path)
614 def _apply_out_filter(self, path):
615 return self._apply_filter(self._out_filters, path)
617 def on_update_in_filter(self):
618 LOG.debug('on_update_in_filter fired')
619 for received_path in self._adj_rib_in.values():
620 LOG.debug('received_path: %s', received_path)
621 path = received_path.path
622 nlri_str = path.nlri.formatted_nlri_str
623 block, blocked_reason = self._apply_in_filter(path)
624 if block == received_path.filtered:
625 LOG.debug('block situation not changed: %s', block)
628 # path wasn't blocked, but must be blocked by this update
629 path = path.clone(for_withdrawal=True)
630 LOG.debug('withdraw %s because of in filter update', nlri_str)
632 # path was blocked, but mustn't be blocked by this update
633 LOG.debug('learn blocked %s because of in filter update',
635 received_path.filtered = block
636 tm = self._core_service.table_manager
639 def on_update_out_filter(self):
640 LOG.debug('on_update_out_filter fired')
641 for sent_path in self._adj_rib_out.values():
642 LOG.debug('sent_path: %s', sent_path)
643 path = sent_path.path
644 nlri_str = path.nlri.formatted_nlri_str
645 block, blocked_reason = self._apply_out_filter(path)
646 if block == sent_path.filtered:
647 LOG.debug('block situation not changed: %s', block)
650 # path wasn't blocked, but must be blocked by this update
651 withdraw_clone = path.clone(for_withdrawal=True)
652 outgoing_route = OutgoingRoute(withdraw_clone)
653 LOG.debug('send withdraw %s because of out filter update',
656 # path was blocked, but mustn't be blocked by this update
657 outgoing_route = OutgoingRoute(path)
658 LOG.debug('send blocked %s because of out filter update',
660 sent_path.filtered = block
661 self.enque_outgoing_msg(outgoing_route)
def on_update_attribute_maps(self):
    """Re-enqueues every path recorded in Adj-RIB-out.

    Each entry's path is wrapped in a new `OutgoingRoute` and put on the
    outgoing message queue.
    """
    LOG.debug('on_update_attribute_maps fired')
    for sent_route in self._adj_rib_out.values():
        LOG.debug('resend path: %s', sent_route)
        resend = OutgoingRoute(sent_route.path)
        self.enque_outgoing_msg(resend)
672 return 'Peer(ip: %s, asn: %s)' % (self._neigh_conf.ip_address,
673 self._neigh_conf.remote_as)
675 def _run(self, client_factory):
676 LOG.debug('Started peer %s', self)
677 self._client_factory = client_factory
679 # Tries actively to establish session if CONNECT_MODE is not PASSIVE
680 self._on_update_connect_mode(self._neigh_conf.connect_mode)
682 # Start sink processing
683 self._process_outgoing_msg_list()
def _send_outgoing_route_refresh_msg(self, rr_msg):
    """Sends the route refresh message `rr_msg` to the peer.

    Parameters:
        - `rr_msg`: route refresh message to send; its type must be
        BGP_MSG_ROUTE_REFRESH.

    After sending, a demarcation of 0 increments the SENT_REFRESH counter.
    A demarcation of 1 schedules `_enqueue_eor_msg` after
    `refresh_max_eor_time`, unless that time is configured as 0.
    """
    assert rr_msg.type == BGP_MSG_ROUTE_REFRESH
    self._protocol.send(rr_msg)
    LOG.debug('RouteRefresh %s>> %s',
              self._neigh_conf.ip_address, rr_msg)

    demarcation = rr_msg.demarcation
    if demarcation == 0:
        # Collect update statistics for sent refresh request.
        self.state.incr(PeerCounterNames.SENT_REFRESH)
        return
    if demarcation != 1:
        return

    # SOR was sent: set the Max. EOR timer if one is configured.
    max_eor_time = self._common_conf.refresh_max_eor_time
    if max_eor_time == 0:
        return
    # Set timer to send EOR demarcation.
    self._spawn_after('end-of-rib-timer', max_eor_time,
                      self._enqueue_eor_msg, rr_msg)
    LOG.debug('Enhanced RR max. EOR timer set.')
709 def _send_outgoing_route(self, outgoing_route):
710 """Constructs `Update` message from given `outgoing_route` and sends
713 Also, checks if any policies prevent sending this message.
714 Populates Adj-RIB-out with corresponding `SentRoute`.
717 path = outgoing_route.path
718 block, blocked_cause = self._apply_out_filter(path)
720 nlri_str = outgoing_route.path.nlri.formatted_nlri_str
721 sent_route = SentRoute(outgoing_route.path, self, block)
722 self._adj_rib_out[nlri_str] = sent_route
723 self._signal_bus.adj_rib_out_changed(self, sent_route)
725 # TODO(PH): optimized by sending several prefixes per update.
726 # Construct and send update message.
728 update_msg = self._construct_update(outgoing_route)
729 self._protocol.send(update_msg)
730 # Collect update statistics.
731 self.state.incr(PeerCounterNames.SENT_UPDATES)
733 LOG.debug('prefix : %s is not sent by filter : %s',
734 path.nlri, blocked_cause)
736 # We have to create sent_route for every OutgoingRoute which is
737 # not a withdraw or was for route-refresh msg.
738 if (not outgoing_route.path.is_withdraw and
739 not outgoing_route.for_route_refresh):
740 # Update the destination with new sent route.
741 tm = self._core_service.table_manager
742 tm.remember_sent_route(sent_route)
744 def _process_outgoing_msg_list(self):
748 if self._protocol is not None:
749 # We pick the first outgoing msg. available and send it.
750 outgoing_msg = self.outgoing_msg_list.pop_first()
752 # If we do not have any outgoing route, we wait.
753 if outgoing_msg is None:
754 self.outgoing_msg_event.clear()
755 self.outgoing_msg_event.wait()
758 # Check currently supported out-going msgs.
761 (BGPRouteRefresh, BGPUpdate, OutgoingRoute)
762 ), ('Peer cannot process object: %s in its outgoing queue'
766 if isinstance(outgoing_msg, BGPRouteRefresh):
767 self._send_outgoing_route_refresh_msg(outgoing_msg)
768 elif isinstance(outgoing_msg, OutgoingRoute):
769 self._send_outgoing_route(outgoing_msg)
771 # EOR are enqueued as plain Update messages.
772 elif isinstance(outgoing_msg, BGPUpdate):
773 self._protocol.send(outgoing_msg)
774 LOG.debug('Update %s>> %s', self._neigh_conf.ip_address,
776 self.state.incr(PeerCounterNames.SENT_UPDATES)
778 def request_route_refresh(self, *route_families):
779 """Request route refresh to peer for given `route_families`.
781 If no `route_families` are given, we make request for all supported
782 route families with this peer.
784 - `route_families`: list of route families to request route
787 If this peer is currently not in Established state, we raise exception.
788 If any of the `route_families` are invalid we raise exception.
790 # If this peer has not established session yet
791 if not self.in_established:
792 raise ValueError('Peer not in established state to satisfy'
795 skip_validation = False
796 # If request is made for all supported route_families for current
797 # session, we collect all route_families for valid for current session.
798 if len(route_families) == 0:
800 # We skip validation of route families that we collect ourselves
802 skip_validation = True
803 for route_family in SUPPORTED_GLOBAL_RF:
804 if self.is_mbgp_cap_valid(route_family):
805 route_families.append(route_family)
807 for route_family in route_families:
808 if (skip_validation or
809 ((route_family in SUPPORTED_GLOBAL_RF) and
810 # We ignore request for route_family not valid
811 # for current session.
812 self._protocol.is_mbgp_cap_valid(route_family))):
813 rr_req = BGPRouteRefresh(route_family.afi, route_family.safi)
814 self.enque_outgoing_msg(rr_req)
815 LOG.debug('Enqueued Route Refresh message to '
816 'peer %s for rf: %s', self, route_family)
818 def enque_end_of_rib(self, route_family):
819 # MP_UNREACH_NLRI Attribute.
820 mpunreach_attr = BGPPathAttributeMpUnreachNLRI(route_family.afi,
823 update = BGPUpdate(path_attributes=[mpunreach_attr])
824 self.enque_outgoing_msg(update)
826 def _session_next_hop(self, path):
827 """Returns nexthop address relevant to current session
829 Nexthop used can depend on capabilities of the session. If VPNv6
830 capability is active and session is on IPv4 connection, we have to use
831 IPv4 mapped IPv6 address. In other cases we can use connection end
832 point/local ip address.
834 route_family = path.route_family
836 # By default we use BGPS's interface IP with this peer as next_hop.
837 if self._neigh_conf.next_hop:
838 next_hop = self._neigh_conf.next_hop
840 next_hop = self.host_bind_ip
841 if route_family == RF_IPv6_VPN:
842 next_hop = self._ipv4_mapped_ipv6(next_hop)
def _ipv4_mapped_ipv6(ipv4_address):
    """Returns the string form of `ipv4_address` converted with netaddr's
    `IPAddress.ipv6()`; used as an IPv4-mapped IPv6 next hop."""
    from netaddr import IPAddress
    mapped = IPAddress(ipv4_address).ipv6()
    return str(mapped)
852 def _construct_as_path_attr(self, as_path_attr, as4_path_attr):
853 """Marge AS_PATH and AS4_PATH attribute instances into
854 a single AS_PATH instance."""
857 """Reconstruct AS_PATH list.
861 >>> _listify([[1, 2, 3], {4, 5}, [6, 7]])
862 [1, 2, 3, {4, 5}, 6, 7]
866 if isinstance(l, list):
868 elif isinstance(l, set):
874 # If AS4_PATH attribute is None, returns the given AS_PATH attribute
875 if as4_path_attr is None:
878 # If AS_PATH is shorter than AS4_PATH, AS4_PATH should be ignored.
879 if as_path_attr.get_as_path_len() < as4_path_attr.get_as_path_len():
882 org_as_path_list = _listify(as_path_attr.path_seg_list)
883 as4_path_list = _listify(as4_path_attr.path_seg_list)
885 # Reverse to compare backward.
886 org_as_path_list.reverse()
887 as4_path_list.reverse()
889 new_as_path_list = []
891 for as_path, as4_path in zip_longest(org_as_path_list, as4_path_list):
893 if isinstance(as_path, int):
894 tmp_list.insert(0, as_path)
895 elif isinstance(as_path, set):
897 new_as_path_list.insert(0, tmp_list)
899 new_as_path_list.insert(0, as_path)
902 elif isinstance(as4_path, int):
903 tmp_list.insert(0, as4_path)
904 elif isinstance(as4_path, set):
906 new_as_path_list.insert(0, tmp_list)
908 new_as_path_list.insert(0, as4_path)
912 new_as_path_list.insert(0, tmp_list)
914 return bgp.BGPPathAttributeAsPath(new_as_path_list)
916 def _trans_as_path(self, as_path_list):
917 """Translates Four-Octet AS number to AS_TRANS and separates
918 AS_PATH list into AS_PATH and AS4_PATH lists if needed.
920 If the neighbor does not support Four-Octet AS number,
921 this method constructs AS4_PATH list from AS_PATH list and swaps
922 non-mappable AS number in AS_PATH with AS_TRANS, then
923 returns AS_PATH list and AS4_PATH list.
924 If the neighbor supports Four-Octet AS number, returns
925 the given AS_PATH list and None.
929 if is_valid_old_asn(n):
936 # If the neighbor supports Four-Octet AS number, returns
937 # the given AS_PATH list and None.
938 if self.is_four_octet_as_number_cap_valid():
939 return as_path_list, None
941 # If the neighbor does not support Four-Octet AS number,
942 # constructs AS4_PATH list from AS_PATH list and swaps
943 # non-mappable AS number in AS_PATH with AS_TRANS.
945 new_as_path_list = []
946 for as_path in as_path_list:
947 if isinstance(as_path, set):
949 for as_num in as_path:
950 path_set.add(_swap(as_num))
951 new_as_path_list.append(path_set)
952 elif isinstance(as_path, list):
954 for as_num in as_path:
955 path_list.append(_swap(as_num))
956 new_as_path_list.append(path_list)
958 # Ignore invalid as_path type
961 # If all of the AS_PATH list is composed of mappable four-octet
962 # AS numbers only, returns the given AS_PATH list
963 # Assumption: If the constructed AS_PATH list is the same as
964 # the given AS_PATH list, all AS number is mappable.
965 if as_path_list == new_as_path_list:
966 return as_path_list, None
968 return new_as_path_list, as_path_list
970 def _construct_update(self, outgoing_route):
971 """Construct update message with Outgoing-routes path attribute
972 appropriately cloned/copied/updated.
975 path = outgoing_route.path
976 # Get copy of path's path attributes.
977 pathattr_map = path.pathattr_map
981 if isinstance(path, Ipv4Path):
982 update = BGPUpdate(withdrawn_routes=[path.nlri])
985 mpunreach_attr = BGPPathAttributeMpUnreachNLRI(
986 path.route_family.afi, path.route_family.safi, [path.nlri]
988 new_pathattr.append(mpunreach_attr)
989 elif self.is_route_server_client:
990 nlri_list = [path.nlri]
991 new_pathattr.extend(pathattr_map.values())
993 if self.is_route_reflector_client:
994 # Append ORIGINATOR_ID attribute if not already exist.
995 if BGP_ATTR_TYPE_ORIGINATOR_ID not in pathattr_map:
996 originator_id = path.source
997 if originator_id is None:
998 originator_id = self._common_conf.router_id
999 elif isinstance(path.source, Peer):
1000 originator_id = path.source.ip_address
1001 new_pathattr.append(
1002 BGPPathAttributeOriginatorId(value=originator_id))
1004 # Preppend own CLUSTER_ID into CLUSTER_LIST attribute if exist.
1005 # Otherwise append CLUSTER_LIST attribute.
1006 cluster_lst_attr = pathattr_map.get(BGP_ATTR_TYPE_CLUSTER_LIST)
1007 if cluster_lst_attr:
1008 cluster_list = list(cluster_lst_attr.value)
1009 if self._common_conf.cluster_id not in cluster_list:
1010 cluster_list.insert(0, self._common_conf.cluster_id)
1011 new_pathattr.append(
1012 BGPPathAttributeClusterList(cluster_list))
1014 new_pathattr.append(
1015 BGPPathAttributeClusterList(
1016 [self._common_conf.cluster_id]))
1018 # Supported and un-supported/unknown attributes.
1022 as4_path_attr = None
1023 aggregator_attr = None
1024 as4_aggregator_attr = None
1026 community_attr = None
1027 localpref_attr = None
1028 pmsi_tunnel_attr = None
1029 unknown_opttrans_attrs = None
1030 nlri_list = [path.nlri]
1032 if path.route_family.safi in (subaddr_family.IP_FLOWSPEC,
1033 subaddr_family.VPN_FLOWSPEC):
1034 # Flow Specification does not have next_hop.
1036 elif self.is_ebgp_peer():
1037 next_hop = self._session_next_hop(path)
1038 if path.is_local() and path.has_nexthop():
1039 next_hop = path.nexthop
1041 next_hop = path.nexthop
1042 # RFC 4271 allows us to change next_hop
1043 # if configured to announce its own ip address.
1044 # Also if the BGP route is configured without next_hop,
1045 # we use path._session_next_hop() as next_hop.
1046 if (self._neigh_conf.is_next_hop_self
1047 or (path.is_local() and not path.has_nexthop())):
1048 next_hop = self._session_next_hop(path)
1049 LOG.debug('using %s as a next_hop address instead'
1050 ' of path.nexthop %s', next_hop, path.nexthop)
1052 nexthop_attr = BGPPathAttributeNextHop(next_hop)
1053 assert nexthop_attr, 'Missing NEXTHOP mandatory attribute.'
1055 if not isinstance(path, Ipv4Path):
1056 # We construct mpreach-nlri attribute.
1057 mpnlri_attr = BGPPathAttributeMpReachNLRI(
1058 path.route_family.afi,
1059 path.route_family.safi,
1065 # According to RFC this attribute value SHOULD NOT be changed by
1066 # any other speaker.
1067 origin_attr = pathattr_map.get(BGP_ATTR_TYPE_ORIGIN)
1068 assert origin_attr, 'Missing ORIGIN mandatory attribute.'
1070 # AS_PATH Attribute.
1071 # Construct AS-path-attr using paths AS_PATH attr. with local AS as
1073 path_aspath = pathattr_map.get(BGP_ATTR_TYPE_AS_PATH)
1074 assert path_aspath, 'Missing AS_PATH mandatory attribute.'
1075 # Deep copy AS_PATH attr value
1076 as_path_list = path_aspath.path_seg_list
1077 # If this is a iBGP peer.
1078 if not self.is_ebgp_peer():
1079 # When a given BGP speaker advertises the route to an internal
1080 # peer, the advertising speaker SHALL NOT modify the AS_PATH
1081 # attribute associated with the route.
1084 # When a given BGP speaker advertises the route to an external
1085 # peer, the advertising speaker updates the AS_PATH attribute
1087 # 1) if the first path segment of the AS_PATH is of type
1088 # AS_SEQUENCE, the local system prepends its own AS num as
1089 # the last element of the sequence (put it in the left-most
1090 # position with respect to the position of octets in the
1091 # protocol message). If the act of prepending will cause an
1092 # overflow in the AS_PATH segment (i.e., more than 255
1093 # ASes), it SHOULD prepend a new segment of type AS_SEQUENCE
1094 # and prepend its own AS number to this new segment.
1096 # 2) if the first path segment of the AS_PATH is of type AS_SET
1097 # , the local system prepends a new path segment of type
1098 # AS_SEQUENCE to the AS_PATH, including its own AS number in
1101 # 3) if the AS_PATH is empty, the local system creates a path
1102 # segment of type AS_SEQUENCE, places its own AS into that
1103 # segment, and places that segment into the AS_PATH.
1104 if (len(as_path_list) > 0 and
1105 isinstance(as_path_list[0], list) and
1106 len(as_path_list[0]) < 255):
1107 as_path_list[0].insert(0, self.local_as)
1109 as_path_list.insert(0, [self.local_as])
1110 # Construct AS4_PATH list from AS_PATH list and swap
1111 # non-mappable AS number with AS_TRANS in AS_PATH.
1112 as_path_list, as4_path_list = self._trans_as_path(
1114 # If the neighbor supports Four-Octet AS number, send AS_PATH
1116 if self.is_four_octet_as_number_cap_valid():
1117 as_path_attr = BGPPathAttributeAsPath(
1118 as_path_list, as_pack_str='!I') # specify Four-Octet.
1119 # Otherwise, send AS_PATH in Two-Octet.
1121 as_path_attr = BGPPathAttributeAsPath(as_path_list)
1122 # If needed, send AS4_PATH attribute.
1124 as4_path_attr = BGPPathAttributeAs4Path(as4_path_list)
1126 # AGGREGATOR Attribute.
1127 aggregator_attr = pathattr_map.get(BGP_ATTR_TYPE_AGGREGATOR)
1128 # If the neighbor does not support Four-Octet AS number,
1129 # swap non-mappable AS number with AS_TRANS.
1130 if (aggregator_attr and
1131 not self.is_four_octet_as_number_cap_valid()):
1132 # If AS number of AGGREGATOR is Four-Octet AS number,
1133 # swap with AS_TRANS, else do not.
1134 aggregator_as_number = aggregator_attr.as_number
1135 if not is_valid_old_asn(aggregator_as_number):
1136 aggregator_attr = bgp.BGPPathAttributeAggregator(
1137 bgp.AS_TRANS, aggregator_attr.addr)
1138 as4_aggregator_attr = bgp.BGPPathAttributeAs4Aggregator(
1139 aggregator_as_number, aggregator_attr.addr)
1141 # MULTI_EXIT_DISC Attribute.
1142 # For eBGP session we can send multi-exit-disc if configured.
1143 multi_exit_disc = None
1144 if self.is_ebgp_peer():
1145 if self._neigh_conf.multi_exit_disc:
1146 multi_exit_disc = BGPPathAttributeMultiExitDisc(
1147 self._neigh_conf.multi_exit_disc
1151 if not self.is_ebgp_peer():
1152 multi_exit_disc = pathattr_map.get(
1153 BGP_ATTR_TYPE_MULTI_EXIT_DISC)
1155 # LOCAL_PREF Attribute.
1156 if not self.is_ebgp_peer():
1157 # For iBGP peers we are required to send local-pref attribute
1158 # for connected or local prefixes. We check if the path matches
1159 # attribute_maps and set local-pref value.
1160 # If the path doesn't match, we set default local-pref given
1161 # from the user. The default value is 100.
1162 localpref_attr = BGPPathAttributeLocalPref(
1163 self._common_conf.local_pref)
1164 key = const.ATTR_MAPS_LABEL_DEFAULT
1166 if isinstance(path, (Vpnv4Path, Vpnv6Path)):
1168 rf = VRF_RF_IPV4 if isinstance(path, Vpnv4Path)\
1170 key = ':'.join([nlri.route_dist, rf])
1172 attr_type = AttributeMap.ATTR_LOCAL_PREF
1173 at_maps = self._attribute_maps.get(key, {})
1174 result = self._lookup_attribute_map(at_maps, attr_type, path)
1176 localpref_attr = result
1178 # COMMUNITY Attribute.
1179 community_attr = pathattr_map.get(BGP_ATTR_TYPE_COMMUNITIES)
1181 # EXTENDED COMMUNITY Attribute.
1182 # Construct ExtCommunity path-attr based on given.
1183 path_extcomm_attr = pathattr_map.get(
1184 BGP_ATTR_TYPE_EXTENDED_COMMUNITIES
1186 if path_extcomm_attr:
1187 # SOO list can be configured per VRF and/or per Neighbor.
1188 # NeighborConf has this setting we add this to existing list.
1189 communities = path_extcomm_attr.communities
1190 if self._neigh_conf.soo_list:
1191 # construct extended community
1192 soo_list = self._neigh_conf.soo_list
1194 for soo in soo_list:
1195 first, second = soo.split(':')
1197 c = BGPIPv4AddressSpecificExtendedCommunity(
1200 local_administrator=int(second))
1202 c = BGPTwoOctetAsSpecificExtendedCommunity(
1204 as_number=int(first),
1205 local_administrator=int(second))
1206 communities.append(c)
1208 extcomm_attr = BGPPathAttributeExtendedCommunities(
1209 communities=communities
1212 pmsi_tunnel_attr = pathattr_map.get(
1213 BGP_ATTR_TYEP_PMSI_TUNNEL_ATTRIBUTE
1216 # UNKNOWN Attributes.
1217 # Get optional transitive path attributes
1218 unknown_opttrans_attrs = bgp_utils.get_unknown_opttrans_attr(path)
1220 # Ordering path attributes according to type as RFC says. We set
1221 # MPReachNLRI first as advised by experts as a new trend in BGP
1223 if isinstance(path, Ipv4Path):
1224 new_pathattr.append(nexthop_attr)
1226 new_pathattr.append(mpnlri_attr)
1228 new_pathattr.append(origin_attr)
1229 new_pathattr.append(as_path_attr)
1231 new_pathattr.append(as4_path_attr)
1233 new_pathattr.append(aggregator_attr)
1234 if as4_aggregator_attr:
1235 new_pathattr.append(as4_aggregator_attr)
1237 new_pathattr.append(multi_exit_disc)
1239 new_pathattr.append(localpref_attr)
1241 new_pathattr.append(community_attr)
1243 new_pathattr.append(extcomm_attr)
1244 if pmsi_tunnel_attr:
1245 new_pathattr.append(pmsi_tunnel_attr)
1246 if unknown_opttrans_attrs:
1247 new_pathattr.extend(unknown_opttrans_attrs.values())
1249 if isinstance(path, Ipv4Path):
1250 update = BGPUpdate(path_attributes=new_pathattr,
1253 update = BGPUpdate(path_attributes=new_pathattr)
def _connect_loop(self, client_factory):
    """In the current greenlet we try to establish connection with peer.

    This greenlet will spin another greenlet to handle incoming data
    from the peer once connection is established.

    The loop never returns. Each pass waits for the connect-retry event,
    pauses for a second, attempts a TCP connection if the session is in
    IDLE or ACTIVE state, and finally sleeps for the configured
    `bgp_conn_retry_time` before starting over.

    Parameters:
        - `client_factory`: handed unchanged to `_connect_tcp`.
    """
    # If current configuration allow, enable active session establishment.
    if self._neigh_conf.enabled:
        self._connect_retry_event.set()

    while True:
        self._connect_retry_event.wait()

        # Reconnecting immediately after closing connection may be not very
        # well seen by some peers (ALU?)
        self.pause(1.0)
        if self.state.bgp_state in \
                (const.BGP_FSM_IDLE, const.BGP_FSM_ACTIVE):

            # Check if we have to stop or retry
            self.state.bgp_state = const.BGP_FSM_CONNECT
            # If we have specific host interface to bind to, we will do so
            # else we will bind to system default.
            if self._neigh_conf.host_bind_ip and \
                    self._neigh_conf.host_bind_port:
                bind_addr = (self._neigh_conf.host_bind_ip,
                             self._neigh_conf.host_bind_port)
            else:
                bind_addr = None
            peer_address = (self._neigh_conf.ip_address,
                            self._neigh_conf.port)

            if bind_addr:
                # The two literal fragments used to be joined without a
                # separating space ("from<addr>"); keep it in one literal.
                LOG.debug('%s trying to connect from %s to %s',
                          self, bind_addr, peer_address)
            else:
                LOG.debug('%s trying to connect to %s', self, peer_address)
            tcp_conn_timeout = self._common_conf.tcp_conn_timeout
            try:
                password = self._neigh_conf.password
                self._connect_tcp(peer_address,
                                  client_factory,
                                  time_out=tcp_conn_timeout,
                                  bind_address=bind_addr,
                                  password=password)
            except socket.error:
                # Connection attempt failed: fall back to ACTIVE so the
                # next pass of the loop retries.
                self.state.bgp_state = const.BGP_FSM_ACTIVE
                if LOG.isEnabledFor(logging.DEBUG):
                    LOG.debug('Socket could not be created in time'
                              ' (%s secs), reason %s', tcp_conn_timeout,
                              traceback.format_exc())
                LOG.info('Will try to reconnect to %s after %s secs: %s',
                         self._neigh_conf.ip_address,
                         self._common_conf.bgp_conn_retry_time,
                         self._connect_retry_event.is_set())

        self.pause(self._common_conf.bgp_conn_retry_time)
def _set_protocol(self, proto):
    """Make `proto` the bound protocol and refresh connection state.

    Copies the remote and local (ip, port) pairs of the protocol into
    `self.state` and clears the connect-retry event if it is set.
    """
    self._protocol = proto
    bound = self._protocol

    # Update state attributes
    remote_ip, remote_port = bound._remotename
    self.state.peer_ip = remote_ip
    self.state.peer_port = remote_port
    local_ip, local_port = bound._localname
    self.state.local_ip = local_ip
    self.state.local_port = local_port
    # self.state.bgp_state = self._protocol.state

    if bound:
        # Stop connect_loop retry timer as we are now connected
        if self._connect_retry_event.is_set():
            self._connect_retry_event.clear()
            LOG.debug('Connect retry event for %s is cleared', self)

        # NOTE(review): this only calls set() on an event that is already
        # set, so apart from the log line it has no visible effect here.
        # Confirm whether `not is_set()` was intended before changing it.
        if self.outgoing_msg_event.is_set():
            # Start processing sink.
            self.outgoing_msg_event.set()
            LOG.debug('Processing of outgoing msg. started for %s.', self)
def _send_collision_err_and_stop(self, protocol):
    """Signal a connection collision for `protocol` and stop it.

    Publishes a CEASE / connection-collision-resolution error on the
    signal bus, sends the matching NOTIFICATION through `protocol` and
    then stops that protocol instance.
    """
    error = (BGP_ERROR_CEASE,
             BGP_ERROR_SUB_CONNECTION_COLLISION_RESOLUTION)
    self._signal_bus.bgp_error(self, error[0], error[1], None)
    protocol.send_notification(*error)
    protocol.stop()
def bind_protocol(self, proto):
    """Tries to bind given protocol to this peer.

    Should only be called by `proto` trying to bind.
    Once bound this protocol instance will be used to communicate with
    peer. If another protocol is already bound, connection collision
    resolution takes place.

    Returns True when `proto` ends up bound, False when it lost collision
    resolution, and None when a protocol is already bound and the session
    is not IDLE (in which case `proto` is sent a collision notification).

    Raises ValueError if `proto` is not a `BgpProtocol` or is not in
    OpenConfirm state.
    """
    LOG.debug('Trying to bind protocol %s to peer %s', proto, self)
    # Validate input.
    if not isinstance(proto, BgpProtocol):
        raise ValueError('Currently only supports valid instances of'
                         ' `BgpProtocol`')

    if proto.state != const.BGP_FSM_OPEN_CONFIRM:
        raise ValueError('Only protocols in OpenConfirm state can be'
                         ' bound')

    # If we are not bound to any protocol, simply take this one.
    if not self._protocol:
        self._set_protocol(proto)
        return True

    # A protocol is already bound and the session is past IDLE: the new
    # protocol is told about the collision and closed.
    if self.state.bgp_state != const.BGP_FSM_IDLE:
        LOG.debug('Currently in %s state, hence will send collision'
                  ' Notification to close this protocol.',
                  self.state.bgp_state)
        self._send_collision_err_and_stop(proto)
        return

    # If we have a collision that need to be resolved
    assert proto.is_colliding(self._protocol), \
        ('Tried to bind second protocol that is not colliding with '
         'first/bound protocol')
    LOG.debug('Currently have one protocol in %s state and '
              'another protocol in %s state',
              self._protocol.state, proto.state)
    # Protocol that is already bound
    first_protocol = self._protocol
    assert ((first_protocol.is_reactive and not proto.is_reactive) or
            (proto.is_reactive and not first_protocol.is_reactive))

    # Identify which protocol was initiated by which peer: the reactive
    # one was initiated by the peer, the proactive one locally.
    if proto.is_reactive:
        reactive_proto, proactive_proto = proto, first_protocol
    else:
        reactive_proto, proactive_proto = first_protocol, proto

    LOG.debug('Pro-active/Active protocol %s', proactive_proto)
    # We compare bgp local and remote router id and keep the protocol
    # that was initiated by peer with highest id.
    if proto.is_local_router_id_greater():
        winner = proactive_proto
    else:
        winner = reactive_proto
    self._set_protocol(winner)

    if self._protocol is not proto:
        # New proto did not win collision resolution.
        return False

    # The previously bound protocol lost collision resolution: send
    # notification to peer and stop it.
    self._send_collision_err_and_stop(first_protocol)
    return True
def create_open_msg(self):
    """Create `Open` message using current settings.

    Current settings include capabilities, timers and ids. The configured
    capabilities are flattened from (possibly nested) lists into a single
    list of optional parameters.

    Returns the constructed `BGPOpen` instance.
    """
    asnum = self.local_as
    # If local AS number is not Two-Octet AS number, swaps with AS_TRANS.
    if not is_valid_old_asn(asnum):
        asnum = bgp.AS_TRANS
    bgpid = self._common_conf.router_id
    holdtime = self._neigh_conf.hold_time

    def flatten(item):
        # Recursively yield the non-list leaves of nested lists.
        if isinstance(item, list):
            for child in item:
                for leaf in flatten(child):
                    yield leaf
        else:
            yield item

    capabilities = self._neigh_conf.get_configured_capabilities()
    opts = list(flatten(list(capabilities.values())))
    return BGPOpen(
        my_as=asnum,
        bgp_identifier=bgpid,
        version=const.BGP_VERSION_NUM,
        hold_time=holdtime,
        opt_param=opts
    )
def _validate_update_msg(self, update_msg):
    """Validate update message as per RFC.

    Here we validate the message after it has been parsed. Message
    has already been validated against some errors inside parsing
    library.

    Returns True when the message may be processed and False when an
    MP_REACH_NLRI next hop is missing or equals `self.host_bind_ip`.

    Raises:
        - `bgp.FiniteStateMachineError`: session is not ESTABLISHED.
        - `bgp.MissingWellKnown`: AS_PATH, ORIGIN or NEXT_HOP is missing.
        - `bgp.MalformedAsPath`: first-AS check failed for an eBGP peer.
    """
    # TODO(PH): finish providing implementation, currently low priority
    assert update_msg.type == BGP_MSG_UPDATE
    # An UPDATE message may be received only in the Established state.
    # Receiving an UPDATE message in any other state is an error.
    if self.state.bgp_state != const.BGP_FSM_ESTABLISHED:
        LOG.error('Received UPDATE message when not in ESTABLISHED'
                  ' state.')
        raise bgp.FiniteStateMachineError()

    mp_reach_attr = update_msg.get_path_attr(
        BGP_ATTR_TYPE_MP_REACH_NLRI
    )
    mp_unreach_attr = update_msg.get_path_attr(
        BGP_ATTR_TYPE_MP_UNREACH_NLRI
    )

    # non-MPBGP Update msg.
    if not (mp_reach_attr or mp_unreach_attr):
        if not self.is_mpbgp_cap_valid(RF_IPv4_UC):
            # Only logged; the message is still processed.
            LOG.error('Got UPDATE message with un-available'
                      ' afi/safi %s', RF_IPv4_UC)
        if update_msg.nlri:
            # Check for missing well-known mandatory attributes.
            aspath = update_msg.get_path_attr(BGP_ATTR_TYPE_AS_PATH)
            if not aspath:
                raise bgp.MissingWellKnown(
                    BGP_ATTR_TYPE_AS_PATH)

            if (self.check_first_as and self.is_ebgp_peer() and
                    not aspath.has_matching_leftmost(self.remote_as)):
                LOG.error('First AS check fails. Raise appropriate'
                          ' exception.')
                raise bgp.MalformedAsPath()

            origin = update_msg.get_path_attr(BGP_ATTR_TYPE_ORIGIN)
            if not origin:
                raise bgp.MissingWellKnown(BGP_ATTR_TYPE_ORIGIN)

            nexthop = update_msg.get_path_attr(BGP_ATTR_TYPE_NEXT_HOP)
            if not nexthop:
                raise bgp.MissingWellKnown(BGP_ATTR_TYPE_NEXT_HOP)

        return True

    # Check if received MP_UNREACH path attribute is of available afi/safi
    if mp_unreach_attr:
        if not self.is_mpbgp_cap_valid(mp_unreach_attr.route_family):
            LOG.error('Got UPDATE message with un-available afi/safi for'
                      ' MP_UNREACH path attribute (non-negotiated'
                      ' afi/safi) %s', mp_unreach_attr.route_family)
            # raise bgp.OptAttrError()

    if mp_reach_attr:
        # Check if received MP_REACH path attribute is of available
        # afi/safi
        if not self.is_mpbgp_cap_valid(mp_reach_attr.route_family):
            # This message used to name MP_UNREACH (copy-paste slip).
            LOG.error('Got UPDATE message with un-available afi/safi for'
                      ' MP_REACH path attribute (non-negotiated'
                      ' afi/safi) %s', mp_reach_attr.route_family)
            # raise bgp.OptAttrError()

        # Check for missing well-known mandatory attributes.
        aspath = update_msg.get_path_attr(BGP_ATTR_TYPE_AS_PATH)
        if not aspath:
            raise bgp.MissingWellKnown(BGP_ATTR_TYPE_AS_PATH)

        if (self.check_first_as and self.is_ebgp_peer() and
                not aspath.has_matching_leftmost(self.remote_as)):
            LOG.error('First AS check fails. Raise appropriate exception.')
            raise bgp.MalformedAsPath()

        origin = update_msg.get_path_attr(BGP_ATTR_TYPE_ORIGIN)
        if not origin:
            raise bgp.MissingWellKnown(BGP_ATTR_TYPE_ORIGIN)

        # Validate Next hop.
        if mp_reach_attr.route_family.safi in (
                subaddr_family.IP_FLOWSPEC,
                subaddr_family.VPN_FLOWSPEC):
            # Because the Flow Specification does not have nexthop,
            # skips check.
            pass
        elif (not mp_reach_attr.next_hop or
                mp_reach_attr.next_hop == self.host_bind_ip):
            LOG.error('Nexthop of received UPDATE msg. (%s) same as local'
                      ' interface address %s.',
                      mp_reach_attr.next_hop,
                      self.host_bind_ip)
            return False

    return True
def _handle_update_msg(self, update_msg):
    """Extracts and processes new paths or withdrawals in given
    `update_msg`.

    Parameter:
        - `update_msg`: update message to process.

    The message is validated, its AS_PATH is reconstructed, and it is
    dropped if validation fails or a loop is detected. MP-BGP
    advertisements, MP-BGP withdraws, plain NLRIs and plain withdrawn
    routes are then handled in that order.
    """
    assert self.state.bgp_state == const.BGP_FSM_ESTABLISHED

    # Increment count of update received.
    self.state.incr(PeerCounterNames.RECV_UPDATES)

    if not self._validate_update_msg(update_msg):
        # If update message was not valid for some reason, we ignore its
        # routes.
        LOG.error('UPDATE message was invalid, hence ignoring its routes.')
        return

    # Extract advertised path attributes and reconstruct AS_PATH attribute
    self._extract_and_reconstruct_as_path(update_msg)

    # Check if path attributes have loops.
    if self._is_looped_path_attrs(update_msg):
        return

    attr_map = update_msg.pathattr_map

    # Extract advertised MP-BGP paths from given message.
    if attr_map.get(BGP_ATTR_TYPE_MP_REACH_NLRI, None):
        self._extract_and_handle_mpbgp_new_paths(update_msg)

    # Extract MP-BGP withdraws from given message.
    mp_unreach = attr_map.get(BGP_ATTR_TYPE_MP_UNREACH_NLRI, None)
    if mp_unreach:
        self._extract_and_handle_mpbgp_withdraws(mp_unreach)

    # Extract advertised BGP paths from given message.
    if update_msg.nlri:
        self._extract_and_handle_bgp4_new_paths(update_msg)

    # Extract BGP withdraws from given message.
    withdrawn = update_msg.withdrawn_routes
    if withdrawn:
        self._extract_and_handle_bgp4_withdraws(withdrawn)
def _extract_and_reconstruct_as_path(self, update_msg):
    """Extracts advertised AS path attributes in the given update message
    and reconstructs AS_PATH from AS_PATH and AS4_PATH if needed.

    `update_msg.path_attributes` is modified in place.
    """
    attr_map = update_msg.pathattr_map

    aggregator = attr_map.get(BGP_ATTR_TYPE_AGGREGATOR, None)
    aggregator4 = attr_map.get(BGP_ATTR_TYPE_AS4_AGGREGATOR, None)
    if aggregator and aggregator4:
        if aggregator.as_number != bgp.AS_TRANS:
            # When both AGGREGATOR and AS4_AGGREGATOR are received,
            # if the AS number in the AGGREGATOR attribute is not AS_TRANS,
            # then:
            #  - the AS4_AGGREGATOR attribute and the AS4_PATH attribute
            #    SHALL be ignored,
            #  - the AGGREGATOR attribute SHALL be taken as the information
            #    about the aggregating node, and
            #  - the AS_PATH attribute SHALL be taken as the AS path
            #    information.
            update_msg.path_attributes.remove(aggregator4)
            # Popped from the local map too, so the AS_PATH/AS4_PATH step
            # below no longer sees the ignored AS4_PATH.
            ignored_as4_path = attr_map.pop(BGP_ATTR_TYPE_AS4_PATH, None)
            if ignored_as4_path:
                update_msg.path_attributes.remove(ignored_as4_path)
        else:
            # Otherwise,
            #  - the AGGREGATOR attribute SHALL be ignored,
            #  - the AS4_AGGREGATOR attribute SHALL be taken as the
            #    information about the aggregating node, and
            #  - the AS path information would need to be constructed,
            #    as in all other cases.
            update_msg.path_attributes.remove(aggregator)
            update_msg.path_attributes.remove(aggregator4)
            merged_aggregator = bgp.BGPPathAttributeAggregator(
                as_number=aggregator4.as_number,
                addr=aggregator4.addr,
            )
            update_msg.path_attributes.append(merged_aggregator)

    as_path = attr_map.get(BGP_ATTR_TYPE_AS_PATH, None)
    as4_path = attr_map.get(BGP_ATTR_TYPE_AS4_PATH, None)
    if not (as_path and as4_path):
        return

    if as_path.get_as_path_len() < as4_path.get_as_path_len():
        # If the number of AS numbers in the AS_PATH attribute is
        # less than the number of AS numbers in the AS4_PATH attribute,
        # then the AS4_PATH attribute SHALL be ignored, and the AS_PATH
        # attribute SHALL be taken as the AS path information.
        update_msg.path_attributes.remove(as4_path)
    else:
        # If the number of AS numbers in the AS_PATH attribute is larger
        # than or equal to the number of AS numbers in the AS4_PATH
        # attribute, then the AS path information SHALL be constructed
        # by taking as many AS numbers and path segments as necessary
        # from the leading part of the AS_PATH attribute, and then
        # prepending them to the AS4_PATH attribute so that the AS path
        # information has a number of AS numbers identical to that of
        # the AS_PATH attribute.
        update_msg.path_attributes.remove(as_path)
        update_msg.path_attributes.remove(as4_path)
        rebuilt = self._construct_as_path_attr(as_path, as4_path)
        update_msg.path_attributes.append(rebuilt)
def _is_looped_path_attrs(self, update_msg):
    """
    Extracts path attributes from the given UPDATE message and checks
    if the given attributes have loops or not.

    AS_PATH, ORIGINATOR_ID and CLUSTER_LIST are checked in that order;
    the first loop found is logged and ends the check.

    :param update_msg: UPDATE message instance.
    :return: True if attributes have loops. Otherwise False.
    """
    umsg_pattrs = update_msg.pathattr_map
    recv_open_msg = self.protocol.recv_open_msg

    # Check if AS_PATH has loops.
    aspath = umsg_pattrs.get(BGP_ATTR_TYPE_AS_PATH)
    if (aspath is not None
            and aspath.has_local_as(
                self.local_as,
                max_count=self._common_conf.allow_local_as_in_count)):
        LOG.error(
            'AS_PATH on UPDATE message has loops. '
            'Ignoring this message: %s',
            update_msg)
        return True

    # Check if ORIGINATOR_ID has loops. [RFC4456]
    # NOTE(review): `originator_id` is whatever the attribute map holds
    # for this type and is compared directly with the peer's OPEN
    # identifier; confirm this is the intended comparison.
    originator_id = umsg_pattrs.get(BGP_ATTR_TYPE_ORIGINATOR_ID, None)
    if (originator_id
            and recv_open_msg.bgp_identifier == originator_id):
        LOG.error(
            'ORIGINATOR_ID on UPDATE message has loops. '
            'Ignoring this message: %s',
            update_msg)
        return True

    # Check if CLUSTER_LIST has loops. [RFC4456]
    cluster_list = umsg_pattrs.get(BGP_ATTR_TYPE_CLUSTER_LIST, None)
    if (cluster_list
            and self._common_conf.cluster_id in cluster_list.value):
        LOG.error(
            'CLUSTER_LIST on UPDATE message has loops. '
            'Ignoring this message: %s', update_msg)
        return True

    # Explicit result instead of falling off the end with None, so the
    # return value matches the documented contract.
    return False
def _extract_and_handle_bgp4_new_paths(self, update_msg):
    """Extracts new paths advertised in the NLRI field of the given
    update message (plain BGP4, not the *MpReachNlri* attribute).

    Assumes the message was validated.
    Parameters:
        - update_msg: (Update) is assumed to be checked for all bgp
        message errors.

    Every extracted path is recorded in the Adj-RIB-In and signalled on
    the signal bus; paths not blocked by the in-bound filter are handed
    to the table manager for further processing.
    """
    umsg_pattrs = update_msg.pathattr_map
    next_hop = update_msg.get_path_attr(BGP_ATTR_TYPE_NEXT_HOP).value

    # Nothing to do if we do not have any new NLRIs in this message.
    msg_nlri_list = update_msg.nlri
    if not msg_nlri_list:
        LOG.debug('Update message did not have any new NLRIs.')
        return

    # Create path instances for each NLRI from the update message.
    for msg_nlri in msg_nlri_list:
        LOG.debug('NLRI: %s', msg_nlri)
        new_path = bgp_utils.create_path(
            self,
            msg_nlri,
            pattrs=umsg_pattrs,
            nexthop=next_hop
        )
        LOG.debug('Extracted paths from Update msg.: %s', new_path)

        block, blocked_cause = self._apply_in_filter(new_path)

        nlri_str = new_path.nlri.formatted_nlri_str
        received_route = ReceivedRoute(new_path, self, block)
        self._adj_rib_in[nlri_str] = received_route
        self._signal_bus.adj_rib_in_changed(self, received_route)

        if not block:
            # Update appropriate table with new paths.
            tm = self._core_service.table_manager
            tm.learn_path(new_path)
        else:
            LOG.debug('prefix : %s is blocked by in-bound filter: %s',
                      msg_nlri, blocked_cause)

    # Book-keeping. The list is known to be non-empty here because of the
    # early return above, so no extra emptiness check is needed.
    # Update prefix statistics.
    self.state.incr(PeerCounterNames.RECV_PREFIXES,
                    incr_by=len(msg_nlri_list))
    # Check if we exceed max. prefixes allowed for this neighbor.
    if self._neigh_conf.exceeds_max_prefix_allowed(
            self.state.get_count(PeerCounterNames.RECV_PREFIXES)):
        LOG.error('Max. prefix allowed for this neighbor '
                  'exceeded.')
def _extract_and_handle_bgp4_withdraws(self, withdraw_list):
    """Processes the withdrawn routes carried by a plain BGP4 update.

    Parameters:
        - withdraw_list: withdrawn NLRIs taken from the update message.

    An empty list is treated as End-of-RIB for IPv4 unicast. For every
    withdrawn NLRI the matching Adj-RIB-In entry is removed (and
    signalled) if present, and the withdraw is handed to the table
    manager unless the in-bound filter blocks it.
    """
    if not withdraw_list:
        # If this is EOR of some kind, handle it
        self._handle_eor(RF_IPv4_UC)

    for withdrawn_nlri in withdraw_list:
        withdraw_path = bgp_utils.create_path(
            self,
            withdrawn_nlri,
            is_withdraw=True
        )

        block, blocked_cause = self._apply_in_filter(withdraw_path)

        received_route = ReceivedRoute(withdraw_path, self, block)
        nlri_str = withdrawn_nlri.formatted_nlri_str

        if nlri_str in self._adj_rib_in:
            del self._adj_rib_in[nlri_str]
            self._signal_bus.adj_rib_in_changed(self, received_route)

        if block:
            LOG.debug('prefix : %s is blocked by in-bound filter: %s',
                      nlri_str, blocked_cause)
        else:
            # Update appropriate table with withdraws.
            self._core_service.table_manager.learn_path(withdraw_path)
def _extract_and_handle_mpbgp_new_paths(self, update_msg):
    """Extracts new paths advertised in the given update message's
    *MpReachNlri* attribute.

    Assumes MPBGP capability is enabled and message was validated.
    Parameters:
        - update_msg: (Update) is assumed to be checked for all bgp
        message errors.

    Paths are ignored when the route family is unsupported or, for VPN
    families, when the message carries no route target of interest.
    Every extracted path is recorded in the Adj-RIB-In; unblocked paths
    are added to the appropriate table for further processing (RTC paths
    are collected separately while initial RTC NLRIs are pending).
    """
    umsg_pattrs = update_msg.pathattr_map
    mpreach_nlri_attr = umsg_pattrs.get(BGP_ATTR_TYPE_MP_REACH_NLRI)
    assert mpreach_nlri_attr

    msg_rf = mpreach_nlri_attr.route_family
    # Check if this route family is among supported route families.
    if msg_rf not in SUPPORTED_GLOBAL_RF:
        # Lazy logging arguments, consistent with the rest of the module.
        LOG.info('Received route for route family %s which is'
                 ' not supported. Ignoring paths from this UPDATE: %s',
                 msg_rf, update_msg)
        return

    if msg_rf in (RF_IPv4_VPN, RF_IPv6_VPN):
        # Check if we have Extended Communities Attribute.
        # TODO(PH): Check if RT_NLRI afi/safi will ever have this attribute
        ext_comm_attr = umsg_pattrs.get(BGP_ATTR_TYPE_EXTENDED_COMMUNITIES)
        # Check if we have at-least one RT is of interest to us.
        if not ext_comm_attr:
            LOG.info('Missing Extended Communities Attribute. '
                     'Ignoring paths from this UPDATE: %s', update_msg)
            return

        msg_rts = ext_comm_attr.rt_list
        # If we do not have any RTs associated with this msg., we do not
        # extract any paths.
        if not msg_rts:
            LOG.info('Received route with no RTs. Ignoring paths in this'
                     ' UPDATE: %s', update_msg)
            return

        # If none of the RTs in the message are of interest, we do not
        # extract any paths.
        interested_rts = self._core_service.global_interested_rts
        if not interested_rts.intersection(msg_rts):
            LOG.info('Received route with RT %s that is of no interest to'
                     ' any VRFs or Peers %s.'
                     ' Ignoring paths from this UPDATE: %s',
                     msg_rts, interested_rts, update_msg)
            return

    next_hop = mpreach_nlri_attr.next_hop

    # Nothing to do if we do not have any new NLRIs in this message.
    msg_nlri_list = mpreach_nlri_attr.nlri
    if not msg_nlri_list:
        LOG.debug('Update message did not have any new MP_REACH_NLRIs.')
        return

    # Create path instances for each NLRI from the update message.
    for msg_nlri in msg_nlri_list:
        new_path = bgp_utils.create_path(
            self,
            msg_nlri,
            pattrs=umsg_pattrs,
            nexthop=next_hop
        )
        LOG.debug('Extracted paths from Update msg.: %s', new_path)

        block, blocked_cause = self._apply_in_filter(new_path)

        received_route = ReceivedRoute(new_path, self, block)
        nlri_str = msg_nlri.formatted_nlri_str
        self._adj_rib_in[nlri_str] = received_route
        self._signal_bus.adj_rib_in_changed(self, received_route)

        if not block:
            if msg_rf == RF_RTC_UC \
                    and self._init_rtc_nlri_path is not None:
                # Initial RTC NLRIs are held back until EOR is handled.
                self._init_rtc_nlri_path.append(new_path)
            else:
                # Update appropriate table with new paths.
                tm = self._core_service.table_manager
                tm.learn_path(new_path)
        else:
            LOG.debug('prefix : %s is blocked by in-bound filter: %s',
                      msg_nlri, blocked_cause)

    # Book-keeping. The list is known to be non-empty here because of the
    # early return above, so no extra emptiness check is needed.
    # Update prefix statistics.
    self.state.incr(PeerCounterNames.RECV_PREFIXES,
                    incr_by=len(msg_nlri_list))
    # Check if we exceed max. prefixes allowed for this neighbor.
    if self._neigh_conf.exceeds_max_prefix_allowed(
            self.state.get_count(PeerCounterNames.RECV_PREFIXES)):
        LOG.error('Max. prefix allowed for this neighbor '
                  'exceeded.')
def _extract_and_handle_mpbgp_withdraws(self, mp_unreach_attr):
    """Extracts withdraws advertised in the given *MpUnReachNlri*
    attribute.

    Assumes MPBGP capability is enabled.
    Parameters:
        - mp_unreach_attr: MP_UNREACH_NLRI attribute of a received update.

    Unsupported route families are ignored. An attribute without
    withdrawn routes is treated as End-of-RIB for its route family. For
    every withdrawn NLRI the matching Adj-RIB-In entry is removed (and
    signalled) if present, and the withdraw is handed to the table
    manager unless the in-bound filter blocks it.
    """
    route_family = mp_unreach_attr.route_family
    # Check if this route family is among supported route families.
    if route_family not in SUPPORTED_GLOBAL_RF:
        LOG.info(
            'Received route family %s is not supported. '
            'Ignoring withdraw routes on this UPDATE message.',
            route_family)
        return

    withdrawn_nlris = mp_unreach_attr.withdrawn_routes
    if not withdrawn_nlris:
        # If this is EOR of some kind, handle it
        self._handle_eor(route_family)

    for withdrawn_nlri in withdrawn_nlris:
        withdraw_path = bgp_utils.create_path(
            self,
            withdrawn_nlri,
            is_withdraw=True
        )
        block, blocked_cause = self._apply_in_filter(withdraw_path)

        received_route = ReceivedRoute(withdraw_path, self, block)
        nlri_str = withdrawn_nlri.formatted_nlri_str

        if nlri_str in self._adj_rib_in:
            del self._adj_rib_in[nlri_str]
            self._signal_bus.adj_rib_in_changed(self, received_route)

        if block:
            LOG.debug('prefix : %s is blocked by in-bound filter: %s',
                      withdrawn_nlri, blocked_cause)
        else:
            # Update appropriate table with withdraws.
            self._core_service.table_manager.learn_path(withdraw_path)
def _handle_eor(self, route_family):
    """Currently we only handle EOR for RTC address-family.

    We send non-rtc initial updates if not already sent.

    Parameters:
        - route_family: route family the End-of-RIB marker refers to.
        Anything other than `RF_RTC_UC` is only logged.
    """
    LOG.debug('Handling EOR for %s', route_family)
    # assert (route_family in SUPPORTED_GLOBAL_RF)
    # assert self.is_mbgp_cap_valid(route_family)

    if route_family != RF_RTC_UC:
        return

    self._unschedule_sending_init_updates()

    # The collection is reset to None below, so a repeated RTC EOR must
    # not try to iterate it again (that would raise TypeError).
    pending_rt_paths = self._init_rtc_nlri_path
    if pending_rt_paths is None:
        pending_rt_paths = []

    # Learn all rt_nlri at the same time As RT are learned and RT
    # filter get updated, qualifying NLRIs are automatically sent to
    # peer including initial update
    tm = self._core_service.table_manager
    for rt_nlri in pending_rt_paths:
        tm.learn_path(rt_nlri)
        # Give chance to process new RT_NLRI so that we have updated RT
        # filter for all peer including this peer before we communicate
        # NLRIs for other address-families
        self.pause(0)
    # Clear collection of initial RTs as we no longer need to wait for
    # EOR for RT NLRIs and to indicate that new RT NLRIs should be
    # handled in a regular fashion
    self._init_rtc_nlri_path = None
def handle_msg(self, msg):
    """BGP message handler.

    BGP message handling is shared between protocol instance and peer. Peer
    only handles limited messages under suitable state. Here we handle
    KEEPALIVE, UPDATE and ROUTE_REFRESH messages. UPDATE and ROUTE_REFRESH
    messages are handled only after session is established.

    Raises ValueError for any other message type.
    """
    msg_type = msg.type

    if msg_type == BGP_MSG_KEEPALIVE:
        # A Keep Alive received in open_confirm state moves the session
        # to established state and queues the initial updates.
        if self.state.bgp_state == const.BGP_FSM_OPEN_CONFIRM:
            self.state.bgp_state = const.BGP_FSM_ESTABLISHED
            self._enqueue_init_updates()
        return

    if msg_type == BGP_MSG_UPDATE:
        assert self.state.bgp_state == const.BGP_FSM_ESTABLISHED
        # Will try to process this UPDATE message further
        self._handle_update_msg(msg)
        return

    if msg_type == BGP_MSG_ROUTE_REFRESH:
        # If its route-refresh message
        assert self.state.bgp_state == const.BGP_FSM_ESTABLISHED
        self._handle_route_refresh_msg(msg)
        return

    # Open/Notification messages are currently handled by protocol and
    # nothing is done inside peer, so should not see them here.
    raise ValueError('Peer does not support handling of %s'
                     ' message during %s state' %
                     (msg, self.state.bgp_state))
def _handle_err_sor_msg(self, afi, safi):
    """Handle a Start-of-RIB (SOR) enhanced route-refresh message.

    Bumps this peer's version number and, when a stale-path time is
    configured and the route family is supported, schedules
    `clean_stale_routes` on the table manager after that delay.
    """
    # Check if ERR capability is enabled for this peer.
    if not self._protocol.is_enhanced_rr_cap_valid():
        LOG.error('Received Start-of-RIB (SOR) even though ERR is not'
                  ' enabled')
        return

    # Increment the version number of this peer so that we can identify
    # inconsistencies/stale routes.
    self.version_num += 1

    # Check if refresh_stalepath_time is enabled.
    stale_time = self._common_conf.refresh_stalepath_time
    if not stale_time:
        return

    # Set a timer to clean the stale paths at configured time.
    # Clean/track inconsistent/stale routes.
    route_family = RouteFamily(afi, safi)
    if route_family not in SUPPORTED_GLOBAL_RF:
        return

    self._refresh_stalepath_timer = self._spawn_after(
        'err-refresh-stale-path-timer', stale_time,
        self._core_service.table_manager.clean_stale_routes, self,
        route_family)
    LOG.debug('Refresh Stale Path timer set (%s sec).', stale_time)
2030 def _handle_route_refresh_msg(self, msg):
2033 demarcation = msg.demarcation
2035 # If this normal route-refresh request.
2036 if demarcation == 0:
2037 self._handle_route_refresh_req(afi, safi)
2039 # If this is start of RIB (SOR) message.
2040 elif demarcation == 1:
2041 self._handle_err_sor_msg(afi, safi)
2043 # If this is end of RIB (EOR) message.
2044 elif demarcation == 2:
2045 # Clean/track inconsistent/stale routes.
2046 route_family = RouteFamily(afi, safi)
2047 if route_family in SUPPORTED_GLOBAL_RF:
2048 tm = self._core_service.table_manager
2049 tm.clean_stale_routes(self, route_family)
2052 LOG.error('Route refresh message has invalid demarcation %s',
def _handle_route_refresh_req(self, afi, safi):
    """Handles a normal route-refresh request for given `afi`/`safi`.

    Increments the RECV_REFRESH counter of this peer and, if the
    requested address family was advertised for this session, fires the
    route refresh. A request for an un-advertised family is only logged.
    """
    rr_af = get_rf(afi, safi)
    self.state.incr(PeerCounterNames.RECV_REFRESH)

    # Check if peer has asked for route-refresh for af that was advertised
    if not self._protocol.is_route_family_adv(rr_af):
        LOG.info('Peer asked for route - refresh for un - advertised '
                 'address - family %s', rr_af)
        # Nothing to re-send for a family this session does not carry.
        return

    self._fire_route_refresh(rr_af)
2067 def _fire_route_refresh(self, af):
2068 # Check if enhanced route refresh is enabled/valid.
2070 if self._protocol.is_enhanced_rr_cap_valid():
2071 # If enhanced route-refresh is valid/enabled, enqueue SOR.
2074 sor = BGPRouteRefresh(afi, safi, demarcation=1)
2075 self.enque_first_outgoing_msg(sor)
2077 # Ask core to re-send sent routes
2078 self._peer_manager.resend_sent(af, self)
2080 # If enhanced route-refresh is valid/enabled, then we enqueue EOR.
2082 self._enqueue_eor_msg(sor)
2084 def _enqueue_eor_msg(self, sor):
2085 """Enqueues Enhanced RR EOR if for given SOR a EOR is not already
2088 if self._protocol.is_enhanced_rr_cap_valid() and not sor.eor_sent:
2091 eor = BGPRouteRefresh(afi, safi, demarcation=2)
2092 self.enque_outgoing_msg(eor)
def _schedule_sending_init_updates(self):
    """Setup timer for sending best-paths for all other address-families
    that qualify.

    Setup timer for sending initial updates to peer. The timer is started
    with `const.RTC_EOR_DEFAULT_TIME`; its callback communicates all
    best paths of the global tables, marks the initial non-RTC updates as
    sent and then stops and drops the timer.
    """

    def _enqueue_non_rtc_init_updates():
        LOG.debug('Scheduled queuing of initial Non-RTC UPDATEs')
        tm = self._core_service.table_manager
        self.comm_all_best_paths(tm.global_tables)
        self._sent_init_non_rtc_update = True
        # Stop the timer as we have handled RTC EOR
        self._rtc_eor_timer.stop()
        self._rtc_eor_timer = None

    self._sent_init_non_rtc_update = False
    self._rtc_eor_timer = self._create_timer(
        Peer.RTC_EOR_TIMER_NAME,
        _enqueue_non_rtc_init_updates
    )
    # Start timer for sending initial updates
    self._rtc_eor_timer.start(const.RTC_EOR_DEFAULT_TIME, now=False)
    LOG.debug('Scheduled sending of initial Non-RTC UPDATEs after:'
              ' %s sec', const.RTC_EOR_DEFAULT_TIME)
def _unschedule_sending_init_updates(self):
    """Un-schedules sending of initial updates

    Stops the timer if set for sending initial updates.
    Returns:
        - True if timer was stopped
        - False if timer was already stopped and nothing was done
    """
    LOG.debug('Un-scheduling sending of initial Non-RTC UPDATEs'
              ' (init. UPDATEs already sent: %s)',
              self._sent_init_non_rtc_update)
    if self._rtc_eor_timer:
        self._rtc_eor_timer.stop()
        # Drop the reference so a later call reports "nothing was done".
        self._rtc_eor_timer = None
        return True
    return False
def _enqueue_init_updates(self):
    """Enqueues current routes to be shared with this peer.

    Must be called in ESTABLISHED state. If the RTC address family is
    valid for the session, only RT NLRIs are communicated right away and
    the remaining families are scheduled via a timer; otherwise all best
    paths are communicated immediately.
    """
    assert self.state.bgp_state == const.BGP_FSM_ESTABLISHED
    if self.is_mbgp_cap_valid(RF_RTC_UC):
        # Enqueues all best-RTC_NLRIs to be sent as initial update to this
        # peer.
        self._peer_manager.comm_all_rt_nlris(self)
        self._schedule_sending_init_updates()
    else:
        # Enqueues all best-path to be sent as initial update to this peer
        # except for RTC route-family.
        tm = self._core_service.table_manager
        self.comm_all_best_paths(tm.global_tables)
def comm_all_best_paths(self, global_tables):
    """Shares/communicates current best paths with this peers.

    Can be used to send initial updates after we have established session
    with `peer`.

    `global_tables` maps a route family to its table; the table's values
    are destinations exposing `best_path`. The RTC route family is
    skipped, as are families that are not valid for this session.
    """
    LOG.debug('Communicating current best path for all afi/safi except'
              ' 1/132')
    # We will enqueue best path from all global destination.
    for route_family, table in global_tables.items():
        if route_family == RF_RTC_UC:
            continue
        if self.is_mbgp_cap_valid(route_family):
            for dest in table.values():
                # `communicate_path` raises on a falsy path, so skip
                # destinations that currently have no best path.
                if dest.best_path:
                    self.communicate_path(dest.best_path)
def communicate_path(self, path):
    """Communicates `path` to this peer if it qualifies.

    Checks if `path` should be shared/communicated with this peer according
    to various conditions: like bgp state, transmit side loop, local and
    remote AS path, community attribute, etc.

    A qualifying path is wrapped in an `OutgoingRoute` and enqueued as an
    outgoing message; a non-qualifying path is skipped with a debug log.

    Raises:
        - ValueError if `path` is falsy
    """
    LOG.debug('Peer %s asked to communicate path', self)
    if not path:
        raise ValueError('Invalid path %s given.' % path)

    # We do not send anything to peer who is not in established state.
    if not self.in_established():
        LOG.debug('Skipping sending path as peer is not in '
                  'ESTABLISHED state %s', path)
        return

    # Check if this session is available for given paths afi/safi
    path_rf = path.route_family
    if not (self.is_mpbgp_cap_valid(path_rf) or
            path_rf in [RF_IPv4_UC, RF_IPv6_UC]):
        LOG.debug('Skipping sending path as %s route family is not'
                  ' available for this session', path_rf)
        return

    # If RTC capability is available and path afi/safi is other than RT
    # nlri
    if path_rf != RF_RTC_UC and \
            self.is_mpbgp_cap_valid(RF_RTC_UC):
        rtfilter = self._peer_manager.curr_peer_rtfilter(self)
        # If the peer has a non-empty rtfilter and that rtfilter has no
        # RT in common with the path RTs, we do not share this path with
        # the peer. An empty/missing rtfilter does not block the path
        # here.
        if rtfilter and not path.has_rts_in(rtfilter):
            LOG.debug('Skipping sending path as rffilter %s and path '
                      'rts %s have no RT in common',
                      rtfilter, path.get_rts())
            return

    # Transmit side loop detection: We check if leftmost AS matches
    # peers AS, if so we do not send UPDATE message to this peer.
    as_path = path.get_pattr(BGP_ATTR_TYPE_AS_PATH)
    if as_path and as_path.has_matching_leftmost(self.remote_as):
        LOG.debug('Skipping sending path as AS_PATH has peer AS %s',
                  self.remote_as)
        return

    # If this peer is a route server client, we forward the path
    # regardless of AS PATH loop, whether the connection is iBGP or eBGP,
    # or path's communities.
    # NOTE(review): there is no early return after this enqueue, so the
    # checks below still run and may enqueue the path a second time --
    # confirm whether that is intended.
    if self.is_route_server_client:
        outgoing_route = OutgoingRoute(path)
        self.enque_outgoing_msg(outgoing_route)

    # When a MED is configured for this neighbor and the path carries no
    # MED attribute of its own, continue with a clone of the path that
    # has the configured MED applied.
    if self._neigh_conf.multi_exit_disc:
        med_attr = path.get_pattr(BGP_ATTR_TYPE_MULTI_EXIT_DISC)
        if not med_attr:
            path = bgp_utils.clone_path_and_update_med_for_target_neighbor(
                path,
                self._neigh_conf.multi_exit_disc
            )

    # For connected/local-prefixes, we send update to all peers.
    if path.source is None:
        # Construct OutgoingRoute specific for this peer and put it in
        # its sink.
        outgoing_route = OutgoingRoute(path)
        self.enque_outgoing_msg(outgoing_route)

    # If path from a bgp-peer is new best path, we share it with
    # all bgp-peers except the source peer and other peers in his AS.
    # This is default Junos setting that in Junos can be disabled with
    # 'advertise-peer-as' setting.
    elif (self != path.source or
          self.remote_as != path.source.remote_as):
        # When BGP speaker receives an UPDATE message from an internal
        # peer, the receiving BGP speaker SHALL NOT re-distribute the
        # routing information contained in that UPDATE message to other
        # internal peers (unless the speaker acts as a BGP Route
        # Reflector) [RFC4271].
        if (self.remote_as == self._core_service.asn
                and self.remote_as == path.source.remote_as
                and isinstance(path.source, Peer)
                and not path.source.is_route_reflector_client
                and not self.is_route_reflector_client):
            LOG.debug(
                'Skipping sending iBGP route to iBGP peer %s AS %s',
                self.ip_address, self.remote_as)
            return

        # If new best path has community attribute, it should be taken into
        # account when sending UPDATE to peers.
        comm_attr = path.get_pattr(BGP_ATTR_TYPE_COMMUNITIES)
        if comm_attr:
            comm_attr_na = comm_attr.has_comm_attr(
                BGPPathAttributeCommunities.NO_ADVERTISE
            )
            # If we have NO_ADVERTISE attribute present, we do not send
            # UPDATE to any peers
            if comm_attr_na:
                LOG.debug('Path has community attr. NO_ADVERTISE = %s'
                          '. Hence not advertising to peer',
                          comm_attr_na)
                return

            comm_attr_ne = comm_attr.has_comm_attr(
                BGPPathAttributeCommunities.NO_EXPORT
            )
            comm_attr_nes = comm_attr.has_comm_attr(
                BGPPathAttributeCommunities.NO_EXPORT_SUBCONFED
            )
            # If NO_EXPORT_SUBCONFED/NO_EXPORT is one of the attribute, we
            # do not advertise to eBGP peers as we do not have any
            # confederation feature at this time.
            if ((comm_attr_nes or comm_attr_ne) and
                    (self.remote_as != self._core_service.asn)):
                LOG.debug('Skipping sending UPDATE to peer: %s as per '
                          'community attribute configuration', self)
                return

        # Construct OutgoingRoute specific for this peer and put it in
        # its sink.
        outgoing_route = OutgoingRoute(path)
        self.enque_outgoing_msg(outgoing_route)
        LOG.debug('Enqueued outgoing route %s for peer %s',
                  outgoing_route.path.nlri, self)
def connection_made(self):
    """Protocols connection established handler

    Only logs the event; the neighbor's name and id are attached to the
    log record through `extra`.
    """
    LOG.info(
        'Connection to peer: %s established',
        self._neigh_conf.ip_address,
        extra={
            'resource_name': self._neigh_conf.name,
            'resource_id': self._neigh_conf.id
        }
    )
def connection_lost(self, reason):
    """Protocols connection lost handler.

    Logs the loss (with `reason`), moves the peer to IDLE state and, if a
    protocol instance is still attached, stops and detaches it, resets the
    per-session bookkeeping, notifies the peer manager and -- when the
    neighbor is still enabled -- sets the connect-retry event.
    """
    # Message and arguments are handed to the logger separately so the
    # formatting is done by the logging machinery.
    LOG.info(
        'Connection to peer %s lost, reason: %s Resetting '
        'retry connect loop: %s',
        self._neigh_conf.ip_address, reason,
        self._connect_retry_event.is_set(),
        extra={
            'resource_name': self._neigh_conf.name,
            'resource_id': self._neigh_conf.id
        }
    )
    self.state.bgp_state = const.BGP_FSM_IDLE
    # NOTE(review): the whole teardown is kept under this guard so that a
    # repeated call without a protocol instance is a no-op -- confirm
    # against callers.
    if self._protocol:
        self._protocol.stop()
        self._protocol = None
        # Create new collection for initial RT NLRIs
        self._init_rtc_nlri_path = []
        self._sent_init_non_rtc_update = False
        # Clear sink.
        self.clear_outgoing_msg_list()
        # Un-schedule timers
        self._unschedule_sending_init_updates()

        # Increment the version number of this source.
        self.version_num += 1
        self._peer_manager.on_peer_down(self)

        # Check configuration if neighbor is still enabled, we try
        # reconnecting.
        if self._neigh_conf.enabled:
            if not self._connect_retry_event.is_set():
                self._connect_retry_event.set()
2344 def _lookup_attribute_map(attribute_map, attr_type, path):
2346 if attr_type in attribute_map:
2347 maps = attribute_map[attr_type]
2349 cause, result = m.evaluate(path)
2351 "local_pref evaluation result:%s, cause:%s",
2354 result_attr = m.get_attribute()