backing up
[vsorcdistro/.git] / ryu / build / lib.linux-armv7l-2.7 / ryu / services / protocols / bgp / peer.py
1 # Copyright (C) 2014 Nippon Telegraph and Telephone Corporation.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #    http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 """
16  BGP peer related classes and utils.
17 """
18 from collections import namedtuple
19 import logging
20 import socket
21 import time
22 import traceback
23
24 from six.moves import zip_longest
25
26 from ryu.services.protocols.bgp.base import Activity
27 from ryu.services.protocols.bgp.base import Sink
28 from ryu.services.protocols.bgp.base import Source
29 from ryu.services.protocols.bgp.base import SUPPORTED_GLOBAL_RF
30 from ryu.services.protocols.bgp import constants as const
31 from ryu.services.protocols.bgp.model import OutgoingRoute
32 from ryu.services.protocols.bgp.model import SentRoute
33 from ryu.services.protocols.bgp.info_base.base import PrefixFilter
34 from ryu.services.protocols.bgp.info_base.base import AttributeMap
35 from ryu.services.protocols.bgp.model import ReceivedRoute
36 from ryu.services.protocols.bgp.net_ctrl import NET_CONTROLLER
37 from ryu.services.protocols.bgp.rtconf.neighbors import NeighborConfListener
38 from ryu.services.protocols.bgp.rtconf.neighbors import CONNECT_MODE_PASSIVE
39 from ryu.services.protocols.bgp.signals.emit import BgpSignalBus
40 from ryu.services.protocols.bgp.speaker import BgpProtocol
41 from ryu.services.protocols.bgp.info_base.ipv4 import Ipv4Path
42 from ryu.services.protocols.bgp.info_base.vpnv4 import Vpnv4Path
43 from ryu.services.protocols.bgp.info_base.vpnv6 import Vpnv6Path
44 from ryu.services.protocols.bgp.rtconf.vrfs import VRF_RF_IPV4, VRF_RF_IPV6
45 from ryu.services.protocols.bgp.utils import bgp as bgp_utils
46 from ryu.services.protocols.bgp.utils.evtlet import EventletIOFactory
47 from ryu.services.protocols.bgp.utils import stats
48 from ryu.services.protocols.bgp.utils.validation import is_valid_old_asn
49
50 from ryu.lib.packet import bgp
51
52 from ryu.lib.packet.bgp import RouteFamily
53 from ryu.lib.packet.bgp import RF_IPv4_UC
54 from ryu.lib.packet.bgp import RF_IPv6_UC
55 from ryu.lib.packet.bgp import RF_IPv4_VPN
56 from ryu.lib.packet.bgp import RF_IPv6_VPN
57 from ryu.lib.packet.bgp import RF_IPv4_FLOWSPEC
58 from ryu.lib.packet.bgp import RF_VPNv4_FLOWSPEC
59 from ryu.lib.packet.bgp import RF_RTC_UC
60 from ryu.lib.packet.bgp import get_rf
61
62 from ryu.lib.packet.bgp import BGPOpen
63 from ryu.lib.packet.bgp import BGPUpdate
64 from ryu.lib.packet.bgp import BGPRouteRefresh
65
66 from ryu.lib.packet.bgp import BGP_ERROR_CEASE
67 from ryu.lib.packet.bgp import BGP_ERROR_SUB_ADMINISTRATIVE_SHUTDOWN
68 from ryu.lib.packet.bgp import BGP_ERROR_SUB_CONNECTION_COLLISION_RESOLUTION
69
70
71 from ryu.lib.packet.bgp import BGP_MSG_UPDATE
72 from ryu.lib.packet.bgp import BGP_MSG_KEEPALIVE
73 from ryu.lib.packet.bgp import BGP_MSG_ROUTE_REFRESH
74
75 from ryu.lib.packet.bgp import BGPPathAttributeNextHop
76 from ryu.lib.packet.bgp import BGPPathAttributeAsPath
77 from ryu.lib.packet.bgp import BGPPathAttributeAs4Path
78 from ryu.lib.packet.bgp import BGPPathAttributeLocalPref
79 from ryu.lib.packet.bgp import BGPPathAttributeExtendedCommunities
80 from ryu.lib.packet.bgp import BGPPathAttributeOriginatorId
81 from ryu.lib.packet.bgp import BGPPathAttributeClusterList
82 from ryu.lib.packet.bgp import BGPPathAttributeMpReachNLRI
83 from ryu.lib.packet.bgp import BGPPathAttributeMpUnreachNLRI
84 from ryu.lib.packet.bgp import BGPPathAttributeCommunities
85 from ryu.lib.packet.bgp import BGPPathAttributeMultiExitDisc
86
87 from ryu.lib.packet.bgp import BGP_ATTR_TYPE_ORIGIN
88 from ryu.lib.packet.bgp import BGP_ATTR_TYPE_AGGREGATOR
89 from ryu.lib.packet.bgp import BGP_ATTR_TYPE_AS4_AGGREGATOR
90 from ryu.lib.packet.bgp import BGP_ATTR_TYPE_AS_PATH
91 from ryu.lib.packet.bgp import BGP_ATTR_TYPE_AS4_PATH
92 from ryu.lib.packet.bgp import BGP_ATTR_TYPE_NEXT_HOP
93 from ryu.lib.packet.bgp import BGP_ATTR_TYPE_MP_REACH_NLRI
94 from ryu.lib.packet.bgp import BGP_ATTR_TYPE_MP_UNREACH_NLRI
95 from ryu.lib.packet.bgp import BGP_ATTR_TYPE_MULTI_EXIT_DISC
96 from ryu.lib.packet.bgp import BGP_ATTR_TYPE_COMMUNITIES
97 from ryu.lib.packet.bgp import BGP_ATTR_TYPE_ORIGINATOR_ID
98 from ryu.lib.packet.bgp import BGP_ATTR_TYPE_CLUSTER_LIST
99 from ryu.lib.packet.bgp import BGP_ATTR_TYPE_EXTENDED_COMMUNITIES
100 from ryu.lib.packet.bgp import BGP_ATTR_TYEP_PMSI_TUNNEL_ATTRIBUTE
101
102 from ryu.lib.packet.bgp import BGPTwoOctetAsSpecificExtendedCommunity
103 from ryu.lib.packet.bgp import BGPIPv4AddressSpecificExtendedCommunity
104
105 from ryu.lib.packet import safi as subaddr_family
106
107 LOG = logging.getLogger('bgpspeaker.peer')
108
109
110 def is_valid_state(state):
111     """Returns True if given state is a valid bgp finite state machine state.
112     """
113     return state in const.BGP_FSM_VALID_STATES
114
115
116 class PeerRf(object):
117     """State maintained per-RouteFamily for a Peer."""
118
119     def __init__(self, peer, route_family, enabled=False):
120         assert peer and route_family
121
122         self.enabled = enabled
123
124         # Back pointers.
125         self.peer = peer
126         self.rf = route_family
127
128
129 PeerCounterNames = namedtuple(
130     'PeerCounterNames',
131     ('RECV_PREFIXES',
132      'RECV_UPDATES',
133      'SENT_UPDATES',
134      'RECV_NOTIFICATION',
135      'SENT_NOTIFICATION',
136      'SENT_REFRESH',
137      'RECV_REFRESH',
138      'FSM_ESTB_TRANSITIONS')
139 )(
140     'recv_prefixes',
141     'recv_updates',
142     'sent_updates',
143     'recv_notification',
144     'sent_notification',
145     'sent_refresh',
146     'recv_refresh',
147     'fms_established_transitions'
148 )
149
150
151 class PeerState(object):
152     """A BGP neighbor state. Think of this class as of information and stats
153     container for Peer.
154     """
155
156     def __init__(self, peer, signal_bus):
157         # Back pointer to peer whose stats this instances represents.
158         self.peer = peer
159         # Current state of BGP finite state machine.
160         self._bgp_state = const.BGP_FSM_IDLE
161         self._established_time = 0
162         self._last_bgp_error = None
163         self.counters = {
164             'recv_prefixes': 0,
165             'recv_updates': 0,
166             'sent_updates': 0,
167             'recv_notification': 0,
168             'sent_notification': 0,
169             'sent_refresh': 0,
170             'recv_refresh': 0,
171             'fms_established_transitions': 0,
172         }
173         self._signal_bus = signal_bus
174
175         # TODO(JK): refactor other counters to use signals also
176         self._signal_bus.register_listener(
177             ('error', 'bgp', self.peer),
178             self._remember_last_bgp_error
179         )
180
181         self._signal_bus.register_listener(
182             BgpSignalBus.BGP_NOTIFICATION_RECEIVED + (self.peer,),
183             lambda _, msg: self.incr(PeerCounterNames.RECV_NOTIFICATION)
184         )
185
186         self._signal_bus.register_listener(
187             BgpSignalBus.BGP_NOTIFICATION_SENT + (self.peer,),
188             lambda _, msg: self.incr(PeerCounterNames.SENT_NOTIFICATION)
189         )
190
191     def _remember_last_bgp_error(self, identifier, data):
192         self._last_bgp_error = dict([(k, v)
193                                      for k, v in data.items()
194                                      if k != 'peer'])
195
196     @property
197     def recv_prefix(self):
198         # Number of prefixes received from peer.
199         return self.counters[PeerCounterNames.RECV_PREFIXES]
200
201     @property
202     def bgp_state(self):
203         return self._bgp_state
204
205     @bgp_state.setter
206     def bgp_state(self, new_state):
207         old_state = self._bgp_state
208         if old_state == new_state:
209             return
210
211         self._bgp_state = new_state
212         NET_CONTROLLER.send_rpc_notification(
213             'neighbor.state',
214             {
215                 'ip_address': self.peer.ip_address,
216                 'state': new_state
217             }
218         )
219
220         # transition to Established from another state
221         if new_state == const.BGP_FSM_ESTABLISHED:
222             self.incr(PeerCounterNames.FSM_ESTB_TRANSITIONS)
223             self._established_time = time.time()
224             self._signal_bus.adj_up(self.peer)
225             NET_CONTROLLER.send_rpc_notification(
226                 'neighbor.up', {'ip_address': self.peer.ip_address}
227             )
228         # transition from Established to another state
229         elif old_state == const.BGP_FSM_ESTABLISHED:
230             self._established_time = 0
231             self._signal_bus.adj_down(self.peer)
232             NET_CONTROLLER.send_rpc_notification(
233                 'neighbor.down', {'ip_address': self.peer.ip_address}
234             )
235
236         LOG.debug('Peer %s BGP FSM went from %s to %s',
237                   self.peer.ip_address, old_state, self.bgp_state)
238
239     def incr(self, counter_name, incr_by=1):
240         if counter_name not in self.counters:
241             raise ValueError('Un-recognized counter name: %s' % counter_name)
242         counter = self.counters.setdefault(counter_name, 0)
243         counter += incr_by
244         self.counters[counter_name] = counter
245
246     def get_count(self, counter_name):
247         if counter_name not in self.counters:
248             raise ValueError('Un-recognized counter name: %s' % counter_name)
249         return self.counters.get(counter_name, 0)
250
251     @property
252     def total_msg_sent(self):
253         """Returns total number of UPDATE, NOTIFICATION and ROUTE_REFRESH
254          message sent to this peer.
255          """
256         return (self.get_count(PeerCounterNames.SENT_REFRESH) +
257                 self.get_count(PeerCounterNames.SENT_UPDATES))
258
259     @property
260     def total_msg_recv(self):
261         """Returns total number of UPDATE, NOTIFICATION and ROUTE_REFRESH
262         messages received from this peer.
263         """
264         return (self.get_count(PeerCounterNames.RECV_UPDATES) +
265                 self.get_count(PeerCounterNames.RECV_REFRESH) +
266                 self.get_count(PeerCounterNames.RECV_NOTIFICATION))
267
268     def get_stats_summary_dict(self):
269         """Returns basic stats.
270
271         Returns a `dict` with various counts and stats, see below.
272         """
273         uptime = time.time() - self._established_time \
274             if self._established_time != 0 else -1
275         return {
276             stats.UPDATE_MSG_IN: self.get_count(PeerCounterNames.RECV_UPDATES),
277             stats.UPDATE_MSG_OUT: self.get_count(
278                 PeerCounterNames.SENT_UPDATES
279             ),
280             stats.TOTAL_MSG_IN: self.total_msg_recv,
281             stats.TOTAL_MSG_OUT: self.total_msg_sent,
282             stats.FMS_EST_TRANS: self.get_count(
283                 PeerCounterNames.FSM_ESTB_TRANSITIONS
284             ),
285             stats.UPTIME: uptime
286         }
287
288
289 class Peer(Source, Sink, NeighborConfListener, Activity):
290     """A BGP neighbor/peer.
291
292     Listens on neighbor configuration changes and handles change events
293     appropriately. If peering is enabled tries 'actively'/'pro-actively' to
294     establish session with peer. Allows binding of `BgpProtocol` instances to
295     allow 'passive'/'reactive' establishment of bgp session with peer.
296     Maintains BGP state machine (may not be fully compliant with RFC). Handles
297     bgp UPDATE messages. Provides a queue to send update message to peer.
298     """
299
300     RTC_EOR_TIMER_NAME = 'RTC_EOR_Timer'
301
302     def __init__(self, common_conf, neigh_conf,
303                  core_service, signal_bus, peer_manager):
304         peer_activity_name = 'Peer: %s' % neigh_conf.ip_address
305         Activity.__init__(self, name=peer_activity_name)
306         Source.__init__(self, version_num=1)
307         Sink.__init__(self)
308         # Add listener for configuration changes.
309         NeighborConfListener.__init__(self, neigh_conf)
310
311         # Current configuration of this peer.
312         self._neigh_conf = neigh_conf
313         self._common_conf = common_conf
314         self._core_service = core_service
315         self._signal_bus = signal_bus
316         self._peer_manager = peer_manager
317
318         # Host Bind IP
319         self._host_bind_ip = None
320         self._host_bind_port = None
321
322         # TODO(PH): revisit maintaining state/stats information.
323         # Peer state.
324         self.state = PeerState(self, self._signal_bus)
325         self._periodic_stats_logger = \
326             self._create_timer('Peer State Summary Stats Timer',
327                                stats.log,
328                                stats_resource=self._neigh_conf,
329                                stats_source=self.state.get_stats_summary_dict)
330         if self._neigh_conf.stats_log_enabled:
331             self._periodic_stats_logger.start(self._neigh_conf.stats_time)
332
333         # State per route family, {RouteFamily: PeerRf,}.
334         self.rf_state = {}
335         # Get vpnv4 route family settings.
336         prf = PeerRf(self, RF_IPv4_VPN,
337                      enabled=self._neigh_conf.cap_mbgp_vpnv4)
338         self.rf_state[RF_IPv4_VPN] = prf
339         # Get vpnv6 route family settings.
340         prf = PeerRf(self, RF_IPv6_VPN, self._neigh_conf.cap_mbgp_vpnv6)
341         self.rf_state[RF_IPv6_VPN] = prf
342
343         # Bound protocol instance
344         self._protocol = None
345
346         # Setting this event starts the connect_loop loop again
347         # Clearing this event will stop the connect_loop loop
348         self._connect_retry_event = EventletIOFactory.create_custom_event()
349
350         # Reference to threads related to enhanced refresh timers.
351         self._refresh_stalepath_timer = None
352         self._refresh_max_eor_timer = None
353
354         # Latest valid Open Message
355         self.curr_open_msg = None
356
357         # RTC end-of-rib timer
358         self._rtc_eor_timer = None
359         self._sent_init_non_rtc_update = False
360         self._init_rtc_nlri_path = []
361
362         # in-bound filters
363         self._in_filters = self._neigh_conf.in_filter
364
365         # out-bound filters
366         self._out_filters = self._neigh_conf.out_filter
367
368         # Adj-rib-in
369         self._adj_rib_in = {}
370
371         # Adj-rib-out
372         self._adj_rib_out = {}
373
374         # attribute maps
375         self._attribute_maps = {}
376
377     @property
378     def remote_as(self):
379         return self._neigh_conf.remote_as
380
381     @property
382     def rtc_as(self):
383         return self._neigh_conf.rtc_as
384
385     @property
386     def ip_address(self):
387         return self._neigh_conf.ip_address
388
389     @property
390     def protocol(self):
391         return self._protocol
392
393     @property
394     def host_bind_ip(self):
395         return self._host_bind_ip
396
397     @property
398     def host_bind_port(self):
399         return self._host_bind_port
400
401     @property
402     def enabled(self):
403         return self._neigh_conf.enabled
404
405     @property
406     def med(self):
407         return self._neigh_conf.multi_exit_disc
408
409     @property
410     def local_as(self):
411         return self._neigh_conf.local_as
412
413     @property
414     def cap_four_octet_as_number(self):
415         return self._neigh_conf.cap_four_octet_as_number
416
417     @property
418     def in_filters(self):
419         return self._in_filters
420
421     @in_filters.setter
422     def in_filters(self, filters):
423         self._in_filters = [f.clone() for f in filters]
424         LOG.debug('set in-filter : %s', filters)
425         self.on_update_in_filter()
426
427     @property
428     def out_filters(self):
429         return self._out_filters
430
431     @out_filters.setter
432     def out_filters(self, filters):
433         self._out_filters = [f.clone() for f in filters]
434         LOG.debug('set out-filter : %s', filters)
435         self.on_update_out_filter()
436
437     @property
438     def adj_rib_in(self):
439         return self._adj_rib_in
440
441     @property
442     def adj_rib_out(self):
443         return self._adj_rib_out
444
445     @property
446     def is_route_server_client(self):
447         return self._neigh_conf.is_route_server_client
448
449     @property
450     def is_route_reflector_client(self):
451         return self._neigh_conf.is_route_reflector_client
452
453     @property
454     def check_first_as(self):
455         return self._neigh_conf.check_first_as
456
457     @property
458     def connect_mode(self):
459         return self._neigh_conf.connect_mode
460
461     @property
462     def attribute_maps(self):
463         return self._attribute_maps
464
465     @attribute_maps.setter
466     def attribute_maps(self, attribute_maps):
467         _attr_maps = {}
468         _attr_maps.setdefault(const.ATTR_MAPS_ORG_KEY, [])
469
470         # key is 'default' or rd_rf that represents RD and route_family
471         key = attribute_maps[const.ATTR_MAPS_LABEL_KEY]
472         at_maps = attribute_maps[const.ATTR_MAPS_VALUE]
473
474         for a in at_maps:
475             cloned = a.clone()
476             LOG.debug("AttributeMap attr_type: %s, attr_value: %s",
477                       cloned.attr_type, cloned.attr_value)
478             attr_list = _attr_maps.setdefault(cloned.attr_type, [])
479             attr_list.append(cloned)
480
481             # preserve original order of attribute_maps
482             _attr_maps[const.ATTR_MAPS_ORG_KEY].append(cloned)
483
484         self._attribute_maps[key] = _attr_maps
485         self.on_update_attribute_maps()
486
487     def is_mpbgp_cap_valid(self, route_family):
488         if not self.in_established:
489             raise ValueError('Invalid request: Peer not in established state')
490         return self._protocol.is_mbgp_cap_valid(route_family)
491
492     def is_four_octet_as_number_cap_valid(self):
493         if not self.in_established:
494             raise ValueError('Invalid request: Peer not in established state')
495         return self._protocol.is_four_octet_as_number_cap_valid()
496
497     def is_ebgp_peer(self):
498         """Returns *True* if this is a eBGP peer, else *False*."""
499         return self._common_conf.local_as != self._neigh_conf.remote_as
500
501     def in_established(self):
502         return self.state.bgp_state == const.BGP_FSM_ESTABLISHED
503
504     def in_idle(self):
505         return self.state.bgp_state == const.BGP_FSM_IDLE
506
507     def in_active(self):
508         return self.state.bgp_state == const.BGP_FSM_ACTIVE
509
510     def in_open_sent(self):
511         return self.state.bgp_state == const.BGP_FSM_OPEN_SENT
512
513     def in_open_confirm(self):
514         return self.state.bgp_state == const.BGP_FSM_OPEN_CONFIRM
515
516     def in_connect(self):
517         return self.state.bgp_state == const.BGP_FSM_CONNECT
518
519     def curr_fms_state(self):
520         return self.state.bgp_state
521
522     def is_mbgp_cap_valid(self, route_family):
523         if not self.in_established():
524             return False
525
526         return self._protocol.is_mbgp_cap_valid(route_family)
527
528     def on_chg_stats_time_conf_with_stats(self, evt):
529         # TODO(PH): provide implementation when updating neighbor is needed
530         pass
531
532     def on_chg_stats_enabled_conf_with_stats(self, evt):
533         # TODO(PH): provide implementation when updating neighbor is needed
534         pass
535
536     def on_update_enabled(self, conf_evt):
537         """Implements neighbor configuration change listener.
538         """
539         enabled = conf_evt.value
540         # If we do not have any protocol bound and configuration asks us to
541         # enable this peer, we try to establish connection again.
542         if enabled:
543             LOG.info('%s enabled', self)
544             if self._protocol and self._protocol.started:
545                 LOG.error('Tried to enable neighbor that is already enabled')
546             else:
547                 self.state.bgp_state = const.BGP_FSM_CONNECT
548                 # Restart connect loop if not already running.
549                 if not self._connect_retry_event.is_set():
550                     self._connect_retry_event.set()
551                     LOG.debug('Starting connect loop as neighbor is enabled.')
552         else:
553             LOG.info('%s disabled', self)
554             if self._protocol:
555                 # Stopping protocol will eventually trigger connection_lost
556                 # handler which will do some clean-up.
557                 # But the greenlet that is in charge of the socket may be kill
558                 # when we stop the protocol, hence we call connection_lost
559                 # here as we triggered socket to close.
560                 self._protocol.send_notification(
561                     BGP_ERROR_CEASE,
562                     BGP_ERROR_SUB_ADMINISTRATIVE_SHUTDOWN
563                 )
564                 self._protocol.stop()
565                 self._protocol = None
566                 self.state.bgp_state = const.BGP_FSM_IDLE
567             # If this peer is not enabled any-more we stop trying to make any
568             # connection.
569             LOG.debug('Disabling connect-retry as neighbor was disabled')
570             self._connect_retry_event.clear()
571
572     def on_update_med(self, conf_evt):
573         LOG.debug('on_update_med fired')
574         if self._protocol is not None and self._protocol.started:
575             negotiated_afs = self._protocol.negotiated_afs
576             for af in negotiated_afs:
577                 self._fire_route_refresh(af)
578
579     def _on_update_connect_mode(self, mode):
580         if mode is not CONNECT_MODE_PASSIVE and \
581                 'peer.connect_loop' not in self._child_thread_map:
582             LOG.debug("start connect loop. (mode: %s)", mode)
583             self._spawn('peer.connect_loop', self._connect_loop,
584                         self._client_factory)
585         elif mode is CONNECT_MODE_PASSIVE:
586             LOG.debug("stop connect loop. (mode: %s)", mode)
587             self._stop_child_threads('peer.connect_loop')
588
589     def on_update_connect_mode(self, conf_evt):
590         self._on_update_connect_mode(conf_evt.value)
591
592     def _apply_filter(self, filters, path):
593         block = False
594         blocked_cause = None
595
596         for filter_ in filters:
597             if filter_.ROUTE_FAMILY != path.ROUTE_FAMILY:
598                 continue
599
600             policy, is_matched = filter_.evaluate(path)
601             if policy == PrefixFilter.POLICY_PERMIT and is_matched:
602                 block = False
603                 break
604             elif policy == PrefixFilter.POLICY_DENY and is_matched:
605                 block = True
606                 blocked_cause = filter_.prefix + ' - DENY'
607                 break
608
609         return block, blocked_cause
610
611     def _apply_in_filter(self, path):
612         return self._apply_filter(self._in_filters, path)
613
614     def _apply_out_filter(self, path):
615         return self._apply_filter(self._out_filters, path)
616
617     def on_update_in_filter(self):
618         LOG.debug('on_update_in_filter fired')
619         for received_path in self._adj_rib_in.values():
620             LOG.debug('received_path: %s', received_path)
621             path = received_path.path
622             nlri_str = path.nlri.formatted_nlri_str
623             block, blocked_reason = self._apply_in_filter(path)
624             if block == received_path.filtered:
625                 LOG.debug('block situation not changed: %s', block)
626                 continue
627             elif block:
628                 # path wasn't blocked, but must be blocked by this update
629                 path = path.clone(for_withdrawal=True)
630                 LOG.debug('withdraw %s because of in filter update', nlri_str)
631             else:
632                 # path was blocked, but mustn't be blocked by this update
633                 LOG.debug('learn blocked %s because of in filter update',
634                           nlri_str)
635             received_path.filtered = block
636             tm = self._core_service.table_manager
637             tm.learn_path(path)
638
639     def on_update_out_filter(self):
640         LOG.debug('on_update_out_filter fired')
641         for sent_path in self._adj_rib_out.values():
642             LOG.debug('sent_path: %s', sent_path)
643             path = sent_path.path
644             nlri_str = path.nlri.formatted_nlri_str
645             block, blocked_reason = self._apply_out_filter(path)
646             if block == sent_path.filtered:
647                 LOG.debug('block situation not changed: %s', block)
648                 continue
649             elif block:
650                 # path wasn't blocked, but must be blocked by this update
651                 withdraw_clone = path.clone(for_withdrawal=True)
652                 outgoing_route = OutgoingRoute(withdraw_clone)
653                 LOG.debug('send withdraw %s because of out filter update',
654                           nlri_str)
655             else:
656                 # path was blocked, but mustn't be blocked by this update
657                 outgoing_route = OutgoingRoute(path)
658                 LOG.debug('send blocked %s because of out filter update',
659                           nlri_str)
660             sent_path.filtered = block
661             self.enque_outgoing_msg(outgoing_route)
662
663     def on_update_attribute_maps(self):
664         # resend sent_route in case of filter matching
665         LOG.debug('on_update_attribute_maps fired')
666         for sent_path in self._adj_rib_out.values():
667             LOG.debug('resend path: %s', sent_path)
668             path = sent_path.path
669             self.enque_outgoing_msg(OutgoingRoute(path))
670
671     def __str__(self):
672         return 'Peer(ip: %s, asn: %s)' % (self._neigh_conf.ip_address,
673                                           self._neigh_conf.remote_as)
674
675     def _run(self, client_factory):
676         LOG.debug('Started peer %s', self)
677         self._client_factory = client_factory
678
679         # Tries actively to establish session if CONNECT_MODE is not PASSIVE
680         self._on_update_connect_mode(self._neigh_conf.connect_mode)
681
682         # Start sink processing
683         self._process_outgoing_msg_list()
684
685     def _send_outgoing_route_refresh_msg(self, rr_msg):
686         """Sends given message `rr_msg` to peer.
687
688         Parameters:
689             - rr_msg: (RouteRefresh) route refresh message to send to peer.
690
691         Update appropriate counters and set appropriate timers.
692         """
693         assert rr_msg.type == BGP_MSG_ROUTE_REFRESH
694         self._protocol.send(rr_msg)
695         LOG.debug('RouteRefresh %s>> %s',
696                   self._neigh_conf.ip_address, rr_msg)
697         # Collect update statistics for sent refresh request.
698         if rr_msg.demarcation == 0:
699             self.state.incr(PeerCounterNames.SENT_REFRESH)
700         # If SOR is sent, we set Max. EOR timer if needed.
701         elif (rr_msg.demarcation == 1 and
702               self._common_conf.refresh_max_eor_time != 0):
703             eor_timer = self._common_conf.refresh_max_eor_time
704             # Set timer to send EOR demarcation.
705             self._spawn_after('end-of-rib-timer', eor_timer,
706                               self._enqueue_eor_msg, rr_msg)
707             LOG.debug('Enhanced RR max. EOR timer set.')
708
709     def _send_outgoing_route(self, outgoing_route):
710         """Constructs `Update` message from given `outgoing_route` and sends
711         it to peer.
712
713         Also, checks if any policies prevent sending this message.
714         Populates Adj-RIB-out with corresponding `SentRoute`.
715         """
716
717         path = outgoing_route.path
718         block, blocked_cause = self._apply_out_filter(path)
719
720         nlri_str = outgoing_route.path.nlri.formatted_nlri_str
721         sent_route = SentRoute(outgoing_route.path, self, block)
722         self._adj_rib_out[nlri_str] = sent_route
723         self._signal_bus.adj_rib_out_changed(self, sent_route)
724
725         # TODO(PH): optimized by sending several prefixes per update.
726         # Construct and send update message.
727         if not block:
728             update_msg = self._construct_update(outgoing_route)
729             self._protocol.send(update_msg)
730             # Collect update statistics.
731             self.state.incr(PeerCounterNames.SENT_UPDATES)
732         else:
733             LOG.debug('prefix : %s is not sent by filter : %s',
734                       path.nlri, blocked_cause)
735
736         # We have to create sent_route for every OutgoingRoute which is
737         # not a withdraw or was for route-refresh msg.
738         if (not outgoing_route.path.is_withdraw and
739                 not outgoing_route.for_route_refresh):
740             # Update the destination with new sent route.
741             tm = self._core_service.table_manager
742             tm.remember_sent_route(sent_route)
743
744     def _process_outgoing_msg_list(self):
745         while True:
746             outgoing_msg = None
747
748             if self._protocol is not None:
749                 # We pick the first outgoing msg. available and send it.
750                 outgoing_msg = self.outgoing_msg_list.pop_first()
751
752             # If we do not have any outgoing route, we wait.
753             if outgoing_msg is None:
754                 self.outgoing_msg_event.clear()
755                 self.outgoing_msg_event.wait()
756                 continue
757
758             # Check currently supported out-going msgs.
759             assert isinstance(
760                 outgoing_msg,
761                 (BGPRouteRefresh, BGPUpdate, OutgoingRoute)
762             ), ('Peer cannot process object: %s in its outgoing queue'
763                 % outgoing_msg)
764
765             # Send msg. to peer.
766             if isinstance(outgoing_msg, BGPRouteRefresh):
767                 self._send_outgoing_route_refresh_msg(outgoing_msg)
768             elif isinstance(outgoing_msg, OutgoingRoute):
769                 self._send_outgoing_route(outgoing_msg)
770
771             # EOR are enqueued as plain Update messages.
772             elif isinstance(outgoing_msg, BGPUpdate):
773                 self._protocol.send(outgoing_msg)
774                 LOG.debug('Update %s>> %s', self._neigh_conf.ip_address,
775                           outgoing_msg)
776                 self.state.incr(PeerCounterNames.SENT_UPDATES)
777
778     def request_route_refresh(self, *route_families):
779         """Request route refresh to peer for given `route_families`.
780
781          If no `route_families` are given, we make request for all supported
782          route families with this peer.
783         Parameters:
784             - `route_families`: list of route families to request route
785             refresh for.
786
787         If this peer is currently not in Established state, we raise exception.
788         If any of the `route_families` are invalid we raise exception.
789         """
790         # If this peer has not established session yet
791         if not self.in_established:
792             raise ValueError('Peer not in established state to satisfy'
793                              ' this request.')
794
795         skip_validation = False
796         # If request is made for all supported route_families for current
797         # session, we collect all route_families for valid for current session.
798         if len(route_families) == 0:
799             route_families = []
800             # We skip validation of route families that we collect ourselves
801             # below.
802             skip_validation = True
803             for route_family in SUPPORTED_GLOBAL_RF:
804                 if self.is_mbgp_cap_valid(route_family):
805                     route_families.append(route_family)
806
807         for route_family in route_families:
808             if (skip_validation or
809                     ((route_family in SUPPORTED_GLOBAL_RF) and
810                      # We ignore request for route_family not valid
811                      # for current session.
812                      self._protocol.is_mbgp_cap_valid(route_family))):
813                 rr_req = BGPRouteRefresh(route_family.afi, route_family.safi)
814                 self.enque_outgoing_msg(rr_req)
815                 LOG.debug('Enqueued Route Refresh message to '
816                           'peer %s for rf: %s', self, route_family)
817
818     def enque_end_of_rib(self, route_family):
819         # MP_UNREACH_NLRI Attribute.
820         mpunreach_attr = BGPPathAttributeMpUnreachNLRI(route_family.afi,
821                                                        route_family.safi,
822                                                        [])
823         update = BGPUpdate(path_attributes=[mpunreach_attr])
824         self.enque_outgoing_msg(update)
825
826     def _session_next_hop(self, path):
827         """Returns nexthop address relevant to current session
828
829         Nexthop used can depend on capabilities of the session. If VPNv6
830         capability is active and session is on IPv4 connection, we have to use
831         IPv4 mapped IPv6 address. In other cases we can use connection end
832         point/local ip address.
833         """
834         route_family = path.route_family
835
836         # By default we use BGPS's interface IP with this peer as next_hop.
837         if self._neigh_conf.next_hop:
838             next_hop = self._neigh_conf.next_hop
839         else:
840             next_hop = self.host_bind_ip
841         if route_family == RF_IPv6_VPN:
842             next_hop = self._ipv4_mapped_ipv6(next_hop)
843
844         return next_hop
845
846     @staticmethod
847     def _ipv4_mapped_ipv6(ipv4_address):
848         # Next hop ipv4_mapped ipv6
849         from netaddr import IPAddress
850         return str(IPAddress(ipv4_address).ipv6())
851
852     def _construct_as_path_attr(self, as_path_attr, as4_path_attr):
853         """Marge AS_PATH and AS4_PATH attribute instances into
854         a single AS_PATH instance."""
855
856         def _listify(li):
857             """Reconstruct AS_PATH list.
858
859             Example::
860
861                 >>> _listify([[1, 2, 3], {4, 5}, [6, 7]])
862                 [1, 2, 3, {4, 5}, 6, 7]
863             """
864             lo = []
865             for l in li:
866                 if isinstance(l, list):
867                     lo.extend(l)
868                 elif isinstance(l, set):
869                     lo.append(l)
870                 else:
871                     pass
872             return lo
873
874         # If AS4_PATH attribute is None, returns the given AS_PATH attribute
875         if as4_path_attr is None:
876             return as_path_attr
877
878         # If AS_PATH is shorter than AS4_PATH, AS4_PATH should be ignored.
879         if as_path_attr.get_as_path_len() < as4_path_attr.get_as_path_len():
880             return as_path_attr
881
882         org_as_path_list = _listify(as_path_attr.path_seg_list)
883         as4_path_list = _listify(as4_path_attr.path_seg_list)
884
885         # Reverse to compare backward.
886         org_as_path_list.reverse()
887         as4_path_list.reverse()
888
889         new_as_path_list = []
890         tmp_list = []
891         for as_path, as4_path in zip_longest(org_as_path_list, as4_path_list):
892             if as4_path is None:
893                 if isinstance(as_path, int):
894                     tmp_list.insert(0, as_path)
895                 elif isinstance(as_path, set):
896                     if tmp_list:
897                         new_as_path_list.insert(0, tmp_list)
898                         tmp_list = []
899                     new_as_path_list.insert(0, as_path)
900                 else:
901                     pass
902             elif isinstance(as4_path, int):
903                 tmp_list.insert(0, as4_path)
904             elif isinstance(as4_path, set):
905                 if tmp_list:
906                     new_as_path_list.insert(0, tmp_list)
907                     tmp_list = []
908                 new_as_path_list.insert(0, as4_path)
909             else:
910                 pass
911         if tmp_list:
912             new_as_path_list.insert(0, tmp_list)
913
914         return bgp.BGPPathAttributeAsPath(new_as_path_list)
915
916     def _trans_as_path(self, as_path_list):
917         """Translates Four-Octet AS number to AS_TRANS and separates
918         AS_PATH list into AS_PATH and AS4_PATH lists if needed.
919
920         If the neighbor does not support Four-Octet AS number,
921         this method constructs AS4_PATH list from AS_PATH list and swaps
922         non-mappable AS number in AS_PATH with AS_TRANS, then
923         returns AS_PATH list and AS4_PATH list.
924         If the neighbor supports Four-Octet AS number, returns
925         the given AS_PATH list and None.
926         """
927
928         def _swap(n):
929             if is_valid_old_asn(n):
930                 # mappable
931                 return n
932             else:
933                 # non-mappable
934                 return bgp.AS_TRANS
935
936         # If the neighbor supports Four-Octet AS number, returns
937         # the given AS_PATH list and None.
938         if self.is_four_octet_as_number_cap_valid():
939             return as_path_list, None
940
941         # If the neighbor does not support Four-Octet AS number,
942         # constructs AS4_PATH list from AS_PATH list and swaps
943         # non-mappable AS number in AS_PATH with AS_TRANS.
944         else:
945             new_as_path_list = []
946             for as_path in as_path_list:
947                 if isinstance(as_path, set):
948                     path_set = set()
949                     for as_num in as_path:
950                         path_set.add(_swap(as_num))
951                     new_as_path_list.append(path_set)
952                 elif isinstance(as_path, list):
953                     path_list = list()
954                     for as_num in as_path:
955                         path_list.append(_swap(as_num))
956                     new_as_path_list.append(path_list)
957                 else:
958                     # Ignore invalid as_path type
959                     pass
960
961             # If all of the AS_PATH list is composed of mappable four-octet
962             # AS numbers only, returns the given AS_PATH list
963             # Assumption: If the constructed AS_PATH list is the same as
964             # the given AS_PATH list, all AS number is mappable.
965             if as_path_list == new_as_path_list:
966                 return as_path_list, None
967
968             return new_as_path_list, as_path_list
969
970     def _construct_update(self, outgoing_route):
971         """Construct update message with Outgoing-routes path attribute
972         appropriately cloned/copied/updated.
973         """
974         update = None
975         path = outgoing_route.path
976         # Get copy of path's path attributes.
977         pathattr_map = path.pathattr_map
978         new_pathattr = []
979
980         if path.is_withdraw:
981             if isinstance(path, Ipv4Path):
982                 update = BGPUpdate(withdrawn_routes=[path.nlri])
983                 return update
984             else:
985                 mpunreach_attr = BGPPathAttributeMpUnreachNLRI(
986                     path.route_family.afi, path.route_family.safi, [path.nlri]
987                 )
988                 new_pathattr.append(mpunreach_attr)
989         elif self.is_route_server_client:
990             nlri_list = [path.nlri]
991             new_pathattr.extend(pathattr_map.values())
992         else:
993             if self.is_route_reflector_client:
994                 # Append ORIGINATOR_ID attribute if not already exist.
995                 if BGP_ATTR_TYPE_ORIGINATOR_ID not in pathattr_map:
996                     originator_id = path.source
997                     if originator_id is None:
998                         originator_id = self._common_conf.router_id
999                     elif isinstance(path.source, Peer):
1000                         originator_id = path.source.ip_address
1001                     new_pathattr.append(
1002                         BGPPathAttributeOriginatorId(value=originator_id))
1003
1004                 # Preppend own CLUSTER_ID into CLUSTER_LIST attribute if exist.
1005                 # Otherwise append CLUSTER_LIST attribute.
1006                 cluster_lst_attr = pathattr_map.get(BGP_ATTR_TYPE_CLUSTER_LIST)
1007                 if cluster_lst_attr:
1008                     cluster_list = list(cluster_lst_attr.value)
1009                     if self._common_conf.cluster_id not in cluster_list:
1010                         cluster_list.insert(0, self._common_conf.cluster_id)
1011                     new_pathattr.append(
1012                         BGPPathAttributeClusterList(cluster_list))
1013                 else:
1014                     new_pathattr.append(
1015                         BGPPathAttributeClusterList(
1016                             [self._common_conf.cluster_id]))
1017
1018             # Supported and un-supported/unknown attributes.
1019             origin_attr = None
1020             nexthop_attr = None
1021             as_path_attr = None
1022             as4_path_attr = None
1023             aggregator_attr = None
1024             as4_aggregator_attr = None
1025             extcomm_attr = None
1026             community_attr = None
1027             localpref_attr = None
1028             pmsi_tunnel_attr = None
1029             unknown_opttrans_attrs = None
1030             nlri_list = [path.nlri]
1031
1032             if path.route_family.safi in (subaddr_family.IP_FLOWSPEC,
1033                                           subaddr_family.VPN_FLOWSPEC):
1034                 # Flow Specification does not have next_hop.
1035                 next_hop = []
1036             elif self.is_ebgp_peer():
1037                 next_hop = self._session_next_hop(path)
1038                 if path.is_local() and path.has_nexthop():
1039                     next_hop = path.nexthop
1040             else:
1041                 next_hop = path.nexthop
1042                 # RFC 4271 allows us to change next_hop
1043                 # if configured to announce its own ip address.
1044                 # Also if the BGP route is configured without next_hop,
1045                 # we use path._session_next_hop() as next_hop.
1046                 if (self._neigh_conf.is_next_hop_self
1047                         or (path.is_local() and not path.has_nexthop())):
1048                     next_hop = self._session_next_hop(path)
1049                     LOG.debug('using %s as a next_hop address instead'
1050                               ' of path.nexthop %s', next_hop, path.nexthop)
1051
1052             nexthop_attr = BGPPathAttributeNextHop(next_hop)
1053             assert nexthop_attr, 'Missing NEXTHOP mandatory attribute.'
1054
1055             if not isinstance(path, Ipv4Path):
1056                 # We construct mpreach-nlri attribute.
1057                 mpnlri_attr = BGPPathAttributeMpReachNLRI(
1058                     path.route_family.afi,
1059                     path.route_family.safi,
1060                     next_hop,
1061                     nlri_list
1062                 )
1063
1064             # ORIGIN Attribute.
1065             # According to RFC this attribute value SHOULD NOT be changed by
1066             # any other speaker.
1067             origin_attr = pathattr_map.get(BGP_ATTR_TYPE_ORIGIN)
1068             assert origin_attr, 'Missing ORIGIN mandatory attribute.'
1069
1070             # AS_PATH Attribute.
1071             # Construct AS-path-attr using paths AS_PATH attr. with local AS as
1072             # first item.
1073             path_aspath = pathattr_map.get(BGP_ATTR_TYPE_AS_PATH)
1074             assert path_aspath, 'Missing AS_PATH mandatory attribute.'
1075             # Deep copy AS_PATH attr value
1076             as_path_list = path_aspath.path_seg_list
1077             # If this is a iBGP peer.
1078             if not self.is_ebgp_peer():
1079                 # When a given BGP speaker advertises the route to an internal
1080                 # peer, the advertising speaker SHALL NOT modify the AS_PATH
1081                 # attribute associated with the route.
1082                 pass
1083             else:
1084                 # When a given BGP speaker advertises the route to an external
1085                 # peer, the advertising speaker updates the AS_PATH attribute
1086                 # as follows:
1087                 # 1) if the first path segment of the AS_PATH is of type
1088                 #    AS_SEQUENCE, the local system prepends its own AS num as
1089                 #    the last element of the sequence (put it in the left-most
1090                 #    position with respect to the position of  octets in the
1091                 #    protocol message).  If the act of prepending will cause an
1092                 #    overflow in the AS_PATH segment (i.e.,  more than 255
1093                 #    ASes), it SHOULD prepend a new segment of type AS_SEQUENCE
1094                 #    and prepend its own AS number to this new segment.
1095                 #
1096                 # 2) if the first path segment of the AS_PATH is of type AS_SET
1097                 #    , the local system prepends a new path segment of type
1098                 #    AS_SEQUENCE to the AS_PATH, including its own AS number in
1099                 #    that segment.
1100                 #
1101                 # 3) if the AS_PATH is empty, the local system creates a path
1102                 #    segment of type AS_SEQUENCE, places its own AS into that
1103                 #    segment, and places that segment into the AS_PATH.
1104                 if (len(as_path_list) > 0 and
1105                         isinstance(as_path_list[0], list) and
1106                         len(as_path_list[0]) < 255):
1107                     as_path_list[0].insert(0, self.local_as)
1108                 else:
1109                     as_path_list.insert(0, [self.local_as])
1110             # Construct AS4_PATH list from AS_PATH list and swap
1111             # non-mappable AS number with AS_TRANS in AS_PATH.
1112             as_path_list, as4_path_list = self._trans_as_path(
1113                 as_path_list)
1114             # If the neighbor supports Four-Octet AS number, send AS_PATH
1115             # in Four-Octet.
1116             if self.is_four_octet_as_number_cap_valid():
1117                 as_path_attr = BGPPathAttributeAsPath(
1118                     as_path_list, as_pack_str='!I')  # specify Four-Octet.
1119             # Otherwise, send AS_PATH in Two-Octet.
1120             else:
1121                 as_path_attr = BGPPathAttributeAsPath(as_path_list)
1122             # If needed, send AS4_PATH attribute.
1123             if as4_path_list:
1124                 as4_path_attr = BGPPathAttributeAs4Path(as4_path_list)
1125
1126             # AGGREGATOR Attribute.
1127             aggregator_attr = pathattr_map.get(BGP_ATTR_TYPE_AGGREGATOR)
1128             # If the neighbor does not support Four-Octet AS number,
1129             # swap non-mappable AS number with AS_TRANS.
1130             if (aggregator_attr and
1131                     not self.is_four_octet_as_number_cap_valid()):
1132                 # If AS number of AGGREGATOR is Four-Octet AS number,
1133                 # swap with AS_TRANS, else do not.
1134                 aggregator_as_number = aggregator_attr.as_number
1135                 if not is_valid_old_asn(aggregator_as_number):
1136                     aggregator_attr = bgp.BGPPathAttributeAggregator(
1137                         bgp.AS_TRANS, aggregator_attr.addr)
1138                     as4_aggregator_attr = bgp.BGPPathAttributeAs4Aggregator(
1139                         aggregator_as_number, aggregator_attr.addr)
1140
1141             # MULTI_EXIT_DISC Attribute.
1142             # For eBGP session we can send multi-exit-disc if configured.
1143             multi_exit_disc = None
1144             if self.is_ebgp_peer():
1145                 if self._neigh_conf.multi_exit_disc:
1146                     multi_exit_disc = BGPPathAttributeMultiExitDisc(
1147                         self._neigh_conf.multi_exit_disc
1148                     )
1149                 else:
1150                     pass
1151             if not self.is_ebgp_peer():
1152                 multi_exit_disc = pathattr_map.get(
1153                     BGP_ATTR_TYPE_MULTI_EXIT_DISC)
1154
1155             # LOCAL_PREF Attribute.
1156             if not self.is_ebgp_peer():
1157                 # For iBGP peers we are required to send local-pref attribute
1158                 # for connected or local prefixes. We check if the path matches
1159                 # attribute_maps and set local-pref value.
1160                 # If the path doesn't match, we set default local-pref given
1161                 # from the user. The default value is 100.
1162                 localpref_attr = BGPPathAttributeLocalPref(
1163                     self._common_conf.local_pref)
1164                 key = const.ATTR_MAPS_LABEL_DEFAULT
1165
1166                 if isinstance(path, (Vpnv4Path, Vpnv6Path)):
1167                     nlri = nlri_list[0]
1168                     rf = VRF_RF_IPV4 if isinstance(path, Vpnv4Path)\
1169                         else VRF_RF_IPV6
1170                     key = ':'.join([nlri.route_dist, rf])
1171
1172                 attr_type = AttributeMap.ATTR_LOCAL_PREF
1173                 at_maps = self._attribute_maps.get(key, {})
1174                 result = self._lookup_attribute_map(at_maps, attr_type, path)
1175                 if result:
1176                     localpref_attr = result
1177
1178             # COMMUNITY Attribute.
1179             community_attr = pathattr_map.get(BGP_ATTR_TYPE_COMMUNITIES)
1180
1181             # EXTENDED COMMUNITY Attribute.
1182             # Construct ExtCommunity path-attr based on given.
1183             path_extcomm_attr = pathattr_map.get(
1184                 BGP_ATTR_TYPE_EXTENDED_COMMUNITIES
1185             )
1186             if path_extcomm_attr:
1187                 # SOO list can be configured per VRF and/or per Neighbor.
1188                 # NeighborConf has this setting we add this to existing list.
1189                 communities = path_extcomm_attr.communities
1190                 if self._neigh_conf.soo_list:
1191                     # construct extended community
1192                     soo_list = self._neigh_conf.soo_list
1193                     subtype = 0x03
1194                     for soo in soo_list:
1195                         first, second = soo.split(':')
1196                         if '.' in first:
1197                             c = BGPIPv4AddressSpecificExtendedCommunity(
1198                                 subtype=subtype,
1199                                 ipv4_address=first,
1200                                 local_administrator=int(second))
1201                         else:
1202                             c = BGPTwoOctetAsSpecificExtendedCommunity(
1203                                 subtype=subtype,
1204                                 as_number=int(first),
1205                                 local_administrator=int(second))
1206                         communities.append(c)
1207
1208                 extcomm_attr = BGPPathAttributeExtendedCommunities(
1209                     communities=communities
1210                 )
1211
1212                 pmsi_tunnel_attr = pathattr_map.get(
1213                     BGP_ATTR_TYEP_PMSI_TUNNEL_ATTRIBUTE
1214                 )
1215
1216             # UNKNOWN Attributes.
1217             # Get optional transitive path attributes
1218             unknown_opttrans_attrs = bgp_utils.get_unknown_opttrans_attr(path)
1219
1220             # Ordering path attributes according to type as RFC says. We set
1221             # MPReachNLRI first as advised by experts as a new trend in BGP
1222             # implementation.
1223             if isinstance(path, Ipv4Path):
1224                 new_pathattr.append(nexthop_attr)
1225             else:
1226                 new_pathattr.append(mpnlri_attr)
1227
1228             new_pathattr.append(origin_attr)
1229             new_pathattr.append(as_path_attr)
1230             if as4_path_attr:
1231                 new_pathattr.append(as4_path_attr)
1232             if aggregator_attr:
1233                 new_pathattr.append(aggregator_attr)
1234             if as4_aggregator_attr:
1235                 new_pathattr.append(as4_aggregator_attr)
1236             if multi_exit_disc:
1237                 new_pathattr.append(multi_exit_disc)
1238             if localpref_attr:
1239                 new_pathattr.append(localpref_attr)
1240             if community_attr:
1241                 new_pathattr.append(community_attr)
1242             if extcomm_attr:
1243                 new_pathattr.append(extcomm_attr)
1244             if pmsi_tunnel_attr:
1245                 new_pathattr.append(pmsi_tunnel_attr)
1246             if unknown_opttrans_attrs:
1247                 new_pathattr.extend(unknown_opttrans_attrs.values())
1248
1249         if isinstance(path, Ipv4Path):
1250             update = BGPUpdate(path_attributes=new_pathattr,
1251                                nlri=nlri_list)
1252         else:
1253             update = BGPUpdate(path_attributes=new_pathattr)
1254         return update
1255
1256     def _connect_loop(self, client_factory):
1257         """In the current greenlet we try to establish connection with peer.
1258
1259         This greenlet will spin another greenlet to handle incoming data
1260         from the peer once connection is established.
1261         """
1262         # If current configuration allow, enable active session establishment.
1263         if self._neigh_conf.enabled:
1264             self._connect_retry_event.set()
1265
1266         while True:
1267             self._connect_retry_event.wait()
1268
1269             # Reconnecting immediately after closing connection may be not very
1270             # well seen by some peers (ALU?)
1271             self.pause(1.0)
1272             if self.state.bgp_state in \
1273                     (const.BGP_FSM_IDLE, const.BGP_FSM_ACTIVE):
1274
1275                 # Check if we have to stop or retry
1276                 self.state.bgp_state = const.BGP_FSM_CONNECT
1277                 # If we have specific host interface to bind to, we will do so
1278                 # else we will bind to system default.
1279                 if self._neigh_conf.host_bind_ip and \
1280                         self._neigh_conf.host_bind_port:
1281                     bind_addr = (self._neigh_conf.host_bind_ip,
1282                                  self._neigh_conf.host_bind_port)
1283                 else:
1284                     bind_addr = None
1285                 peer_address = (self._neigh_conf.ip_address,
1286                                 self._neigh_conf.port)
1287
1288                 if bind_addr:
1289                     LOG.debug('%s trying to connect from'
1290                               '%s to %s', self, bind_addr, peer_address)
1291                 else:
1292                     LOG.debug('%s trying to connect to %s', self, peer_address)
1293                 tcp_conn_timeout = self._common_conf.tcp_conn_timeout
1294                 try:
1295                     password = self._neigh_conf.password
1296                     self._connect_tcp(peer_address,
1297                                       client_factory,
1298                                       time_out=tcp_conn_timeout,
1299                                       bind_address=bind_addr,
1300                                       password=password)
1301                 except socket.error:
1302                     self.state.bgp_state = const.BGP_FSM_ACTIVE
1303                     if LOG.isEnabledFor(logging.DEBUG):
1304                         LOG.debug('Socket could not be created in time'
1305                                   ' (%s secs), reason %s', tcp_conn_timeout,
1306                                   traceback.format_exc())
1307                     LOG.info('Will try to reconnect to %s after %s secs: %s',
1308                              self._neigh_conf.ip_address,
1309                              self._common_conf.bgp_conn_retry_time,
1310                              self._connect_retry_event.is_set())
1311
1312             self.pause(self._common_conf.bgp_conn_retry_time)
1313
1314     def _set_protocol(self, proto):
1315         self._protocol = proto
1316
1317         # Update state attributes
1318         self.state.peer_ip, self.state.peer_port = self._protocol._remotename
1319         self.state.local_ip, self.state.local_port = self._protocol._localname
1320 #         self.state.bgp_state = self._protocol.state
1321         # Stop connect_loop retry timer as we are now connected
1322         if self._protocol and self._connect_retry_event.is_set():
1323             self._connect_retry_event.clear()
1324             LOG.debug('Connect retry event for %s is cleared', self)
1325
1326         if self._protocol and self.outgoing_msg_event.is_set():
1327             # Start processing sink.
1328             self.outgoing_msg_event.set()
1329             LOG.debug('Processing of outgoing msg. started for %s.', self)
1330
1331     def _send_collision_err_and_stop(self, protocol):
1332         code = BGP_ERROR_CEASE
1333         subcode = BGP_ERROR_SUB_CONNECTION_COLLISION_RESOLUTION
1334         self._signal_bus.bgp_error(self, code, subcode, None)
1335         protocol.send_notification(code, subcode)
1336         protocol.stop()
1337
1338     def bind_protocol(self, proto):
1339         """Tries to bind given protocol to this peer.
1340
1341         Should only be called by `proto` trying to bind.
1342         Once bound this protocol instance will be used to communicate with
1343         peer. If another protocol is already bound, connection collision
1344         resolution takes place.
1345         """
1346         LOG.debug('Trying to bind protocol %s to peer %s', proto, self)
1347         # Validate input.
1348         if not isinstance(proto, BgpProtocol):
1349             raise ValueError('Currently only supports valid instances of'
1350                              ' `BgpProtocol`')
1351
1352         if proto.state != const.BGP_FSM_OPEN_CONFIRM:
1353             raise ValueError('Only protocols in OpenConfirm state can be'
1354                              ' bound')
1355
1356         # If we are not bound to any protocol
1357         is_bound = False
1358         if not self._protocol:
1359             self._set_protocol(proto)
1360             is_bound = True
1361         else:
1362             # If existing protocol is already established, we raise exception.
1363             if self.state.bgp_state != const.BGP_FSM_IDLE:
1364                 LOG.debug('Currently in %s state, hence will send collision'
1365                           ' Notification to close this protocol.',
1366                           self.state.bgp_state)
1367                 self._send_collision_err_and_stop(proto)
1368                 return
1369
1370             # If we have a collision that need to be resolved
1371             assert proto.is_colliding(self._protocol), \
1372                 ('Tried to bind second protocol that is not colliding with '
1373                  'first/bound protocol')
1374             LOG.debug('Currently have one protocol in %s state and '
1375                       'another protocol in %s state',
1376                       self._protocol.state, proto.state)
1377             # Protocol that is already bound
1378             first_protocol = self._protocol
1379             assert ((first_protocol.is_reactive and not proto.is_reactive) or
1380                     (proto.is_reactive and not first_protocol.is_reactive))
1381             # Connection initiated by peer.
1382             reactive_proto = None
1383             # Connection initiated locally.
1384             proactive_proto = None
1385             # Identify which protocol was initiated by which peer.
1386             if proto.is_reactive:
1387                 reactive_proto = proto
1388                 proactive_proto = self._protocol
1389             else:
1390                 reactive_proto = self._protocol
1391                 proactive_proto = proto
1392
1393             LOG.debug('Pro-active/Active protocol %s', proactive_proto)
1394             # We compare bgp local and remote router id and keep the protocol
1395             # that was initiated by peer with highest id.
1396             if proto.is_local_router_id_greater():
1397                 self._set_protocol(proactive_proto)
1398             else:
1399                 self._set_protocol(reactive_proto)
1400
1401             if self._protocol is not proto:
1402                 # If new proto did not win collision we return False to
1403                 # indicate this.
1404                 is_bound = False
1405             else:
1406                 # If first protocol did not win collision resolution we
1407                 # we send notification to peer and stop it
1408                 self._send_collision_err_and_stop(first_protocol)
1409                 is_bound = True
1410
1411         return is_bound
1412
1413     def create_open_msg(self):
1414         """Create `Open` message using current settings.
1415
1416         Current setting include capabilities, timers and ids.
1417         """
1418         asnum = self.local_as
1419         # If local AS number is not Two-Octet AS number, swaps with AS_TRANS.
1420         if not is_valid_old_asn(asnum):
1421             asnum = bgp.AS_TRANS
1422         bgpid = self._common_conf.router_id
1423         holdtime = self._neigh_conf.hold_time
1424
1425         def flatten(L):
1426             if isinstance(L, list):
1427                 for i in range(len(L)):
1428                     for e in flatten(L[i]):
1429                         yield e
1430             else:
1431                 yield L
1432         opts = list(flatten(
1433             list(self._neigh_conf.get_configured_capabilities().values())))
1434         open_msg = BGPOpen(
1435             my_as=asnum,
1436             bgp_identifier=bgpid,
1437             version=const.BGP_VERSION_NUM,
1438             hold_time=holdtime,
1439             opt_param=opts
1440         )
1441         return open_msg
1442
1443     def _validate_update_msg(self, update_msg):
1444         """Validate update message as per RFC.
1445
1446         Here we validate the message after it has been parsed. Message
1447         has already been validated against some errors inside parsing
1448         library.
1449         """
1450         # TODO(PH): finish providing implementation, currently low priority
1451         assert update_msg.type == BGP_MSG_UPDATE
1452         # An UPDATE message may be received only in the Established state.
1453         # Receiving an UPDATE message in any other state is an error.
1454         if self.state.bgp_state != const.BGP_FSM_ESTABLISHED:
1455             LOG.error('Received UPDATE message when not in ESTABLISHED'
1456                       ' state.')
1457             raise bgp.FiniteStateMachineError()
1458
1459         mp_reach_attr = update_msg.get_path_attr(
1460             BGP_ATTR_TYPE_MP_REACH_NLRI
1461         )
1462         mp_unreach_attr = update_msg.get_path_attr(
1463             BGP_ATTR_TYPE_MP_UNREACH_NLRI
1464         )
1465
1466         # non-MPBGP Update msg.
1467         if not (mp_reach_attr or mp_unreach_attr):
1468             if not self.is_mpbgp_cap_valid(RF_IPv4_UC):
1469                 LOG.error('Got UPDATE message with un-available'
1470                           ' afi/safi %s', RF_IPv4_UC)
1471             nlri_list = update_msg.nlri
1472             if len(nlri_list) > 0:
1473                 # Check for missing well-known mandatory attributes.
1474                 aspath = update_msg.get_path_attr(BGP_ATTR_TYPE_AS_PATH)
1475                 if not aspath:
1476                     raise bgp.MissingWellKnown(
1477                         BGP_ATTR_TYPE_AS_PATH)
1478
1479                 if (self.check_first_as and self.is_ebgp_peer() and
1480                         not aspath.has_matching_leftmost(self.remote_as)):
1481                     LOG.error('First AS check fails. Raise appropriate'
1482                               ' exception.')
1483                     raise bgp.MalformedAsPath()
1484
1485                 origin = update_msg.get_path_attr(BGP_ATTR_TYPE_ORIGIN)
1486                 if not origin:
1487                     raise bgp.MissingWellKnown(BGP_ATTR_TYPE_ORIGIN)
1488
1489                 nexthop = update_msg.get_path_attr(BGP_ATTR_TYPE_NEXT_HOP)
1490                 if not nexthop:
1491                     raise bgp.MissingWellKnown(BGP_ATTR_TYPE_NEXT_HOP)
1492
1493             return True
1494
1495         # Check if received MP_UNREACH path attribute is of available afi/safi
1496         if mp_unreach_attr:
1497             if not self.is_mpbgp_cap_valid(mp_unreach_attr.route_family):
1498                 LOG.error('Got UPDATE message with un-available afi/safi for'
1499                           ' MP_UNREACH path attribute (non-negotiated'
1500                           ' afi/safi) %s', mp_unreach_attr.route_family)
1501                 # raise bgp.OptAttrError()
1502
1503         if mp_reach_attr:
1504             # Check if received MP_REACH path attribute is of available
1505             # afi/safi
1506             if not self.is_mpbgp_cap_valid(mp_reach_attr.route_family):
1507                 LOG.error('Got UPDATE message with un-available afi/safi for'
1508                           ' MP_UNREACH path attribute (non-negotiated'
1509                           ' afi/safi) %s', mp_reach_attr.route_family)
1510                 # raise bgp.OptAttrError()
1511
1512             # Check for missing well-known mandatory attributes.
1513             aspath = update_msg.get_path_attr(BGP_ATTR_TYPE_AS_PATH)
1514             if not aspath:
1515                 raise bgp.MissingWellKnown(BGP_ATTR_TYPE_AS_PATH)
1516
1517             if (self.check_first_as and self.is_ebgp_peer() and
1518                     not aspath.has_matching_leftmost(self.remote_as)):
1519                 LOG.error('First AS check fails. Raise appropriate exception.')
1520                 raise bgp.MalformedAsPath()
1521
1522             origin = update_msg.get_path_attr(BGP_ATTR_TYPE_ORIGIN)
1523             if not origin:
1524                 raise bgp.MissingWellKnown(BGP_ATTR_TYPE_ORIGIN)
1525
1526             # Validate Next hop.
1527             if mp_reach_attr.route_family.safi in (
1528                     subaddr_family.IP_FLOWSPEC,
1529                     subaddr_family.VPN_FLOWSPEC):
1530                 # Because the Flow Specification does not have nexthop,
1531                 # skips check.
1532                 pass
1533             elif (not mp_reach_attr.next_hop or
1534                   mp_reach_attr.next_hop == self.host_bind_ip):
1535                 LOG.error('Nexthop of received UPDATE msg. (%s) same as local'
1536                           ' interface address %s.',
1537                           mp_reach_attr.next_hop,
1538                           self.host_bind_ip)
1539                 return False
1540
1541         return True
1542
1543     def _handle_update_msg(self, update_msg):
1544         """Extracts and processes new paths or withdrawals in given
1545          `update_msg`.
1546
1547         Parameter:
1548             - `update_msg`: update message to process.
1549             - `valid_rts`: current valid/interesting rts to the application
1550             according to configuration of all VRFs.
1551         Assumes Multiprotocol Extensions capability is supported and enabled.
1552         """
1553         assert self.state.bgp_state == const.BGP_FSM_ESTABLISHED
1554
1555         # Increment count of update received.
1556         self.state.incr(PeerCounterNames.RECV_UPDATES)
1557
1558         if not self._validate_update_msg(update_msg):
1559             # If update message was not valid for some reason, we ignore its
1560             # routes.
1561             LOG.error('UPDATE message was invalid, hence ignoring its routes.')
1562             return
1563
1564         # Extract advertised path attributes and reconstruct AS_PATH attribute
1565         self._extract_and_reconstruct_as_path(update_msg)
1566
1567         # Check if path attributes have loops.
1568         if self._is_looped_path_attrs(update_msg):
1569             return
1570
1571         umsg_pattrs = update_msg.pathattr_map
1572         mp_reach_attr = umsg_pattrs.get(BGP_ATTR_TYPE_MP_REACH_NLRI, None)
1573         if mp_reach_attr:
1574             # Extract advertised MP-BGP paths from given message.
1575             self._extract_and_handle_mpbgp_new_paths(update_msg)
1576
1577         mp_unreach_attr = umsg_pattrs.get(BGP_ATTR_TYPE_MP_UNREACH_NLRI, None)
1578         if mp_unreach_attr:
1579             # Extract MP-BGP withdraws from given message.
1580             self._extract_and_handle_mpbgp_withdraws(mp_unreach_attr)
1581
1582         nlri_list = update_msg.nlri
1583         if nlri_list:
1584             # Extract advertised BGP paths from given message.
1585             self._extract_and_handle_bgp4_new_paths(update_msg)
1586
1587         withdraw_list = update_msg.withdrawn_routes
1588         if withdraw_list:
1589             # Extract BGP withdraws from given message.
1590             self._extract_and_handle_bgp4_withdraws(withdraw_list)
1591
1592     def _extract_and_reconstruct_as_path(self, update_msg):
1593         """Extracts advertised AS path attributes in the given update message
1594         and reconstructs AS_PATH from AS_PATH and AS4_PATH if needed."""
1595         umsg_pattrs = update_msg.pathattr_map
1596
1597         as_aggregator = umsg_pattrs.get(BGP_ATTR_TYPE_AGGREGATOR, None)
1598         as4_aggregator = umsg_pattrs.get(BGP_ATTR_TYPE_AS4_AGGREGATOR, None)
1599         if as_aggregator and as4_aggregator:
1600             # When both AGGREGATOR and AS4_AGGREGATOR are received,
1601             # if the AS number in the AGGREGATOR attribute is not AS_TRANS,
1602             # then:
1603             #  -  the AS4_AGGREGATOR attribute and the AS4_PATH attribute SHALL
1604             #     be ignored,
1605             #  -  the AGGREGATOR attribute SHALL be taken as the information
1606             #     about the aggregating node, and
1607             #  -  the AS_PATH attribute SHALL be taken as the AS path
1608             #     information.
1609             if as_aggregator.as_number != bgp.AS_TRANS:
1610                 update_msg.path_attributes.remove(as4_aggregator)
1611                 as4_path = umsg_pattrs.pop(BGP_ATTR_TYPE_AS4_PATH, None)
1612                 if as4_path:
1613                     update_msg.path_attributes.remove(as4_path)
1614             # Otherwise,
1615             #  -  the AGGREGATOR attribute SHALL be ignored,
1616             #  -  the AS4_AGGREGATOR attribute SHALL be taken as the
1617             #     information about the aggregating node, and
1618             #  -  the AS path information would need to be constructed,
1619             #     as in all other cases.
1620             else:
1621                 update_msg.path_attributes.remove(as_aggregator)
1622                 update_msg.path_attributes.remove(as4_aggregator)
1623                 update_msg.path_attributes.append(
1624                     bgp.BGPPathAttributeAggregator(
1625                         as_number=as4_aggregator.as_number,
1626                         addr=as4_aggregator.addr,
1627                     )
1628                 )
1629
1630         as_path = umsg_pattrs.get(BGP_ATTR_TYPE_AS_PATH, None)
1631         as4_path = umsg_pattrs.get(BGP_ATTR_TYPE_AS4_PATH, None)
1632         if as_path and as4_path:
1633             # If the number of AS numbers in the AS_PATH attribute is
1634             # less than the number of AS numbers in the AS4_PATH attribute,
1635             # then the AS4_PATH attribute SHALL be ignored, and the AS_PATH
1636             # attribute SHALL be taken as the AS path information.
1637             if as_path.get_as_path_len() < as4_path.get_as_path_len():
1638                 update_msg.path_attributes.remove(as4_path)
1639
1640             # If the number of AS numbers in the AS_PATH attribute is larger
1641             # than or equal to the number of AS numbers in the AS4_PATH
1642             # attribute, then the AS path information SHALL be constructed
1643             # by taking as many AS numbers and path segments as necessary
1644             # from the leading part of the AS_PATH attribute, and then
1645             # prepending them to the AS4_PATH attribute so that the AS path
1646             # information has a number of AS numbers identical to that of
1647             # the AS_PATH attribute.
1648             else:
1649                 update_msg.path_attributes.remove(as_path)
1650                 update_msg.path_attributes.remove(as4_path)
1651                 as_path = self._construct_as_path_attr(as_path, as4_path)
1652                 update_msg.path_attributes.append(as_path)
1653
1654     def _is_looped_path_attrs(self, update_msg):
1655         """
1656         Extracts path attributes from the given UPDATE message and checks
1657         if the given attributes have loops or not.
1658
1659         :param update_msg: UPDATE message instance.
1660         :return: True if attributes have loops. Otherwise False.
1661         """
1662         umsg_pattrs = update_msg.pathattr_map
1663         recv_open_msg = self.protocol.recv_open_msg
1664
1665         # Check if AS_PATH has loops.
1666         aspath = umsg_pattrs.get(BGP_ATTR_TYPE_AS_PATH)
1667         if (aspath is not None
1668                 and aspath.has_local_as(
1669                     self.local_as,
1670                     max_count=self._common_conf.allow_local_as_in_count)):
1671             LOG.error(
1672                 'AS_PATH on UPDATE message has loops. '
1673                 'Ignoring this message: %s',
1674                 update_msg)
1675             return
1676
1677         # Check if ORIGINATOR_ID has loops. [RFC4456]
1678         originator_id = umsg_pattrs.get(BGP_ATTR_TYPE_ORIGINATOR_ID, None)
1679         if (originator_id
1680                 and recv_open_msg.bgp_identifier == originator_id):
1681             LOG.error(
1682                 'ORIGINATOR_ID on UPDATE message has loops. '
1683                 'Ignoring this message: %s',
1684                 update_msg)
1685             return
1686
1687         # Check if CLUSTER_LIST has loops. [RFC4456]
1688         cluster_list = umsg_pattrs.get(BGP_ATTR_TYPE_CLUSTER_LIST, None)
1689         if (cluster_list
1690                 and self._common_conf.cluster_id in cluster_list.value):
1691             LOG.error(
1692                 'CLUSTER_LIST on UPDATE message has loops. '
1693                 'Ignoring this message: %s', update_msg)
1694             return
1695
1696     def _extract_and_handle_bgp4_new_paths(self, update_msg):
1697         """Extracts new paths advertised in the given update message's
1698          *MpReachNlri* attribute.
1699
1700         Assumes MPBGP capability is enabled and message was validated.
1701         Parameters:
1702             - update_msg: (Update) is assumed to be checked for all bgp
1703             message errors.
1704             - valid_rts: (iterable) current valid/configured RTs.
1705
1706         Extracted paths are added to appropriate *Destination* for further
1707         processing.
1708         """
1709         umsg_pattrs = update_msg.pathattr_map
1710         next_hop = update_msg.get_path_attr(BGP_ATTR_TYPE_NEXT_HOP).value
1711
1712         # Nothing to do if we do not have any new NLRIs in this message.
1713         msg_nlri_list = update_msg.nlri
1714         if not msg_nlri_list:
1715             LOG.debug('Update message did not have any new MP_REACH_NLRIs.')
1716             return
1717
1718         # Create path instances for each NLRI from the update message.
1719         for msg_nlri in msg_nlri_list:
1720             LOG.debug('NLRI: %s', msg_nlri)
1721             new_path = bgp_utils.create_path(
1722                 self,
1723                 msg_nlri,
1724                 pattrs=umsg_pattrs,
1725                 nexthop=next_hop
1726             )
1727             LOG.debug('Extracted paths from Update msg.: %s', new_path)
1728
1729             block, blocked_cause = self._apply_in_filter(new_path)
1730
1731             nlri_str = new_path.nlri.formatted_nlri_str
1732             received_route = ReceivedRoute(new_path, self, block)
1733             self._adj_rib_in[nlri_str] = received_route
1734             self._signal_bus.adj_rib_in_changed(self, received_route)
1735
1736             if not block:
1737                 # Update appropriate table with new paths.
1738                 tm = self._core_service.table_manager
1739                 tm.learn_path(new_path)
1740             else:
1741                 LOG.debug('prefix : %s is blocked by in-bound filter: %s',
1742                           msg_nlri, blocked_cause)
1743
1744         # If update message had any qualifying new paths, do some book-keeping.
1745         if msg_nlri_list:
1746             # Update prefix statistics.
1747             self.state.incr(PeerCounterNames.RECV_PREFIXES,
1748                             incr_by=len(msg_nlri_list))
1749             # Check if we exceed max. prefixes allowed for this neighbor.
1750             if self._neigh_conf.exceeds_max_prefix_allowed(
1751                     self.state.get_count(PeerCounterNames.RECV_PREFIXES)):
1752                 LOG.error('Max. prefix allowed for this neighbor '
1753                           'exceeded.')
1754
1755     def _extract_and_handle_bgp4_withdraws(self, withdraw_list):
1756         """Extracts withdraws advertised in the given update message's
1757          *MpUnReachNlri* attribute.
1758
1759         Assumes MPBGP capability is enabled.
1760         Parameters:
1761             - update_msg: (Update) is assumed to be checked for all bgp
1762             message errors.
1763
1764         Extracted withdraws are added to appropriate *Destination* for further
1765         processing.
1766         """
1767         msg_rf = RF_IPv4_UC
1768         w_nlris = withdraw_list
1769         if not w_nlris:
1770             # If this is EOR of some kind, handle it
1771             self._handle_eor(msg_rf)
1772
1773         for w_nlri in w_nlris:
1774             w_path = bgp_utils.create_path(
1775                 self,
1776                 w_nlri,
1777                 is_withdraw=True
1778             )
1779
1780             block, blocked_cause = self._apply_in_filter(w_path)
1781
1782             received_route = ReceivedRoute(w_path, self, block)
1783             nlri_str = w_nlri.formatted_nlri_str
1784
1785             if nlri_str in self._adj_rib_in:
1786                 del self._adj_rib_in[nlri_str]
1787                 self._signal_bus.adj_rib_in_changed(self, received_route)
1788
1789             if not block:
1790                 # Update appropriate table with withdraws.
1791                 tm = self._core_service.table_manager
1792                 tm.learn_path(w_path)
1793             else:
1794                 LOG.debug('prefix : %s is blocked by in-bound filter: %s',
1795                           nlri_str, blocked_cause)
1796
1797     def _extract_and_handle_mpbgp_new_paths(self, update_msg):
1798         """Extracts new paths advertised in the given update message's
1799          *MpReachNlri* attribute.
1800
1801         Assumes MPBGP capability is enabled and message was validated.
1802         Parameters:
1803             - update_msg: (Update) is assumed to be checked for all bgp
1804             message errors.
1805             - valid_rts: (iterable) current valid/configured RTs.
1806
1807         Extracted paths are added to appropriate *Destination* for further
1808         processing.
1809         """
1810         umsg_pattrs = update_msg.pathattr_map
1811         mpreach_nlri_attr = umsg_pattrs.get(BGP_ATTR_TYPE_MP_REACH_NLRI)
1812         assert mpreach_nlri_attr
1813
1814         msg_rf = mpreach_nlri_attr.route_family
1815         # Check if this route family is among supported route families.
1816         if msg_rf not in SUPPORTED_GLOBAL_RF:
1817             LOG.info(('Received route for route family %s which is'
1818                       ' not supported. Ignoring paths from this UPDATE: %s') %
1819                      (msg_rf, update_msg))
1820             return
1821
1822         if msg_rf in (RF_IPv4_VPN, RF_IPv6_VPN):
1823             # Check if we have Extended Communities Attribute.
1824             # TODO(PH): Check if RT_NLRI afi/safi will ever have this attribute
1825             ext_comm_attr = umsg_pattrs.get(BGP_ATTR_TYPE_EXTENDED_COMMUNITIES)
1826             # Check if we have at-least one RT is of interest to us.
1827             if not ext_comm_attr:
1828                 LOG.info('Missing Extended Communities Attribute. '
1829                          'Ignoring paths from this UPDATE: %s', update_msg)
1830                 return
1831
1832             msg_rts = ext_comm_attr.rt_list
1833             # If we do not have any RTs associated with this msg., we do not
1834             # extract any paths.
1835             if not msg_rts:
1836                 LOG.info('Received route with no RTs. Ignoring paths in this'
1837                          ' UPDATE: %s', update_msg)
1838                 return
1839
1840             # If none of the RTs in the message are of interest, we do not
1841             # extract any paths.
1842             interested_rts = self._core_service.global_interested_rts
1843             if not interested_rts.intersection(msg_rts):
1844                 LOG.info('Received route with RT %s that is of no interest to'
1845                          ' any VRFs or Peers %s.'
1846                          ' Ignoring paths from this UPDATE: %s',
1847                          msg_rts, interested_rts, update_msg)
1848                 return
1849
1850         next_hop = mpreach_nlri_attr.next_hop
1851
1852         # Nothing to do if we do not have any new NLRIs in this message.
1853         msg_nlri_list = mpreach_nlri_attr.nlri
1854         if not msg_nlri_list:
1855             LOG.debug('Update message did not have any new MP_REACH_NLRIs.')
1856             return
1857
1858         # Create path instances for each NLRI from the update message.
1859         for msg_nlri in msg_nlri_list:
1860             new_path = bgp_utils.create_path(
1861                 self,
1862                 msg_nlri,
1863                 pattrs=umsg_pattrs,
1864                 nexthop=next_hop
1865             )
1866             LOG.debug('Extracted paths from Update msg.: %s', new_path)
1867
1868             block, blocked_cause = self._apply_in_filter(new_path)
1869
1870             received_route = ReceivedRoute(new_path, self, block)
1871             nlri_str = msg_nlri.formatted_nlri_str
1872             self._adj_rib_in[nlri_str] = received_route
1873             self._signal_bus.adj_rib_in_changed(self, received_route)
1874
1875             if not block:
1876                 if msg_rf == RF_RTC_UC \
1877                         and self._init_rtc_nlri_path is not None:
1878                     self._init_rtc_nlri_path.append(new_path)
1879                 else:
1880                     # Update appropriate table with new paths.
1881                     tm = self._core_service.table_manager
1882                     tm.learn_path(new_path)
1883             else:
1884                 LOG.debug('prefix : %s is blocked by in-bound filter: %s',
1885                           msg_nlri, blocked_cause)
1886
1887         # If update message had any qualifying new paths, do some book-keeping.
1888         if msg_nlri_list:
1889             # Update prefix statistics.
1890             self.state.incr(PeerCounterNames.RECV_PREFIXES,
1891                             incr_by=len(msg_nlri_list))
1892             # Check if we exceed max. prefixes allowed for this neighbor.
1893             if self._neigh_conf.exceeds_max_prefix_allowed(
1894                     self.state.get_count(PeerCounterNames.RECV_PREFIXES)):
1895                 LOG.error('Max. prefix allowed for this neighbor '
1896                           'exceeded.')
1897
1898     def _extract_and_handle_mpbgp_withdraws(self, mp_unreach_attr):
1899         """Extracts withdraws advertised in the given update message's
1900          *MpUnReachNlri* attribute.
1901
1902         Assumes MPBGP capability is enabled.
1903         Parameters:
1904             - update_msg: (Update) is assumed to be checked for all bgp
1905             message errors.
1906
1907         Extracted withdraws are added to appropriate *Destination* for further
1908         processing.
1909         """
1910         msg_rf = mp_unreach_attr.route_family
1911         # Check if this route family is among supported route families.
1912         if msg_rf not in SUPPORTED_GLOBAL_RF:
1913             LOG.info(
1914                 'Received route family %s is not supported. '
1915                 'Ignoring withdraw routes on this UPDATE message.',
1916                 msg_rf)
1917             return
1918
1919         w_nlris = mp_unreach_attr.withdrawn_routes
1920         if not w_nlris:
1921             # If this is EOR of some kind, handle it
1922             self._handle_eor(msg_rf)
1923
1924         for w_nlri in w_nlris:
1925             w_path = bgp_utils.create_path(
1926                 self,
1927                 w_nlri,
1928                 is_withdraw=True
1929             )
1930             block, blocked_cause = self._apply_in_filter(w_path)
1931
1932             received_route = ReceivedRoute(w_path, self, block)
1933             nlri_str = w_nlri.formatted_nlri_str
1934
1935             if nlri_str in self._adj_rib_in:
1936                 del self._adj_rib_in[nlri_str]
1937                 self._signal_bus.adj_rib_in_changed(self, received_route)
1938
1939             if not block:
1940                 # Update appropriate table with withdraws.
1941                 tm = self._core_service.table_manager
1942                 tm.learn_path(w_path)
1943             else:
1944                 LOG.debug('prefix : %s is blocked by in-bound filter: %s',
1945                           w_nlri, blocked_cause)
1946
1947     def _handle_eor(self, route_family):
1948         """Currently we only handle EOR for RTC address-family.
1949
1950         We send non-rtc initial updates if not already sent.
1951         """
1952         LOG.debug('Handling EOR for %s', route_family)
1953 #         assert (route_family in SUPPORTED_GLOBAL_RF)
1954 #         assert self.is_mbgp_cap_valid(route_family)
1955
1956         if route_family == RF_RTC_UC:
1957             self._unschedule_sending_init_updates()
1958
1959             # Learn all rt_nlri at the same time As RT are learned and RT
1960             # filter get updated, qualifying NLRIs are automatically sent to
1961             # peer including initial update
1962             tm = self._core_service.table_manager
1963             for rt_nlri in self._init_rtc_nlri_path:
1964                 tm.learn_path(rt_nlri)
1965                 # Give chance to process new RT_NLRI so that we have updated RT
1966                 # filter for all peer including this peer before we communicate
1967                 # NLRIs for other address-families
1968                 self.pause(0)
1969             # Clear collection of initial RTs as we no longer need to wait for
1970             # EOR for RT NLRIs and to indicate that new RT NLRIs should be
1971             # handled in a regular fashion
1972             self._init_rtc_nlri_path = None
1973
1974     def handle_msg(self, msg):
1975         """BGP message handler.
1976
1977         BGP message handling is shared between protocol instance and peer. Peer
1978         only handles limited messages under suitable state. Here we handle
1979         KEEPALIVE, UPDATE and ROUTE_REFRESH messages. UPDATE and ROUTE_REFRESH
1980         messages are handled only after session is established.
1981         """
1982         if msg.type == BGP_MSG_KEEPALIVE:
1983             # If we receive a Keep Alive message in open_confirm state, we
1984             # transition to established state.
1985             if self.state.bgp_state == const.BGP_FSM_OPEN_CONFIRM:
1986                 self.state.bgp_state = const.BGP_FSM_ESTABLISHED
1987                 self._enqueue_init_updates()
1988
1989         elif msg.type == BGP_MSG_UPDATE:
1990             assert self.state.bgp_state == const.BGP_FSM_ESTABLISHED
1991             # Will try to process this UDPATE message further
1992             self._handle_update_msg(msg)
1993
1994         elif msg.type == BGP_MSG_ROUTE_REFRESH:
1995             # If its route-refresh message
1996             assert self.state.bgp_state == const.BGP_FSM_ESTABLISHED
1997             self._handle_route_refresh_msg(msg)
1998
1999         else:
2000             # Open/Notification messages are currently handled by protocol and
2001             # nothing is done inside peer, so should not see them here.
2002             raise ValueError('Peer does not support handling of %s'
2003                              ' message during %s state' %
2004                              (msg, self.state.bgp_state))
2005
2006     def _handle_err_sor_msg(self, afi, safi):
2007         # Check if ERR capability is enabled for this peer.
2008         if not self._protocol.is_enhanced_rr_cap_valid():
2009             LOG.error('Received Start-of-RIB (SOR) even though ERR is not'
2010                       ' enabled')
2011             return
2012
2013         # Increment the version number of this peer so that we can identify
2014         # inconsistencies/stale routes.
2015         self.version_num += 1
2016
2017         # Check if refresh_stalepath_time is enabled.
2018         rst = self._common_conf.refresh_stalepath_time
2019         if rst != 0:
2020             # Set a timer to clean the stale paths at configured time.
2021             # Clean/track inconsistent/stale routes.
2022             route_family = RouteFamily(afi, safi)
2023             if route_family in SUPPORTED_GLOBAL_RF:
2024                 self._refresh_stalepath_timer = self._spawn_after(
2025                     'err-refresh-stale-path-timer', rst,
2026                     self._core_service.table_manager.clean_stale_routes, self,
2027                     route_family)
2028                 LOG.debug('Refresh Stale Path timer set (%s sec).', rst)
2029
2030     def _handle_route_refresh_msg(self, msg):
2031         afi = msg.afi
2032         safi = msg.safi
2033         demarcation = msg.demarcation
2034
2035         # If this normal route-refresh request.
2036         if demarcation == 0:
2037             self._handle_route_refresh_req(afi, safi)
2038
2039         # If this is start of RIB (SOR) message.
2040         elif demarcation == 1:
2041             self._handle_err_sor_msg(afi, safi)
2042
2043         # If this is end of RIB (EOR) message.
2044         elif demarcation == 2:
2045             # Clean/track inconsistent/stale routes.
2046             route_family = RouteFamily(afi, safi)
2047             if route_family in SUPPORTED_GLOBAL_RF:
2048                 tm = self._core_service.table_manager
2049                 tm.clean_stale_routes(self, route_family)
2050
2051         else:
2052             LOG.error('Route refresh message has invalid demarcation %s',
2053                       demarcation)
2054
2055     def _handle_route_refresh_req(self, afi, safi):
2056         rr_af = get_rf(afi, safi)
2057         self.state.incr(PeerCounterNames.RECV_REFRESH)
2058
2059         # Check if peer has asked for route-refresh for af that was advertised
2060         if not self._protocol.is_route_family_adv(rr_af):
2061             LOG.info('Peer asked for route - refresh for un - advertised '
2062                      'address - family %s', rr_af)
2063             return
2064
2065         self._fire_route_refresh(rr_af)
2066
2067     def _fire_route_refresh(self, af):
2068         # Check if enhanced route refresh is enabled/valid.
2069         sor = None
2070         if self._protocol.is_enhanced_rr_cap_valid():
2071             # If enhanced route-refresh is valid/enabled, enqueue SOR.
2072             afi = af.afi
2073             safi = af.safi
2074             sor = BGPRouteRefresh(afi, safi, demarcation=1)
2075             self.enque_first_outgoing_msg(sor)
2076
2077         # Ask core to re-send sent routes
2078         self._peer_manager.resend_sent(af, self)
2079
2080         # If enhanced route-refresh is valid/enabled, then we enqueue EOR.
2081         if sor is not None:
2082             self._enqueue_eor_msg(sor)
2083
2084     def _enqueue_eor_msg(self, sor):
2085         """Enqueues Enhanced RR EOR if for given SOR a EOR is not already
2086         sent.
2087         """
2088         if self._protocol.is_enhanced_rr_cap_valid() and not sor.eor_sent:
2089             afi = sor.afi
2090             safi = sor.safi
2091             eor = BGPRouteRefresh(afi, safi, demarcation=2)
2092             self.enque_outgoing_msg(eor)
2093             sor.eor_sent = True
2094
2095     def _schedule_sending_init_updates(self):
2096         """Setup timer for sending best-paths for all other address-families
2097         that qualify.
2098
2099         Setup timer for sending initial updates to peer.
2100         """
2101
2102         def _enqueue_non_rtc_init_updates():
2103             LOG.debug('Scheduled queuing of initial Non-RTC UPDATEs')
2104             tm = self._core_service.table_manager
2105             self.comm_all_best_paths(tm.global_tables)
2106             self._sent_init_non_rtc_update = True
2107             # Stop the timer as we have handled RTC EOR
2108             self._rtc_eor_timer.stop()
2109             self._rtc_eor_timer = None
2110
2111         self._sent_init_non_rtc_update = False
2112         self._rtc_eor_timer = self._create_timer(
2113             Peer.RTC_EOR_TIMER_NAME,
2114             _enqueue_non_rtc_init_updates
2115         )
2116         # Start timer for sending initial updates
2117         self._rtc_eor_timer.start(const.RTC_EOR_DEFAULT_TIME, now=False)
2118         LOG.debug('Scheduled sending of initial Non-RTC UPDATEs after:'
2119                   ' %s sec', const.RTC_EOR_DEFAULT_TIME)
2120
2121     def _unschedule_sending_init_updates(self):
2122         """Un-schedules sending of initial updates
2123
2124         Stops the timer if set for sending initial updates.
2125         Returns:
2126             - True if timer was stopped
2127             - False if timer was already stopped and nothing was done
2128         """
2129         LOG.debug('Un-scheduling sending of initial Non-RTC UPDATEs'
2130                   ' (init. UPDATEs already sent: %s)',
2131                   self._sent_init_non_rtc_update)
2132         if self._rtc_eor_timer:
2133             self._rtc_eor_timer.stop()
2134             self._rtc_eor_timer = None
2135             return True
2136         return False
2137
2138     def _enqueue_init_updates(self):
2139         """Enqueues current routes to be shared with this peer."""
2140         assert self.state.bgp_state == const.BGP_FSM_ESTABLISHED
2141         if self.is_mbgp_cap_valid(RF_RTC_UC):
2142             # Enqueues all best-RTC_NLRIs to be sent as initial update to this
2143             # peer.
2144             self._peer_manager.comm_all_rt_nlris(self)
2145             self._schedule_sending_init_updates()
2146         else:
2147             # Enqueues all best-path to be sent as initial update to this peer
2148             # expect for RTC route-family.
2149             tm = self._core_service.table_manager
2150             self.comm_all_best_paths(tm.global_tables)
2151
2152     def comm_all_best_paths(self, global_tables):
2153         """Shares/communicates current best paths with this peers.
2154
2155         Can be used to send initial updates after we have established session
2156         with `peer`.
2157         """
2158         LOG.debug('Communicating current best path for all afi/safi except'
2159                   ' 1/132')
2160         # We will enqueue best path from all global destination.
2161         for route_family, table in global_tables.items():
2162             if route_family == RF_RTC_UC:
2163                 continue
2164             if self.is_mbgp_cap_valid(route_family):
2165                 for dest in table.values():
2166                     if dest.best_path:
2167                         self.communicate_path(dest.best_path)
2168
2169     def communicate_path(self, path):
2170         """Communicates `path` to this peer if it qualifies.
2171
2172         Checks if `path` should be shared/communicated with this peer according
2173         to various conditions: like bgp state, transmit side loop, local and
2174         remote AS path, community attribute, etc.
2175         """
2176         LOG.debug('Peer %s asked to communicate path', self)
2177         if not path:
2178             raise ValueError('Invalid path %s given.' % path)
2179
2180         # We do not send anything to peer who is not in established state.
2181         if not self.in_established():
2182             LOG.debug('Skipping sending path as peer is not in '
2183                       'ESTABLISHED state %s', path)
2184             return
2185
2186         # Check if this session is available for given paths afi/safi
2187         path_rf = path.route_family
2188         if not (self.is_mpbgp_cap_valid(path_rf) or
2189                 path_rf in [RF_IPv4_UC, RF_IPv6_UC]):
2190             LOG.debug('Skipping sending path as %s route family is not'
2191                       ' available for this session', path_rf)
2192             return
2193
2194         # If RTC capability is available and path afi/saif is other than  RT
2195         # nlri
2196         if path_rf != RF_RTC_UC and \
2197                 self.is_mpbgp_cap_valid(RF_RTC_UC):
2198             rtfilter = self._peer_manager.curr_peer_rtfilter(self)
2199             # If peer does not have any rtfilter or if rtfilter does not have
2200             # any RTs common with path RTs we do not share this path with the
2201             # peer
2202             if rtfilter and not path.has_rts_in(rtfilter):
2203                 LOG.debug('Skipping sending path as rffilter %s and path '
2204                           'rts %s have no RT in common',
2205                           rtfilter, path.get_rts())
2206                 return
2207
2208         # Transmit side loop detection: We check if leftmost AS matches
2209         # peers AS, if so we do not send UPDATE message to this peer.
2210         as_path = path.get_pattr(BGP_ATTR_TYPE_AS_PATH)
2211         if as_path and as_path.has_matching_leftmost(self.remote_as):
2212             LOG.debug('Skipping sending path as AS_PATH has peer AS %s',
2213                       self.remote_as)
2214             return
2215
2216         # If this peer is a route server client, we forward the path
2217         # regardless of AS PATH loop, whether the connection is iBGP or eBGP,
2218         # or path's communities.
2219         if self.is_route_server_client:
2220             outgoing_route = OutgoingRoute(path)
2221             self.enque_outgoing_msg(outgoing_route)
2222
2223         if self._neigh_conf.multi_exit_disc:
2224             med_attr = path.get_pattr(BGP_ATTR_TYPE_MULTI_EXIT_DISC)
2225             if not med_attr:
2226                 path = bgp_utils.clone_path_and_update_med_for_target_neighbor(
2227                     path,
2228                     self._neigh_conf.multi_exit_disc
2229                 )
2230
2231         # For connected/local-prefixes, we send update to all peers.
2232         if path.source is None:
2233             # Construct OutgoingRoute specific for this peer and put it in
2234             # its sink.
2235             outgoing_route = OutgoingRoute(path)
2236             self.enque_outgoing_msg(outgoing_route)
2237
2238         # If path from a bgp-peer is new best path, we share it with
2239         # all bgp-peers except the source peer and other peers in his AS.
2240         # This is default Junos setting that in Junos can be disabled with
2241         # 'advertise-peer-as' setting.
2242         elif (self != path.source or
2243               self.remote_as != path.source.remote_as):
2244             # When BGP speaker receives an UPDATE message from an internal
2245             # peer, the receiving BGP speaker SHALL NOT re-distribute the
2246             # routing information contained in that UPDATE message to other
2247             # internal peers (unless the speaker acts as a BGP Route
2248             # Reflector) [RFC4271].
2249             if (self.remote_as == self._core_service.asn
2250                     and self.remote_as == path.source.remote_as
2251                     and isinstance(path.source, Peer)
2252                     and not path.source.is_route_reflector_client
2253                     and not self.is_route_reflector_client):
2254                 LOG.debug(
2255                     'Skipping sending iBGP route to iBGP peer %s AS %s',
2256                     self.ip_address, self.remote_as)
2257                 return
2258
2259             # If new best path has community attribute, it should be taken into
2260             # account when sending UPDATE to peers.
2261             comm_attr = path.get_pattr(BGP_ATTR_TYPE_COMMUNITIES)
2262             if comm_attr:
2263                 comm_attr_na = comm_attr.has_comm_attr(
2264                     BGPPathAttributeCommunities.NO_ADVERTISE
2265                 )
2266                 # If we have NO_ADVERTISE attribute present, we do not send
2267                 # UPDATE to any peers
2268                 if comm_attr_na:
2269                     LOG.debug('Path has community attr. NO_ADVERTISE = %s'
2270                               '. Hence not advertising to peer',
2271                               comm_attr_na)
2272                     return
2273
2274                 comm_attr_ne = comm_attr.has_comm_attr(
2275                     BGPPathAttributeCommunities.NO_EXPORT
2276                 )
2277                 comm_attr_nes = comm_attr.has_comm_attr(
2278                     BGPPathAttributeCommunities.NO_EXPORT_SUBCONFED
2279                 )
2280                 # If NO_EXPORT_SUBCONFED/NO_EXPORT is one of the attribute, we
2281                 # do not advertise to eBGP peers as we do not have any
2282                 # confederation feature at this time.
2283                 if ((comm_attr_nes or comm_attr_ne) and
2284                         (self.remote_as != self._core_service.asn)):
2285                     LOG.debug('Skipping sending UPDATE to peer: %s as per '
2286                               'community attribute configuration', self)
2287                     return
2288
2289             # Construct OutgoingRoute specific for this peer and put it in
2290             # its sink.
2291             outgoing_route = OutgoingRoute(path)
2292             self.enque_outgoing_msg(outgoing_route)
2293             LOG.debug('Enqueued outgoing route %s for peer %s',
2294                       outgoing_route.path.nlri, self)
2295
2296     def connection_made(self):
2297         """Protocols connection established handler
2298         """
2299         LOG.info(
2300             'Connection to peer: %s established',
2301             self._neigh_conf.ip_address,
2302             extra={
2303                 'resource_name': self._neigh_conf.name,
2304                 'resource_id': self._neigh_conf.id
2305             }
2306         )
2307
2308     def connection_lost(self, reason):
2309         """Protocols connection lost handler.
2310         """
2311         LOG.info(
2312             'Connection to peer %s lost, reason: %s Resetting '
2313             'retry connect loop: %s' %
2314             (self._neigh_conf.ip_address, reason,
2315              self._connect_retry_event.is_set()),
2316             extra={
2317                 'resource_name': self._neigh_conf.name,
2318                 'resource_id': self._neigh_conf.id
2319             }
2320         )
2321         self.state.bgp_state = const.BGP_FSM_IDLE
2322         if self._protocol:
2323             self._protocol.stop()
2324             self._protocol = None
2325             # Create new collection for initial RT NLRIs
2326             self._init_rtc_nlri_path = []
2327             self._sent_init_non_rtc_update = False
2328             # Clear sink.
2329             self.clear_outgoing_msg_list()
2330             # Un-schedule timers
2331             self._unschedule_sending_init_updates()
2332
2333             # Increment the version number of this source.
2334             self.version_num += 1
2335             self._peer_manager.on_peer_down(self)
2336
2337             # Check configuration if neighbor is still enabled, we try
2338             # reconnecting.
2339             if self._neigh_conf.enabled:
2340                 if not self._connect_retry_event.is_set():
2341                     self._connect_retry_event.set()
2342
2343     @staticmethod
2344     def _lookup_attribute_map(attribute_map, attr_type, path):
2345         result_attr = None
2346         if attr_type in attribute_map:
2347             maps = attribute_map[attr_type]
2348             for m in maps:
2349                 cause, result = m.evaluate(path)
2350                 LOG.debug(
2351                     "local_pref evaluation result:%s, cause:%s",
2352                     result, cause)
2353                 if result:
2354                     result_attr = m.get_attribute()
2355                     break
2356         return result_attr