1 # Copyright (C) 2014 Nippon Telegraph and Telephone Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
14 # limitations under the License.
20 from ryu.base import app_manager
21 from ryu.controller import handler
22 from ryu.services.protocols.vrrp import event as vrrp_event
23 from ryu.services.protocols.vrrp import api as vrrp_api
24 from ryu.lib import rpc
25 from ryu.lib import hub
26 from ryu.lib import mac
# Default listening port for the VRRP RPC interface: it is the default value
# of the 'vrrp-rpc-port' option.  The number itself is arbitrary.
VRRP_RPC_PORT = 50004  # random
class RPCError(Exception):
    """Raised by the RPC request handlers for missing or invalid parameters.

    The exception message describes the problem (for example
    'parameters are missing').
    """
class Peer(object):
    """One connected RPC client.

    Incoming requests are not processed here; they are pushed, together
    with the peer they came from, onto the queue given at construction.
    """

    def __init__(self, queue):
        """Remember *queue*, the object whose ``put()`` receives requests.

        NOTE(review): the class header and this assignment were not visible
        in the reviewed text; they are restored because the methods refer
        to ``Peer`` and ``self.queue``.  Confirm against the full file.
        """
        super(Peer, self).__init__()
        self.queue = queue

    def _handle_vrrp_request(self, data):
        """Queue *data* as a ``(peer, data)`` pair for later processing."""
        self.queue.put((self, data))
class RpcVRRPManager(app_manager.RyuApp):
    """Ryu application that exposes VRRP configuration over RPC.

    Clients connect to the port given by the ``vrrp-rpc-port`` option.
    Their requests (``vrrp_config``, ``vrrp_list``, ``vrrp_config_change``)
    are queued and answered one at a time by a dedicated thread.  Every
    connected peer is sent a ``notify_status`` notification when an
    EventVRRPStateChanged event is handled.

    NOTE(review): several lines of this class (``try``/``except``/``else``/
    ``return`` statements, dict openers and some parameter names) were not
    visible in the reviewed text and have been restored; confirm them
    against the full file.
    """

    def __init__(self, *args, **kwargs):
        super(RpcVRRPManager, self).__init__(*args, **kwargs)
        self.CONF.register_opts([
            cfg.IntOpt('vrrp-rpc-port', default=VRRP_RPC_PORT,
                       help='port for vrrp rpc interface')])

        # NOTE(review): _args/_kwargs are not read anywhere in this class;
        # presumably kept for reference -- confirm.
        self._args = args
        self._kwargs = kwargs
        # Currently connected peers; notified on VRRP state changes.
        self._peers = []
        # Requests from every peer funnel through this bounded queue.
        self._rpc_events = hub.Queue(128)
        self.server_thread = hub.spawn(self._peer_accept_thread)
        self.event_thread = hub.spawn(self._rpc_request_loop_thread)

    def _rpc_request_loop_thread(self):
        """Take queued requests one by one, dispatch them and reply.

        An RPCError raised by a handler is reported to the peer as the
        response's error instead of a result.
        """
        while True:
            (peer, data) = self._rpc_events.get()
            msgid, target_method, params = data
            error = None
            result = None
            try:
                if target_method == b'vrrp_config':
                    result = self._config(msgid, params)
                elif target_method == b'vrrp_list':
                    result = self._list(msgid, params)
                elif target_method == b'vrrp_config_change':
                    result = self._config_change(msgid, params)
                else:
                    error = 'Unknown method %s' % (target_method)
            except RPCError as e:
                error = str(e)
            peer._endpoint.send_response(msgid, error=error, result=result)

    def _peer_loop_thread(self, peer):
        """Serve *peer* until its endpoint stops, then forget the peer."""
        try:
            peer._endpoint.serve()
        finally:
            # the peer connection is closed (or serving failed); either way
            # the peer must no longer receive notifications
            self._peers.remove(peer)

    def peer_accept_handler(self, new_sock, addr):
        """Set up a Peer for a newly accepted connection and serve it."""
        peer = Peer(self._rpc_events)
        table = {
            rpc.MessageType.REQUEST: peer._handle_vrrp_request,
        }
        peer._endpoint = rpc.EndPoint(new_sock, disp_table=table)
        self._peers.append(peer)
        hub.spawn(self._peer_loop_thread, peer)

    def _peer_accept_thread(self):
        """Listen on the configured port and accept peers forever."""
        server = hub.StreamServer(('', self.CONF.vrrp_rpc_port),
                                  self.peer_accept_handler)
        server.serve_forever()

    def _params_to_dict(self, params, keys):
        """Return the entries of *params* whose key is listed in *keys*."""
        return {k: v for k, v in params.items() if k in keys}

    def _config(self, msgid, params):
        """Handle ``vrrp_config``: create a VRRP instance.

        *params* is the request's parameter list; its first element is a
        dict of interface and VRRP settings.

        Returns ``[vrid, priority, first virtual IP address as a string]``.
        Raises RPCError when the parameters are missing or invalid.
        """
        self.logger.debug('handle vrrp_config request')
        try:
            param_dict = params[0]
        except Exception:
            raise RPCError('parameters are missing')

        # NOTE(review): 'device_name' was not visible in the reviewed text;
        # confirm the accepted interface keys.
        if_params = self._params_to_dict(param_dict,
                                         ('primary_ip_address',
                                          'device_name'))
        # drop vlan support later
        if_params['vlan_id'] = None
        if_params['mac_address'] = mac.DONTCARE_STR
        try:
            interface = vrrp_event.VRRPInterfaceNetworkDevice(**if_params)
        except Exception:
            raise RPCError('parameters are invalid, %s' % (str(param_dict)))

        # NOTE(review): only 'ip_addresses', 'advertisement_interval' and
        # 'statistics_interval' were visible in the reviewed text; confirm
        # the remaining keys.
        config_params = self._params_to_dict(param_dict,
                                             ('vrid',  # mandatory
                                              'ip_addresses',  # mandatory
                                              'version',
                                              'admin_state',
                                              'priority',
                                              'advertisement_interval',
                                              'preempt_mode',
                                              'preempt_delay',
                                              'statistics_interval'))
        try:
            # The RPC carries a single address; VRRPConfig takes a list.
            ip_addr = config_params.pop('ip_addresses')
            config_params['ip_addresses'] = [ip_addr]
            config = vrrp_event.VRRPConfig(**config_params)
        except Exception:
            raise RPCError('parameters are invalid, %s' % (str(param_dict)))

        config_result = vrrp_api.vrrp_config(self, interface, config)

        api_result = [
            config_result.config.vrid,
            config_result.config.priority,
            str(netaddr.IPAddress(config_result.config.ip_addresses[0]))]
        return api_result

    def _lookup_instance(self, vrid):
        """Return the name of the instance using *vrid*, or None."""
        for instance in vrrp_api.vrrp_list(self).instance_list:
            if vrid == instance.config.vrid:
                return instance.instance_name
        return None

    def _config_change(self, msgid, params):
        """Handle ``vrrp_config_change``: update a running instance.

        The first element of *params* is a dict holding 'vrid' and,
        optionally, 'priority' and 'advertisement_interval'.

        Raises RPCError when the parameters are missing or no instance
        uses the given vrid.
        """
        self.logger.debug('handle vrrp_config_change request')
        try:
            config_values = params[0]
        except Exception:
            raise RPCError('parameters are missing')

        vrid = config_values.get('vrid')
        instance_name = self._lookup_instance(vrid)
        if not instance_name:
            # %s, not %d: vrid is None when the key is absent, and %d would
            # raise TypeError instead of the intended RPCError.
            raise RPCError('vrid %s is not found' % (vrid))

        priority = config_values.get('priority')
        interval = config_values.get('advertisement_interval')
        vrrp_api.vrrp_config_change(self, instance_name, priority=priority,
                                    advertisement_interval=interval)
        # NOTE(review): the return statement was not visible in the
        # reviewed text; an empty dict is assumed -- confirm.
        return {}

    def _list(self, msgid, params):
        """Handle ``vrrp_list``: describe every VRRP instance.

        Returns a list with one dict per instance.
        """
        self.logger.debug('handle vrrp_list request')
        result = vrrp_api.vrrp_list(self)
        instance_list = result.instance_list
        ret_list = []
        for instance in instance_list:
            c = instance.config
            info_dict = {
                "instance_name": instance.instance_name,
                "vrid": c.vrid,
                "version": c.version,
                "advertisement_interval": c.advertisement_interval,
                "priority": c.priority,
                "virtual_ip_address": str(netaddr.IPAddress(c.ip_addresses[0]))
            }
            ret_list.append(info_dict)
        return ret_list

    @handler.set_ev_cls(vrrp_event.EventVRRPStateChanged)
    def vrrp_state_changed_handler(self, ev):
        """Notify every connected peer of a VRRP state transition."""
        self.logger.info('handle EventVRRPStateChanged')
        name = ev.instance_name
        old_state = ev.old_state
        new_state = ev.new_state
        vrid = ev.config.vrid
        self.logger.info('VRID:%s %s: %s -> %s', vrid, name, old_state,
                         new_state)
        params = {'vrid': vrid, 'old_state': old_state, 'new_state': new_state}
        # Iterate over a snapshot: _peer_loop_thread removes peers from
        # self._peers and could do so while this loop is running.
        for peer in list(self._peers):
            peer._endpoint.send_notification("notify_status", [params])