1 # Copyright (C) 2017 Nippon Telegraph and Telephone Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 Zebra Server corresponding to 'zserv' structure.
27 from ryu.base import app_manager
28 from ryu.base.app_manager import RyuApp
29 from ryu.controller.handler import set_ev_cls
30 from ryu.lib import hub
31 from ryu.lib import ip
32 from ryu.lib.packet import zebra
34 from ryu.services.protocols.zebra import db
35 from ryu.services.protocols.zebra import event
36 from ryu.services.protocols.zebra.server import event as zserver_event
39 LOG = logging.getLogger(__name__)
41 CONF = cfg.CONF['zapi']
42 GLOBAL_CONF = cfg.CONF
44 # Session to database of Zebra protocol service
45 SESSION = db.Session()
48 class ZClient(object):
53 def __init__(self, server, sock, addr):
57 self.logger = server.logger
58 self.is_active = False
60 self.send_q = hub.Queue(16)
62 # Zebra protocol version
63 self.zserv_ver = CONF.server_version
65 # Zebra route type distributed by client (not initialized yet)
66 self.route_type = None
70 self.sock.settimeout(GLOBAL_CONF.socket_timeout)
72 self._threads.append(hub.spawn(self._send_loop))
73 self._threads.append(hub.spawn(self._recv_loop))
75 self.server.send_event_to_observers(
76 zserver_event.EventZClientConnected(self))
78 hub.joinall(self._threads)
80 self.server.send_event_to_observers(
81 zserver_event.EventZClientDisconnected(self))
84 self.is_active = False
89 buf = self.send_q.get()
90 self.sock.sendall(buf)
91 except socket.error as e:
92 self.logger.exception(
93 'Error while sending message to Zebra client%s: %s',
100 min_len = recv_len = zebra.ZebraMessage.get_header_size(
103 while self.is_active:
105 recv_buf = self.sock.recv(recv_len)
106 except socket.timeout:
109 if len(recv_buf) == 0:
113 while len(buf) >= min_len:
114 (length,) = struct.unpack_from('!H', buf)
115 if (length - len(buf)) > 0:
116 # Need to receive remaining data
117 recv_len = length - len(buf)
120 msg, _, buf = zebra.ZebraMessage.parser(buf)
122 ev = event.message_to_event(self, msg)
124 self.logger.debug('Notify event: %s', ev)
125 self.server.send_event_to_observers(ev)
127 except socket.error as e:
128 self.logger.exception(
129 'Error while sending message to Zebra client%s: %s',
134 def send_msg(self, msg):
138 :param msg: Instance of py:class: `ryu.lib.packet.zebra.ZebraMessage`.
139 :return: Serialized msg if succeeded, otherwise None.
141 if not self.is_active:
143 'Cannot send message: Already deactivated: msg=%s', msg)
145 elif not self.send_q:
147 'Cannot send message: Send queue does not exist: msg=%s', msg)
149 elif self.zserv_ver != msg.version:
151 'Zebra protocol version mismatch:'
152 'server_version=%d, msg.version=%d',
153 self.zserv_ver, msg.version)
154 msg.version = self.zserv_ver # fixup
156 self.send_q.put(msg.serialize())
159 def zclient_connection_factory(sock, addr):
160 LOG.debug('Connected from client: %s: %s', addr, sock)
161 zserv = app_manager.lookup_service_brick(ZServer.__name__)
162 with contextlib.closing(ZClient(zserv, sock, addr)) as zclient:
165 except Exception as e:
166 LOG.error('Error in client%s: %s', addr, e)
170 def detect_address_family(host):
171 if ip.valid_ipv4(host):
172 return socket.AF_INET
173 elif ip.valid_ipv6(host):
174 return socket.AF_INET6
175 elif os.path.isdir(os.path.dirname(host)):
176 return socket.AF_UNIX
181 class ZServer(RyuApp):
183 The base class for Zebra server application.
185 _EVENTS = event.ZEBRA_EVENTS + [
186 zserver_event.EventZClientConnected,
187 zserver_event.EventZClientDisconnected,
190 def __init__(self, *args, **kwargs):
191 super(ZServer, self).__init__(*args, **kwargs)
193 self.zserv_addr = (CONF.server_host, CONF.server_port)
194 self.zapi_connection_family = detect_address_family(CONF.server_host)
196 # Initial Router ID for Zebra server
197 self.router_id = CONF.router_id
200 super(ZServer, self).start()
202 if self.zapi_connection_family == socket.AF_UNIX:
203 unix_sock_dir = os.path.dirname(CONF.server_host)
204 # Makes sure the unix socket does not already exist
205 if os.path.exists(CONF.server_host):
206 os.remove(CONF.server_host)
207 if not os.path.isdir(unix_sock_dir):
208 os.mkdir(unix_sock_dir)
209 os.chmod(unix_sock_dir, 0o777)
212 self.zserv = hub.StreamServer(
213 self.zserv_addr, zclient_connection_factory)
216 'Cannot start Zebra server%s: %s', self.zserv_addr, e)
219 if self.zapi_connection_family == socket.AF_UNIX:
220 os.chmod(CONF.server_host, 0o777)
222 self._add_lo_interface()
224 return hub.spawn(self.zserv.serve_forever)
226 def _add_lo_interface(self):
227 intf = db.interface.ip_link_add(SESSION, 'lo')
229 self.logger.debug('Added interface "%s": %s', intf.ifname, intf)
231 route = db.route.ip_route_add(
233 destination='127.0.0.0/8',
235 source='127.0.0.1/8',
236 route_type=zebra.ZEBRA_ROUTE_CONNECT)
239 'Added route to "%s": %s', route.destination, route)
241 @set_ev_cls(event.EventZebraHello)
242 def _hello_handler(self, ev):
244 self.logger.debug('Client %s says hello.', ev.zclient)
247 # Set distributed route_type to ZClient
248 ev.zclient.route_type = ev.body.route_type
250 'Client %s says hello and bids fair to announce only %s routes',
251 ev.zclient, ev.body.route_type)
253 @set_ev_cls(event.EventZebraRouterIDAdd)
254 def _router_id_add_handler(self, ev):
256 'Client %s requests router_id, server will response: router_id=%s',
257 ev.zclient, self.router_id)
259 # Send ZEBRA_ROUTER_ID_UPDATE for response
260 msg = zebra.ZebraMessage(
261 body=zebra.ZebraRouterIDUpdate(
262 family=socket.AF_INET,
263 prefix='%s/32' % self.router_id))
264 ev.zclient.send_msg(msg)
266 @set_ev_cls(event.EventZebraInterfaceAdd)
267 def _interface_add_handler(self, ev):
268 self.logger.debug('Client %s requested all interfaces', ev.zclient)
270 interfaces = db.interface.ip_address_show_all(SESSION)
271 self.logger.debug('Server will response interfaces: %s', interfaces)
272 for intf in interfaces:
273 msg = zebra.ZebraMessage(
274 body=zebra.ZebraInterfaceAdd(
276 ifindex=intf.ifindex,
279 ptm_enable=zebra.ZEBRA_IF_PTM_ENABLE_OFF,
280 ptm_status=zebra.ZEBRA_PTM_STATUS_UNKNOWN,
284 bandwidth=intf.bandwidth,
285 ll_type=intf.ll_type,
286 hw_addr=intf.hw_addr))
287 ev.zclient.send_msg(msg)
289 routes = db.route.ip_route_show_all(
290 SESSION, ifindex=intf.ifindex, is_selected=True)
291 self.logger.debug('Server will response routes: %s', routes)
293 dest, _ = route.destination.split('/')
294 msg = zebra.ZebraMessage(
295 body=zebra.ZebraInterfaceAddressAdd(
296 ifindex=intf.ifindex,
301 ev.zclient.send_msg(msg)
303 @set_ev_cls([event.EventZebraIPv4RouteAdd,
304 event.EventZebraIPv6RouteAdd])
305 def _ip_route_add_handler(self, ev):
307 'Client %s advertised IP route: %s', ev.zclient, ev.body)
309 for nexthop in ev.body.nexthops:
310 route = db.route.ip_route_add(
312 destination=ev.body.prefix,
313 gateway=nexthop.addr,
314 ifindex=nexthop.ifindex or 0,
315 route_type=ev.body.route_type)
318 'Added route to "%s": %s', route.destination, route)
320 @set_ev_cls([event.EventZebraIPv4RouteDelete,
321 event.EventZebraIPv6RouteDelete])
322 def _ip_route_delete_handler(self, ev):
324 'Client %s withdrew IP route: %s', ev.zclient, ev.body)
326 for nexthop in ev.body.nexthops:
327 routes = db.route.ip_route_delete(
329 destination=ev.body.prefix,
330 gateway=nexthop.addr,
331 route_type=ev.body.route_type)
334 'Deleted routes to "%s": %s', ev.body.prefix, routes)