1 # Copyright (C) 2014 Nippon Telegraph and Telephone Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 BGP protocol implementation.
23 from socket import IPPROTO_TCP, TCP_NODELAY
24 from eventlet import semaphore
26 from ryu.lib.packet import bgp
27 from ryu.lib.packet.bgp import AS_TRANS
28 from ryu.lib.packet.bgp import BGPMessage
29 from ryu.lib.packet.bgp import BGPOpen
30 from ryu.lib.packet.bgp import BGPUpdate
31 from ryu.lib.packet.bgp import BGPKeepAlive
32 from ryu.lib.packet.bgp import BGPNotification
33 from ryu.lib.packet.bgp import BGP_MSG_OPEN
34 from ryu.lib.packet.bgp import BGP_MSG_UPDATE
35 from ryu.lib.packet.bgp import BGP_MSG_KEEPALIVE
36 from ryu.lib.packet.bgp import BGP_MSG_NOTIFICATION
37 from ryu.lib.packet.bgp import BGP_MSG_ROUTE_REFRESH
38 from ryu.lib.packet.bgp import BGP_CAP_FOUR_OCTET_AS_NUMBER
39 from ryu.lib.packet.bgp import BGP_CAP_ENHANCED_ROUTE_REFRESH
40 from ryu.lib.packet.bgp import BGP_CAP_MULTIPROTOCOL
41 from ryu.lib.packet.bgp import BGP_ERROR_HOLD_TIMER_EXPIRED
42 from ryu.lib.packet.bgp import BGP_ERROR_SUB_HOLD_TIMER_EXPIRED
43 from ryu.lib.packet.bgp import get_rf
45 from ryu.services.protocols.bgp.base import Activity
46 from ryu.services.protocols.bgp.base import add_bgp_error_metadata
47 from ryu.services.protocols.bgp.base import BGPSException
48 from ryu.services.protocols.bgp.base import CORE_ERROR_CODE
49 from ryu.services.protocols.bgp.constants import BGP_FSM_CONNECT
50 from ryu.services.protocols.bgp.constants import BGP_FSM_OPEN_CONFIRM
51 from ryu.services.protocols.bgp.constants import BGP_FSM_OPEN_SENT
52 from ryu.services.protocols.bgp.constants import BGP_VERSION_NUM
53 from ryu.services.protocols.bgp.protocol import Protocol
55 LOG = logging.getLogger('bgpspeaker.speaker')
57 # BGP min. and max. message lengths as per RFC.
59 BGP_MAX_MSG_LEN = 4096
61 # Keep-alive singleton.
62 _KEEP_ALIVE = BGPKeepAlive()
65 @add_bgp_error_metadata(code=CORE_ERROR_CODE, sub_code=2,
66 def_desc='Unknown error occurred related to Speaker.')
67 class BgpProtocolException(BGPSException):
68 """Base exception related to peer connection management.
def notification_factory(code, subcode):
    """Returns a `Notification` message corresponding to given codes.

    Parameters:
    - `code`: (int) BGP error code
    - `subcode`: (int) BGP error sub-code

    Raises `ValueError` when the message built from the given codes has an
    empty `reason`.
    """
    notification = BGPNotification(code, subcode)
    if not notification.reason:
        raise ValueError('Invalid code/sub-code.')
    # Hand the validated message back, as the summary line promises.
    return notification
87 class BgpProtocol(Protocol, Activity):
88 """Protocol that handles BGP messages.
90 MESSAGE_MARKER = (b'\xff\xff\xff\xff\xff\xff\xff\xff'
91 b'\xff\xff\xff\xff\xff\xff\xff\xff')
93 def __init__(self, socket, signal_bus, is_reactive_conn=False):
96 raise ValueError('Invalid arguments passed.')
97 self._remotename = self.get_remotename(socket)
98 self._localname = self.get_localname(socket)
99 activity_name = ('BgpProtocol %s, %s, %s' % (is_reactive_conn,
102 Activity.__init__(self, name=activity_name)
103 # Initialize instance variables.
105 self._recv_buff = b''
106 self._socket = socket
107 self._socket.setsockopt(IPPROTO_TCP, TCP_NODELAY, 1)
108 self._sendlock = semaphore.Semaphore()
109 self._signal_bus = signal_bus
110 self._holdtime = None
111 self._keepalive = None
113 # Add socket to Activity's socket container for managing it.
115 self._asso_socket_map['passive_conn'] = self._socket
117 self._asso_socket_map['active_conn'] = self._socket
118 self._open_msg = None
119 self.state = BGP_FSM_CONNECT
120 self._is_reactive = is_reactive_conn
121 self.sent_open_msg = None
122 self.recv_open_msg = None
123 self._is_bound = False
124 self.cap_four_octet_as_number = False
127 def is_reactive(self):
128 return self._is_reactive
132 return self._holdtime
136 return self._keepalive
138 def is_colliding(self, other_protocol):
139 if not isinstance(other_protocol, BgpProtocol):
140 raise ValueError('Currently only support comparing with '
143 # Compare protocol connection end point's addresses
144 if (self._remotename[0] == other_protocol._remotename[0] and
145 self._localname[0] == other_protocol._localname[0]):
def is_local_router_id_greater(self):
    """Returns *True* if the local router id is greater than the peer's.

    The ids are the `bgp_identifier` values of the sent and the received
    `Open` messages, compared after conversion with `from_inet_ptoi`.

    Should only be called after protocol has reached OpenConfirm state;
    in any other state `BgpProtocolException` is raised.
    """
    # Function-scope import, kept exactly where the original code has it.
    from ryu.services.protocols.bgp.utils.bgp import from_inet_ptoi

    if self.state != BGP_FSM_OPEN_CONFIRM:
        raise BgpProtocolException(desc='Can access remote router id only'
                                        ' after open message is received')
    peer_router_id = self.recv_open_msg.bgp_identifier
    own_router_id = self.sent_open_msg.bgp_identifier
    own_value = from_inet_ptoi(own_router_id)
    peer_value = from_inet_ptoi(peer_router_id)
    return own_value > peer_value
def is_enhanced_rr_cap_valid(self):
    """Checks if enhanced route refresh capability is enabled/valid.

    Looks at the optional parameters of the sent and the received `Open`
    messages; the capability counts as enabled only when both of them
    carry an entry whose `cap_code` is `BGP_CAP_ENHANCED_ROUTE_REFRESH`.

    Raises `ValueError` if the peer's `Open` message was not received yet.
    """
    if not self.recv_open_msg:
        raise ValueError('Did not yet receive peers open message.')

    sent_params = self.sent_open_msg.opt_param
    recv_params = self.recv_open_msg.opt_param

    wanted_code = BGP_CAP_ENHANCED_ROUTE_REFRESH
    sent_err_caps = [param for param in sent_params
                     if param.cap_code == wanted_code]
    recv_err_caps = [param for param in recv_params
                     if param.cap_code == wanted_code]

    # Usable only when advertised in both directions.
    return bool(sent_err_caps and recv_err_caps)
190 def _check_route_fmly_adv(self, open_msg, route_family):
193 local_caps = open_msg.opt_param
194 for cap in local_caps:
195 # Check MP_BGP capability was advertised.
196 if cap.cap_code == BGP_CAP_MULTIPROTOCOL:
197 # Iterate over all advertised mp_bgp caps to find a match.
198 if (route_family.afi == cap.afi and
199 route_family.safi == cap.safi):
def is_route_family_adv(self, route_family):
    """Checks if `route_family` was advertised to peer as per MP_BGP cap.

    Delegates to `_check_route_fmly_adv` with the `Open` message we sent.

    Returns:
    - True: if given address family was advertised.
    - False: if given address family was not advertised.
    """
    our_open_msg = self.sent_open_msg
    return self._check_route_fmly_adv(our_open_msg, route_family)
def is_route_family_adv_recv(self, route_family):
    """Checks if `route_family` was advertised by peer as per MP_BGP cap.

    Delegates to `_check_route_fmly_adv` with the `Open` message received
    from the peer.

    Returns:
    - True: if given address family was advertised.
    - False: if given address family was not advertised.
    """
    peer_open_msg = self.recv_open_msg
    return self._check_route_fmly_adv(peer_open_msg, route_family)
223 def negotiated_afs(self):
224 local_caps = self.sent_open_msg.opt_param
225 remote_caps = self.recv_open_msg.opt_param
227 local_mbgp_cap = [cap for cap in local_caps
228 if cap.cap_code == BGP_CAP_MULTIPROTOCOL]
229 remote_mbgp_cap = [cap for cap in remote_caps
230 if cap.cap_code == BGP_CAP_MULTIPROTOCOL]
232 # Check MP_BGP capabilities were advertised.
233 if local_mbgp_cap and remote_mbgp_cap:
234 local_families = set([
235 (peer_cap.afi, peer_cap.safi)
236 for peer_cap in local_mbgp_cap
238 remote_families = set([
239 (peer_cap.afi, peer_cap.safi)
240 for peer_cap in remote_mbgp_cap
242 afi_safi = local_families.intersection(remote_families)
247 for afi, safi in afi_safi:
248 afs.append(get_rf(afi, safi))
def is_mbgp_cap_valid(self, route_family):
    """Returns True if both sides of this protocol have advertised
    capability for this address family.

    The received side is only consulted when our own advertisement check
    succeeded.
    """
    advertised_by_us = self.is_route_family_adv(route_family)
    if not advertised_by_us:
        return advertised_by_us
    return self.is_route_family_adv_recv(route_family)
def is_four_octet_as_number_cap_valid(self):
    """Returns True if both sides of this protocol have Four-Octet
    AS number capability.

    The peer's flag is only read when the local flag is set.
    """
    local_flag = self.cap_four_octet_as_number
    if not local_flag:
        return local_flag
    return self._peer.cap_four_octet_as_number
264 def _run(self, peer):
265 """Sends open message to peer and handles received messages.
268 - `peer`: the peer to which this protocol instance is connected to.
270 # We know the peer we are connected to, we send open message.
272 self.connection_made()
274 # We wait for peer to send messages.
277 def data_received(self, next_bytes):
279 self._data_received(next_bytes)
280 except bgp.BgpExc as exc:
282 "BGPExc Exception while receiving data: "
283 "%s \n Traceback %s \n"
284 % (str(exc), traceback.format_exc())
287 self.send_notification(exc.CODE, exc.SUB_CODE)
def parse_msg_header(buff):
    """Parses given `buff` into bgp message header format.

    `buff` is unpacked with the network-order layout '!16sHB': a 16 byte
    marker, a 2 byte unsigned length and a 1 byte unsigned type, i.e.
    exactly 19 bytes. Any other size makes `struct` raise `struct.error`.

    Returns a tuple of marker, length, type of bgp message.
    """
    header_layout = struct.Struct('!16sHB')
    marker, length, msg_type = header_layout.unpack(buff)
    return marker, length, msg_type
300 def _data_received(self, next_bytes):
301 """Maintains buffer of bytes received from peer and extracts bgp
302 message from this buffer if enough data is received.
304 Validates bgp message marker, length, type and data and constructs
305 appropriate bgp message instance and calls handler.
308 - `next_bytes`: next set of bytes received from peer.
310 # Append buffer with received bytes.
311 self._recv_buff += next_bytes
314 # If current buffer size is less than minimum bgp message size, we
315 # return as we do not have a complete bgp message to work with.
316 if len(self._recv_buff) < BGP_MIN_MSG_LEN:
319 # Parse message header into elements.
320 auth, length, ptype = BgpProtocol.parse_msg_header(
321 self._recv_buff[:BGP_MIN_MSG_LEN])
323 # Check if we have valid bgp message marker.
324 # We should get default marker since we are not supporting any
326 if (auth != BgpProtocol.MESSAGE_MARKER):
327 LOG.error('Invalid message marker received: %s', auth)
330 # Check if we have valid bgp message length.
331 check = (length < BGP_MIN_MSG_LEN or length > BGP_MAX_MSG_LEN)
333 # RFC says: The minimum length of the OPEN message is 29
334 # octets (including the message header).
335 check2 = (ptype == BGP_MSG_OPEN and length < BGPOpen._MIN_LEN)
337 # RFC says: A KEEPALIVE message consists of only the
338 # message header and has a length of 19 octets.
339 check3 = (ptype == BGP_MSG_KEEPALIVE and
340 length != BGPKeepAlive._MIN_LEN)
342 # RFC says: The minimum length of the UPDATE message is 23
344 check4 = (ptype == BGP_MSG_UPDATE and
345 length < BGPUpdate._MIN_LEN)
347 if any((check, check2, check3, check4)):
348 raise bgp.BadLen(ptype, length)
350 # If we have partial message we wait for rest of the message.
351 if len(self._recv_buff) < length:
353 msg, _, rest = BGPMessage.parser(self._recv_buff)
354 self._recv_buff = rest
356 # If we have a valid bgp message we call message handler.
357 self._handle_msg(msg)
359 def send_notification(self, code, subcode):
360 """Utility to send notification message.
362 Closes the socket after sending the message.
364 - `socket`: (socket) - socket over which to send notification
366 - `code`: (int) - BGP Notification code
367 - `subcode`: (int) - BGP Notification sub-code
369 RFC ref: http://tools.ietf.org/html/rfc4486
370 http://www.iana.org/assignments/bgp-parameters/bgp-parameters.xhtml
372 notification = BGPNotification(code, subcode)
373 reason = notification.reason
374 self._send_with_lock(notification)
375 self._signal_bus.bgp_error(self._peer, code, subcode, reason)
376 if len(self._localname):
377 LOG.error('Sent notification to %r >> %s', self._localname,
381 def _send_with_lock(self, msg):
382 self._sendlock.acquire()
384 self._socket.sendall(msg.serialize())
386 self.connection_lost('failed to write to socket')
388 self._sendlock.release()
392 raise BgpProtocolException('Tried to send message to peer when '
393 'this protocol instance is not started'
394 ' or is no longer is started state.')
395 self._send_with_lock(msg)
397 if msg.type == BGP_MSG_NOTIFICATION:
398 LOG.error('Sent notification to %s >> %s', self._remotename, msg)
400 self._signal_bus.bgp_notification_sent(self._peer, msg)
402 LOG.debug('Sent msg to %s >> %s', self._remotename, msg)
def _validate_open_msg(self, open_msg):
    """Validates BGP OPEN message according to the application context.

    Parsing modules takes care of validating OPEN message that need no
    context. Here the message is checked against current settings: the
    peer's AS number (taken from the Four-Octet AS number capability when
    the peer advertised it, from `my_as` otherwise) must equal
    `self._peer.remote_as`, and the version must be `BGP_VERSION_NUM`.

    As a side effect `self.cap_four_octet_as_number` records whether the
    peer advertised the Four-Octet AS number capability.

    Raises `bgp.BadPeerAs` or `bgp.UnsupportedVersion` on mismatch.
    """
    assert open_msg.type == BGP_MSG_OPEN

    opt_param_cap_map = open_msg.opt_param_cap_map

    # Validate remote AS number.
    remote_as = open_msg.my_as
    # Try to get AS number from Four-Octet AS number capability.
    cap4as = opt_param_cap_map.get(BGP_CAP_FOUR_OCTET_AS_NUMBER, None)
    if cap4as is None:
        if remote_as == AS_TRANS:
            # Raise Bad Peer AS error message, if my_as is AS_TRANS
            # and without Four-Octet AS number capability.
            raise bgp.BadPeerAs()
        self.cap_four_octet_as_number = False
    else:
        # Note: Even if the peer has Four-Octet AS number capability,
        # keep the local capability setting
        remote_as = cap4as.as_number
        self.cap_four_octet_as_number = True
    # Validate remote AS number with local setting.
    if remote_as != self._peer.remote_as:
        raise bgp.BadPeerAs()

    # Validate bgp version number.
    if open_msg.version != BGP_VERSION_NUM:
        raise bgp.UnsupportedVersion(BGP_VERSION_NUM)
442 def _handle_msg(self, msg):
443 """When a BGP message is received, send it to peer.
445 Open messages are validated here. Peer handler is called to handle each
446 message except for *Open* and *Notification* message. On receiving
447 *Notification* message we close connection with peer.
449 LOG.debug('Received msg from %s << %s', self._remotename, msg)
451 # If we receive open message we try to bind to protocol
452 if msg.type == BGP_MSG_OPEN:
453 if self.state == BGP_FSM_OPEN_SENT:
454 # Validate open message.
455 self._validate_open_msg(msg)
456 self.recv_open_msg = msg
457 self.state = BGP_FSM_OPEN_CONFIRM
458 self._peer.state.bgp_state = self.state
460 # Try to bind this protocol to peer.
461 self._is_bound = self._peer.bind_protocol(self)
463 # If this protocol failed to bind to peer.
464 if not self._is_bound:
465 # Failure to bind to peer indicates connection collision
466 # resolution choose different instance of protocol and this
467 # instance has to close. Before closing it sends
468 # appropriate notification msg. to peer.
469 raise bgp.CollisionResolution()
471 # If peer sends Hold Time as zero, then according to RFC we do
472 # not set Hold Time and Keep Alive timer.
473 if msg.hold_time == 0:
474 LOG.info('The Hold Time sent by the peer is zero, hence '
475 'not setting any Hold Time and Keep Alive'
478 # Start Keep Alive timer considering Hold Time preference
480 self._start_timers(msg.hold_time)
481 self._send_keepalive()
483 # Peer does not see open message.
486 # If we receive a Open message out of order
487 LOG.error('Open message received when current state is not '
489 # Received out-of-order open message
490 # We raise Finite state machine error
491 raise bgp.FiniteStateMachineError()
492 elif msg.type == BGP_MSG_NOTIFICATION:
494 self._signal_bus.bgp_notification_received(self._peer, msg)
495 # If we receive notification message
496 LOG.error('Received notification message, hence closing '
497 'connection %s', msg)
501 # If we receive keepalive or update message, we reset expire timer.
502 if (msg.type == BGP_MSG_KEEPALIVE or
503 msg.type == BGP_MSG_UPDATE):
507 # Call peer message handler for appropriate messages.
509 (BGP_MSG_UPDATE, BGP_MSG_KEEPALIVE, BGP_MSG_ROUTE_REFRESH)):
510 self._peer.handle_msg(msg)
511 # We give chance to other threads to run.
def _start_timers(self, peer_holdtime):
    """Starts keepalive and expire timers.

    Hold time is set to min. of peer and configured/default hold time.
    The keepalive timer (callback `_send_keepalive`) is started with one
    third of that value (integer division) and the expire timer (callback
    `_expired`) with the full value.

    Parameters:
    - `peer_holdtime`: hold time taken from the peer's open message.
    """
    configured_holdtime = self._holdtime
    neg_timer = min(configured_holdtime, peer_holdtime)
    if neg_timer < configured_holdtime:
        LOG.info('Negotiated hold time (%s) is lower than '
                 'configured/default (%s).', neg_timer,
                 configured_holdtime)
    # We use negotiated timer value.
    self._holdtime = neg_timer
    self._keepalive = self._create_timer('Keepalive Timer',
                                         self._send_keepalive)
    interval = self._holdtime // 3
    self._keepalive.start(interval, now=False)
    # Setup the expire timer.
    self._expiry = self._create_timer('Holdtime Timer', self._expired)
    self._expiry.start(self._holdtime, now=False)
    # Trailing space matters: adjacent literals are joined verbatim.
    LOG.debug('Started keep-alive and expire timer for negotiated hold '
              'time %s', self._holdtime)
537 """Hold timer expired event handler.
539 LOG.info('Negotiated hold time %s expired.', self._holdtime)
540 code = BGP_ERROR_HOLD_TIMER_EXPIRED
541 subcode = BGP_ERROR_SUB_HOLD_TIMER_EXPIRED
542 self.send_notification(code, subcode)
543 self.connection_lost('Negotiated hold time %s expired.' %
def _send_keepalive(self):
    """Sends the module-level keep-alive message through `send`."""
    keepalive_msg = _KEEP_ALIVE
    self.send(keepalive_msg)
550 def _recv_loop(self):
551 """Sits in tight loop collecting data received from peer and
554 required_len = BGP_MIN_MSG_LEN
555 conn_lost_reason = "Connection lost as protocol is no longer active"
558 next_bytes = self._socket.recv(required_len)
559 if len(next_bytes) == 0:
560 conn_lost_reason = 'Peer closed connection'
562 self.data_received(next_bytes)
563 except socket.error as err:
564 conn_lost_reason = 'Connection to peer lost: %s.' % err
565 except bgp.BgpExc as ex:
566 conn_lost_reason = 'Connection to peer lost, reason: %s.' % ex
567 except Exception as e:
568 LOG.debug(traceback.format_exc())
569 conn_lost_reason = str(e)
571 self.connection_lost(conn_lost_reason)
def connection_made(self):
    """Connection to peer handler.

    We send bgp open message to peer and initialize related attributes:
    the hold time is taken from the open message, the state moves to
    `BGP_FSM_OPEN_SENT` and the message is kept in `sent_open_msg`.
    Finally the peer's own `connection_made` is called.
    """
    assert self.state == BGP_FSM_CONNECT
    # We have a connection with peer we send open message.
    open_msg = self._peer.create_open_msg()
    self._holdtime = open_msg.hold_time
    self.state = BGP_FSM_OPEN_SENT
    # Only a non-reactive connection updates the peer's recorded state.
    if not self.is_reactive:
        self._peer.state.bgp_state = self.state
    self.sent_open_msg = open_msg
    # Actually put the open message on the wire before notifying the peer.
    self.send(open_msg)
    self._peer.connection_made()
589 def connection_lost(self, reason):
590 """Stops all timers and notifies peer that connection is lost.
594 state = self._peer.state.bgp_state
595 if self._is_bound or state == BGP_FSM_OPEN_SENT:
596 self._peer.connection_lost(reason)
603 LOG.info('Connection to peer closed for unknown reasons.')