1 # Copyright (C) 2014 Nippon Telegraph and Telephone Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 Network Controller interface to BGP.
19 Network controller w.r.t BGPS for APGW Automation project is named as APGW
20 Agent and Route Server.
28 from ryu.lib.packet import safi as subaddr_family
30 from ryu.services.protocols.bgp import api
31 from ryu.services.protocols.bgp.api.base import ApiException
32 from ryu.services.protocols.bgp.api.base import NEXT_HOP
33 from ryu.services.protocols.bgp.api.base import ORIGIN_RD
34 from ryu.services.protocols.bgp.api.base import PREFIX
35 from ryu.services.protocols.bgp.api.base import ROUTE_DISTINGUISHER
36 from ryu.services.protocols.bgp.api.base import VPN_LABEL
37 from ryu.services.protocols.bgp.base import Activity
38 from ryu.services.protocols.bgp.base import add_bgp_error_metadata
39 from ryu.services.protocols.bgp.base import BGPSException
40 from ryu.services.protocols.bgp.base import FlexinetPeer
41 from ryu.services.protocols.bgp.base import NET_CTRL_ERROR_CODE
42 from ryu.services.protocols.bgp.constants import VRF_TABLE
43 from ryu.services.protocols.bgp.rtconf.vrfs import VRF_RF
44 from ryu.services.protocols.bgp.rtconf.vrfs import VrfConf
45 from ryu.services.protocols.bgp.utils.validation import is_valid_ipv4
48 # Logger instance for this module.
49 LOG = logging.getLogger('bgpspeaker.net_ctrl')
51 # Network controller service socket constants.
52 NC_RPC_BIND_IP = 'apgw_rpc_bind_ip'
53 NC_RPC_BIND_PORT = 'apgw_rpc_bind_port'
55 # Notification symbols
56 NOTIFICATION_ADD_REMOTE_PREFIX = 'prefix.add_remote'
57 NOTIFICATION_DELETE_REMOTE_PREFIX = 'prefix.delete_remote'
58 NOTIFICATION_ADD_LOCAL_PREFIX = 'prefix.add_local'
59 NOTIFICATION_DELETE_LOCAL_PREFIX = 'prefix.delete_local'
60 NOTIFICATION_LOG = 'logging'
62 # MessagePackRPC message type constants
68 # Indexes for various RPC message types.
79 # RPC socket receive buffer size in bytes.
80 RPC_SOCK_BUFF_SIZE = 4096
83 @add_bgp_error_metadata(code=NET_CTRL_ERROR_CODE,
85 def_desc='Unknown Network controller exception')
86 class NetworkControllerError(BGPSException):
87 """Common base class for exceptions related to RPC calls.
92 class RpcSession(Activity):
93 """Provides message-pack RPC abstraction for one session.
95 It contains message-pack packer, un-packer, message ID sequence
96 and utilities that use these. It also cares about socket communication w/
99 NAME_FMT = 'RpcSession%s'
101 def __init__(self, sock, outgoing_msg_sink_iter):
102 self.peer_name = str(sock.getpeername())
103 super(RpcSession, self).__init__(self.NAME_FMT % self.peer_name)
104 self._packer = msgpack.Packer(encoding='utf-8')
105 self._unpacker = msgpack.Unpacker(encoding='utf-8')
108 self._outgoing_msg_sink_iter = outgoing_msg_sink_iter
109 self.is_connected = True
111 self.green_out = None
114 super(RpcSession, self).stop()
115 self.is_connected = False
116 LOG.info('RPC Session to %s stopped', self.peer_name)
119 # Process outgoing messages in new thread.
120 self.green_out = self._spawn('net_ctrl._process_outgoing',
121 self._process_outgoing_msg,
122 self._outgoing_msg_sink_iter)
123 # Process incoming messages in new thread.
124 self.green_in = self._spawn('net_ctrl._process_incoming',
125 self._process_incoming_msgs)
126 LOG.info('RPC Session to %s started', self.peer_name)
128 self.green_out.wait()
130 def _next_msg_id(self):
131 this_id = self._next_msgid
132 self._next_msgid += 1
135 def create_request(self, method, params):
136 msgid = self._next_msg_id()
137 return self._packer.pack([RPC_MSG_REQUEST, msgid, method, params])
139 def create_error_response(self, msgid, error):
141 raise NetworkControllerError(desc='Creating error without body!')
142 return self._packer.pack([RPC_MSG_RESPONSE, msgid, error, None])
144 def create_success_response(self, msgid, result):
146 raise NetworkControllerError(desc='Creating response without '
148 return self._packer.pack([RPC_MSG_RESPONSE, msgid, None, result])
150 def create_notification(self, method, params):
151 return self._packer.pack([RPC_MSG_NOTIFY, method, params])
153 def feed_and_get_messages(self, data):
154 self._unpacker.feed(data)
156 for msg in self._unpacker:
160 def feed_and_get_first_message(self, data):
161 self._unpacker.feed(data)
162 for msg in self._unpacker:
165 def _send_error_response(self, request, err_msg):
166 rpc_msg = self.create_error_response(request[RPC_IDX_MSG_ID],
168 return self._sendall(rpc_msg)
170 def _send_success_response(self, request, result):
171 rpc_msg = self.create_success_response(request[RPC_IDX_MSG_ID],
173 return self._sendall(rpc_msg)
175 def send_notification(self, method, params):
176 rpc_msg = self.create_notification(method, params)
177 return self._sendall(rpc_msg)
179 def _process_incoming_msgs(self):
180 LOG.debug('NetworkController started processing incoming messages')
183 while self.is_connected:
184 # Wait for request/response/notification from peer.
185 msg_buff = self._recv()
186 if len(msg_buff) == 0:
187 LOG.info('Peer %s disconnected.', self.peer_name)
188 self.is_connected = False
191 messages = self.feed_and_get_messages(msg_buff)
193 if msg[0] == RPC_MSG_REQUEST:
195 result = _handle_request(msg)
196 self._send_success_response(msg, result)
197 except BGPSException as e:
198 self._send_error_response(msg, e.message)
199 elif msg[0] == RPC_MSG_RESPONSE:
200 _handle_response(msg)
201 elif msg[0] == RPC_MSG_NOTIFY:
202 _handle_notification(msg)
204 LOG.error('Invalid message type: %r', msg)
207 # Stop outgoing connection.
209 self.green_out.kill()
211 def _process_outgoing_msg(self, sink_iter):
212 """For every message we construct a corresponding RPC message to be
213 sent over the given socket inside given RPC session.
215 This function should be launched in a new green thread as
218 LOG.debug('NetworkController processing outgoing request list.')
219 # TODO(PH): We should try not to sent routes from bgp peer that is not
220 # in established state.
221 from ryu.services.protocols.bgp.model import (
222 FlexinetOutgoingRoute)
223 while self.is_connected:
224 # sink iter is Sink instance and next is blocking so this isn't
226 for outgoing_msg in sink_iter:
227 if not self.is_connected:
230 if isinstance(outgoing_msg, FlexinetOutgoingRoute):
231 rpc_msg = _create_prefix_notification(outgoing_msg, self)
233 raise NotImplementedError(
234 'Do not handle out going message of type %s' %
235 outgoing_msg.__class__)
237 self._sendall(rpc_msg)
240 # Stop incoming connection.
245 return self._sock_wrap(self._socket.recv)(RPC_SOCK_BUFF_SIZE)
247 def _sendall(self, msg):
248 return self._sock_wrap(self._socket.sendall)(msg)
250 def _sock_wrap(self, func):
251 def wrapper(*args, **kwargs):
253 ret = func(*args, **kwargs)
255 LOG.error(traceback.format_exc())
262 def _socket_error(self):
267 def _create_prefix_notification(outgoing_msg, rpc_session):
268 """Constructs prefix notification with data from given outgoing message.
270 Given RPC session is used to create RPC notification message.
273 path = outgoing_msg.path
277 assert path.source is not None
278 if path.source != VRF_TABLE:
279 # Extract relevant info for update-add/update-delete.
280 params = [{ROUTE_DISTINGUISHER: outgoing_msg.route_dist,
281 PREFIX: vpn_nlri.prefix,
282 NEXT_HOP: path.nexthop,
283 VRF_RF: VrfConf.rf_2_vrf_rf(path.route_family)}]
284 if path.nlri.ROUTE_FAMILY.safi not in (subaddr_family.IP_FLOWSPEC,
285 subaddr_family.VPN_FLOWSPEC):
286 params[VPN_LABEL] = path.label_list[0]
288 if not path.is_withdraw:
289 # Create notification to NetworkController.
290 rpc_msg = rpc_session.create_notification(
291 NOTIFICATION_ADD_REMOTE_PREFIX, params)
293 # Create update-delete request to NetworkController.
294 rpc_msg = rpc_session.create_notification(
295 NOTIFICATION_DELETE_REMOTE_PREFIX, params)
297 # Extract relevant info for update-add/update-delete.
298 params = [{ROUTE_DISTINGUISHER: outgoing_msg.route_dist,
299 PREFIX: vpn_nlri.prefix,
300 NEXT_HOP: path.nexthop,
301 VRF_RF: VrfConf.rf_2_vrf_rf(path.route_family),
302 ORIGIN_RD: path.origin_rd}]
303 if not path.is_withdraw:
304 # Create notification to NetworkController.
305 rpc_msg = rpc_session.create_notification(
306 NOTIFICATION_ADD_LOCAL_PREFIX, params)
308 # Create update-delete request to NetworkController.
309 rpc_msg = rpc_session.create_notification(
310 NOTIFICATION_DELETE_LOCAL_PREFIX, params)
315 def _validate_rpc_ip(rpc_server_ip):
316 """Validates given ip for use as rpc host bind address.
318 if not is_valid_ipv4(rpc_server_ip):
319 raise NetworkControllerError(desc='Invalid rpc ip address.')
323 def _validate_rpc_port(port):
324 """Validates give port for use as rpc server port.
327 raise NetworkControllerError(desc='Invalid rpc port number.')
328 if isinstance(port, str):
332 raise NetworkControllerError(desc='Invalid rpc port number %s' % port)
336 class _NetworkController(FlexinetPeer, Activity):
337 """Network controller peer.
339 Provides MessagePackRPC interface for flexinet peers like Network
340 controller to peer and have RPC session with BGPS process. This RPC
341 interface provides access to BGPS API.
345 FlexinetPeer.__init__(self)
346 Activity.__init__(self, name='NETWORK_CONTROLLER')
347 # Outstanding requests, i.e. requests for which we are yet to receive
348 # response from peer. We currently do not have any requests going out.
349 self._outstanding_reqs = {}
350 # Dictionary for Peer name to RPC session.
351 self._rpc_sessions = {}
353 def _run(self, *args, **kwargs):
356 Wait for peer to connect and start rpc session with it.
357 For every connection we start and new rpc session.
359 apgw_rpc_bind_ip = _validate_rpc_ip(kwargs.pop(NC_RPC_BIND_IP))
360 apgw_rpc_bind_port = _validate_rpc_port(kwargs.pop(NC_RPC_BIND_PORT))
362 sock_addr = (apgw_rpc_bind_ip, apgw_rpc_bind_port)
363 LOG.debug('NetworkController started listening for connections...')
365 server_thread, _ = self._listen_tcp(sock_addr,
366 self._start_rpc_session)
370 def _start_rpc_session(self, sock):
371 """Starts a new RPC session with given connection.
373 session_name = RpcSession.NAME_FMT % str(sock.getpeername())
374 self._stop_child_activities(session_name)
376 rpc_session = RpcSession(sock, self)
377 self._spawn_activity(rpc_session)
379 def _send_rpc_notification_to_session(self, session, method, params):
380 if not session.is_connected:
381 # Stops disconnected RPC session.
382 self._stop_child_activities(session.name)
385 return session.send_notification(method, params)
387 def send_rpc_notification(self, method, params):
391 for session in list(self._child_activity_map.values()):
392 if not isinstance(session, RpcSession):
394 self._send_rpc_notification_to_session(session, method, params)
397 def _handle_response(response):
398 raise NotImplementedError('BGPS is not making any request hence should not'
399 ' get any response. Response: %s' % response)
402 def _handle_notification(notification):
403 LOG.debug('Notification from NetworkController<<: %s %s',
404 notification[RPC_IDX_NTF_SYM], notification[RPC_IDX_NTF_PARAM])
405 operation, params = notification[1], notification[2]
406 return api.base.call(operation, **params[0])
409 def _handle_request(request):
410 LOG.debug('Request from NetworkController<<: %s %s',
411 request[RPC_IDX_REQ_SYM], request[RPC_IDX_REQ_PARAM])
412 operation, params = request[2], request[3]
417 return api.base.call(operation, **kwargs)
419 LOG.error(traceback.format_exc())
420 raise ApiException(desc='Invalid type for RPC parameter.')
423 # Network controller singleton
424 NET_CONTROLLER = _NetworkController()