backing up
[vsorcdistro/.git] / ryu / build / lib.linux-armv7l-2.7 / ryu / services / protocols / bgp / net_ctrl.py
1 # Copyright (C) 2014 Nippon Telegraph and Telephone Corporation.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #    http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 """
17  Network Controller interface to BGP.
18
19  Network controller w.r.t BGPS for APGW Automation project is named as APGW
20  Agent and Route Server.
21 """
22 import logging
23 import socket
24 import traceback
25
26 import msgpack
27
28 from ryu.lib.packet import safi as subaddr_family
29
30 from ryu.services.protocols.bgp import api
31 from ryu.services.protocols.bgp.api.base import ApiException
32 from ryu.services.protocols.bgp.api.base import NEXT_HOP
33 from ryu.services.protocols.bgp.api.base import ORIGIN_RD
34 from ryu.services.protocols.bgp.api.base import PREFIX
35 from ryu.services.protocols.bgp.api.base import ROUTE_DISTINGUISHER
36 from ryu.services.protocols.bgp.api.base import VPN_LABEL
37 from ryu.services.protocols.bgp.base import Activity
38 from ryu.services.protocols.bgp.base import add_bgp_error_metadata
39 from ryu.services.protocols.bgp.base import BGPSException
40 from ryu.services.protocols.bgp.base import FlexinetPeer
41 from ryu.services.protocols.bgp.base import NET_CTRL_ERROR_CODE
42 from ryu.services.protocols.bgp.constants import VRF_TABLE
43 from ryu.services.protocols.bgp.rtconf.vrfs import VRF_RF
44 from ryu.services.protocols.bgp.rtconf.vrfs import VrfConf
45 from ryu.services.protocols.bgp.utils.validation import is_valid_ipv4
46
47
48 # Logger instance for this module.
49 LOG = logging.getLogger('bgpspeaker.net_ctrl')
50
51 # Network controller service socket constants.
52 NC_RPC_BIND_IP = 'apgw_rpc_bind_ip'
53 NC_RPC_BIND_PORT = 'apgw_rpc_bind_port'
54
55 # Notification symbols
56 NOTIFICATION_ADD_REMOTE_PREFIX = 'prefix.add_remote'
57 NOTIFICATION_DELETE_REMOTE_PREFIX = 'prefix.delete_remote'
58 NOTIFICATION_ADD_LOCAL_PREFIX = 'prefix.add_local'
59 NOTIFICATION_DELETE_LOCAL_PREFIX = 'prefix.delete_local'
60 NOTIFICATION_LOG = 'logging'
61
62 # MessagePackRPC message type constants
63 RPC_MSG_REQUEST = 0
64 RPC_MSG_RESPONSE = 1
65 RPC_MSG_NOTIFY = 2
66
67 #
68 # Indexes for various RPC message types.
69 #
70 RPC_IDX_MSG_TYP = 0
71 RPC_IDX_MSG_ID = 1
72 RPC_IDX_REQ_SYM = 2
73 RPC_IDX_REQ_PARAM = 3
74 RPC_IDX_RES_ERR = 2
75 RPC_IDX_RES_RST = 3
76 RPC_IDX_NTF_SYM = 1
77 RPC_IDX_NTF_PARAM = 2
78
79 # RPC socket receive buffer size in bytes.
80 RPC_SOCK_BUFF_SIZE = 4096
81
82
83 @add_bgp_error_metadata(code=NET_CTRL_ERROR_CODE,
84                         sub_code=1,
85                         def_desc='Unknown Network controller exception')
86 class NetworkControllerError(BGPSException):
87     """Common base class for exceptions related to RPC calls.
88     """
89     pass
90
91
92 class RpcSession(Activity):
93     """Provides message-pack RPC abstraction for one session.
94
95     It contains message-pack packer, un-packer, message ID sequence
96     and utilities that use these. It also cares about socket communication w/
97     RPC peer.
98     """
99     NAME_FMT = 'RpcSession%s'
100
101     def __init__(self, sock, outgoing_msg_sink_iter):
102         self.peer_name = str(sock.getpeername())
103         super(RpcSession, self).__init__(self.NAME_FMT % self.peer_name)
104         self._packer = msgpack.Packer(encoding='utf-8')
105         self._unpacker = msgpack.Unpacker(encoding='utf-8')
106         self._next_msgid = 0
107         self._socket = sock
108         self._outgoing_msg_sink_iter = outgoing_msg_sink_iter
109         self.is_connected = True
110         self.green_in = None
111         self.green_out = None
112
113     def stop(self):
114         super(RpcSession, self).stop()
115         self.is_connected = False
116         LOG.info('RPC Session to %s stopped', self.peer_name)
117
118     def _run(self):
119         # Process outgoing messages in new thread.
120         self.green_out = self._spawn('net_ctrl._process_outgoing',
121                                      self._process_outgoing_msg,
122                                      self._outgoing_msg_sink_iter)
123         # Process incoming messages in new thread.
124         self.green_in = self._spawn('net_ctrl._process_incoming',
125                                     self._process_incoming_msgs)
126         LOG.info('RPC Session to %s started', self.peer_name)
127         self.green_in.wait()
128         self.green_out.wait()
129
130     def _next_msg_id(self):
131         this_id = self._next_msgid
132         self._next_msgid += 1
133         return this_id
134
135     def create_request(self, method, params):
136         msgid = self._next_msg_id()
137         return self._packer.pack([RPC_MSG_REQUEST, msgid, method, params])
138
139     def create_error_response(self, msgid, error):
140         if error is None:
141             raise NetworkControllerError(desc='Creating error without body!')
142         return self._packer.pack([RPC_MSG_RESPONSE, msgid, error, None])
143
144     def create_success_response(self, msgid, result):
145         if result is None:
146             raise NetworkControllerError(desc='Creating response without '
147                                               'body!')
148         return self._packer.pack([RPC_MSG_RESPONSE, msgid, None, result])
149
150     def create_notification(self, method, params):
151         return self._packer.pack([RPC_MSG_NOTIFY, method, params])
152
153     def feed_and_get_messages(self, data):
154         self._unpacker.feed(data)
155         messages = []
156         for msg in self._unpacker:
157             messages.append(msg)
158         return messages
159
160     def feed_and_get_first_message(self, data):
161         self._unpacker.feed(data)
162         for msg in self._unpacker:
163             return msg
164
165     def _send_error_response(self, request, err_msg):
166         rpc_msg = self.create_error_response(request[RPC_IDX_MSG_ID],
167                                              str(err_msg))
168         return self._sendall(rpc_msg)
169
170     def _send_success_response(self, request, result):
171         rpc_msg = self.create_success_response(request[RPC_IDX_MSG_ID],
172                                                result)
173         return self._sendall(rpc_msg)
174
175     def send_notification(self, method, params):
176         rpc_msg = self.create_notification(method, params)
177         return self._sendall(rpc_msg)
178
179     def _process_incoming_msgs(self):
180         LOG.debug('NetworkController started processing incoming messages')
181         assert self._socket
182
183         while self.is_connected:
184             # Wait for request/response/notification from peer.
185             msg_buff = self._recv()
186             if len(msg_buff) == 0:
187                 LOG.info('Peer %s disconnected.', self.peer_name)
188                 self.is_connected = False
189                 self._socket.close()
190                 break
191             messages = self.feed_and_get_messages(msg_buff)
192             for msg in messages:
193                 if msg[0] == RPC_MSG_REQUEST:
194                     try:
195                         result = _handle_request(msg)
196                         self._send_success_response(msg, result)
197                     except BGPSException as e:
198                         self._send_error_response(msg, e.message)
199                 elif msg[0] == RPC_MSG_RESPONSE:
200                     _handle_response(msg)
201                 elif msg[0] == RPC_MSG_NOTIFY:
202                     _handle_notification(msg)
203                 else:
204                     LOG.error('Invalid message type: %r', msg)
205                 self.pause(0)
206
207         # Stop outgoing connection.
208         if self.green_out:
209             self.green_out.kill()
210
211     def _process_outgoing_msg(self, sink_iter):
212         """For every message we construct a corresponding RPC message to be
213         sent over the given socket inside given RPC session.
214
215         This function should be launched in a new green thread as
216         it loops forever.
217         """
218         LOG.debug('NetworkController processing outgoing request list.')
219         # TODO(PH): We should try not to sent routes from bgp peer that is not
220         # in established state.
221         from ryu.services.protocols.bgp.model import (
222             FlexinetOutgoingRoute)
223         while self.is_connected:
224             # sink iter is Sink instance and next is blocking so this isn't
225             # active wait.
226             for outgoing_msg in sink_iter:
227                 if not self.is_connected:
228                     self._socket.close()
229                     return
230                 if isinstance(outgoing_msg, FlexinetOutgoingRoute):
231                     rpc_msg = _create_prefix_notification(outgoing_msg, self)
232                 else:
233                     raise NotImplementedError(
234                         'Do not handle out going message of type %s' %
235                         outgoing_msg.__class__)
236                 if rpc_msg:
237                     self._sendall(rpc_msg)
238             self.pause(0)
239
240         # Stop incoming connection.
241         if self.green_in:
242             self.green_in.kill()
243
244     def _recv(self):
245         return self._sock_wrap(self._socket.recv)(RPC_SOCK_BUFF_SIZE)
246
247     def _sendall(self, msg):
248         return self._sock_wrap(self._socket.sendall)(msg)
249
250     def _sock_wrap(self, func):
251         def wrapper(*args, **kwargs):
252             try:
253                 ret = func(*args, **kwargs)
254             except socket.error:
255                 LOG.error(traceback.format_exc())
256                 self._socket_error()
257                 return
258             return ret
259
260         return wrapper
261
262     def _socket_error(self):
263         if self.started:
264             self.stop()
265
266
267 def _create_prefix_notification(outgoing_msg, rpc_session):
268     """Constructs prefix notification with data from given outgoing message.
269
270     Given RPC session is used to create RPC notification message.
271     """
272     assert outgoing_msg
273     path = outgoing_msg.path
274     assert path
275     vpn_nlri = path.nlri
276
277     assert path.source is not None
278     if path.source != VRF_TABLE:
279         # Extract relevant info for update-add/update-delete.
280         params = [{ROUTE_DISTINGUISHER: outgoing_msg.route_dist,
281                    PREFIX: vpn_nlri.prefix,
282                    NEXT_HOP: path.nexthop,
283                    VRF_RF: VrfConf.rf_2_vrf_rf(path.route_family)}]
284         if path.nlri.ROUTE_FAMILY.safi not in (subaddr_family.IP_FLOWSPEC,
285                                                subaddr_family.VPN_FLOWSPEC):
286             params[VPN_LABEL] = path.label_list[0]
287
288         if not path.is_withdraw:
289             # Create notification to NetworkController.
290             rpc_msg = rpc_session.create_notification(
291                 NOTIFICATION_ADD_REMOTE_PREFIX, params)
292         else:
293             # Create update-delete request to NetworkController.
294             rpc_msg = rpc_session.create_notification(
295                 NOTIFICATION_DELETE_REMOTE_PREFIX, params)
296     else:
297         # Extract relevant info for update-add/update-delete.
298         params = [{ROUTE_DISTINGUISHER: outgoing_msg.route_dist,
299                    PREFIX: vpn_nlri.prefix,
300                    NEXT_HOP: path.nexthop,
301                    VRF_RF: VrfConf.rf_2_vrf_rf(path.route_family),
302                    ORIGIN_RD: path.origin_rd}]
303         if not path.is_withdraw:
304             # Create notification to NetworkController.
305             rpc_msg = rpc_session.create_notification(
306                 NOTIFICATION_ADD_LOCAL_PREFIX, params)
307         else:
308             # Create update-delete request to NetworkController.
309             rpc_msg = rpc_session.create_notification(
310                 NOTIFICATION_DELETE_LOCAL_PREFIX, params)
311
312     return rpc_msg
313
314
315 def _validate_rpc_ip(rpc_server_ip):
316     """Validates given ip for use as rpc host bind address.
317     """
318     if not is_valid_ipv4(rpc_server_ip):
319         raise NetworkControllerError(desc='Invalid rpc ip address.')
320     return rpc_server_ip
321
322
323 def _validate_rpc_port(port):
324     """Validates give port for use as rpc server port.
325     """
326     if not port:
327         raise NetworkControllerError(desc='Invalid rpc port number.')
328     if isinstance(port, str):
329         port = int(port)
330
331     if port <= 0:
332         raise NetworkControllerError(desc='Invalid rpc port number %s' % port)
333     return port
334
335
336 class _NetworkController(FlexinetPeer, Activity):
337     """Network controller peer.
338
339     Provides MessagePackRPC interface for flexinet peers like Network
340     controller to peer and have RPC session with BGPS process. This RPC
341     interface provides access to BGPS API.
342     """
343
344     def __init__(self):
345         FlexinetPeer.__init__(self)
346         Activity.__init__(self, name='NETWORK_CONTROLLER')
347         # Outstanding requests, i.e. requests for which we are yet to receive
348         # response from peer. We currently do not have any requests going out.
349         self._outstanding_reqs = {}
350         # Dictionary for Peer name to RPC session.
351         self._rpc_sessions = {}
352
353     def _run(self, *args, **kwargs):
354         """Runs RPC server.
355
356         Wait for peer to connect and start rpc session with it.
357         For every connection we start and new rpc session.
358         """
359         apgw_rpc_bind_ip = _validate_rpc_ip(kwargs.pop(NC_RPC_BIND_IP))
360         apgw_rpc_bind_port = _validate_rpc_port(kwargs.pop(NC_RPC_BIND_PORT))
361
362         sock_addr = (apgw_rpc_bind_ip, apgw_rpc_bind_port)
363         LOG.debug('NetworkController started listening for connections...')
364
365         server_thread, _ = self._listen_tcp(sock_addr,
366                                             self._start_rpc_session)
367         self.pause(0)
368         server_thread.wait()
369
370     def _start_rpc_session(self, sock):
371         """Starts a new RPC session with given connection.
372         """
373         session_name = RpcSession.NAME_FMT % str(sock.getpeername())
374         self._stop_child_activities(session_name)
375
376         rpc_session = RpcSession(sock, self)
377         self._spawn_activity(rpc_session)
378
379     def _send_rpc_notification_to_session(self, session, method, params):
380         if not session.is_connected:
381             # Stops disconnected RPC session.
382             self._stop_child_activities(session.name)
383             return
384
385         return session.send_notification(method, params)
386
387     def send_rpc_notification(self, method, params):
388         if not self.started:
389             return
390
391         for session in list(self._child_activity_map.values()):
392             if not isinstance(session, RpcSession):
393                 continue
394             self._send_rpc_notification_to_session(session, method, params)
395
396
397 def _handle_response(response):
398     raise NotImplementedError('BGPS is not making any request hence should not'
399                               ' get any response. Response: %s' % response)
400
401
402 def _handle_notification(notification):
403     LOG.debug('Notification from NetworkController<<: %s %s',
404               notification[RPC_IDX_NTF_SYM], notification[RPC_IDX_NTF_PARAM])
405     operation, params = notification[1], notification[2]
406     return api.base.call(operation, **params[0])
407
408
409 def _handle_request(request):
410     LOG.debug('Request from NetworkController<<: %s %s',
411               request[RPC_IDX_REQ_SYM], request[RPC_IDX_REQ_PARAM])
412     operation, params = request[2], request[3]
413     kwargs = {}
414     if len(params) > 0:
415         kwargs = params[0]
416     try:
417         return api.base.call(operation, **kwargs)
418     except TypeError:
419         LOG.error(traceback.format_exc())
420         raise ApiException(desc='Invalid type for RPC parameter.')
421
422
423 # Network controller singleton
424 NET_CONTROLLER = _NetworkController()