1 # Copyright (c) 2014 Rackspace Hosting
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
20 from ryu.base import app_manager
21 from ryu.lib import hub
22 from ryu.lib import ip
23 from ryu.services.protocols.ovsdb import client
24 from ryu.services.protocols.ovsdb import event
25 from ryu.controller import handler
28 opts = (cfg.StrOpt('address', default='0.0.0.0', help='OVSDB address'),
29 cfg.IntOpt('port', default=6640, help='OVSDB port'),
30 cfg.IntOpt('probe-interval', help='OVSDB reconnect probe interval'),
31 cfg.IntOpt('min-backoff',
32 help=('OVSDB reconnect minimum milliseconds between '
33 'connection attemps')),
34 cfg.IntOpt('max-backoff',
35 help=('OVSDB reconnect maximum milliseconds between '
36 'connection attemps')),
37 cfg.StrOpt('mngr-privkey', default=None, help='manager private key'),
38 cfg.StrOpt('mngr-cert', default=None, help='manager certificate'),
39 cfg.ListOpt('whitelist', default=[],
40 help='Whitelist of address to allow to connect'),
41 cfg.ListOpt('schema-tables', default=[],
42 help='Tables in the OVSDB schema to configure'),
43 cfg.ListOpt('schema-exclude-columns', default=[],
44 help='Table columns in the OVSDB schema to filter out. '
45 'Values should be in the format: <table>.<column>.'
46 'Ex: Bridge.netflow,Interface.statistics')
49 cfg.CONF.register_opts(opts, 'ovsdb')
52 class OVSDB(app_manager.RyuApp):
53 _EVENTS = [event.EventNewOVSDBConnection,
54 event.EventModifyRequest,
55 event.EventReadRequest]
57 def __init__(self, *args, **kwargs):
58 super(OVSDB, self).__init__(*args, **kwargs)
59 self._address = self.CONF.ovsdb.address
60 self._port = self.CONF.ovsdb.port
61 self._probe_interval = self.CONF.ovsdb.probe_interval
62 self._min_backoff = self.CONF.ovsdb.min_backoff
63 self._max_backoff = self.CONF.ovsdb.max_backoff
66 def _accept(self, server):
67 if self.CONF.ovsdb.whitelist:
69 if address in self.CONF.ovsdb.whitelist:
72 self.logger.debug('Connection from non-whitelist client '
82 # TODO(jkoelker) SSL Certificate Fingerprint check
83 sock, client_address = server.accept()
87 self.logger.exception('Error accepting connection')
90 if not check(client_address[0]):
91 sock.shutdown(socket.SHUT_RDWR)
95 if ip.valid_ipv6(client_address[0]):
97 'New connection from [%s]:%s' % client_address[:2])
100 'New connection from %s:%s' % client_address[:2])
101 t = hub.spawn(self._start_remote, sock, client_address)
102 self.threads.append(t)
104 def _bulk_read_handler(self, ev):
107 def done(gt, *args, **kwargs):
108 if gt in self.threads:
109 self.threads.remove(gt)
111 results.append(gt.wait())
114 for c in self._clients.values():
115 gt = hub.spawn(c.read_request_handler, ev, bulk=True)
117 self.threads.append(gt)
121 rep = event.EventReadReply(None, results)
122 self.reply_to_request(ev, rep)
124 def _proxy_event(self, ev):
125 system_id = ev.system_id
126 client_name = client.RemoteOvsdb.instance_name(system_id)
128 if client_name not in self._clients:
129 self.logger.info('Unknown remote system_id %s' % system_id)
132 return self.send_event(client_name, ev)
134 def _start_remote(self, sock, client_address):
135 schema_tables = cfg.CONF.ovsdb.schema_tables
137 if cfg.CONF.ovsdb.schema_exclude_columns:
138 for c in cfg.CONF.ovsdb.schema_exclude_columns:
139 tbl, col = c.split('.')
140 if tbl in schema_ex_col:
141 schema_ex_col[tbl].append(col)
143 schema_ex_col[tbl] = [col]
145 app = client.RemoteOvsdb.factory(sock, client_address,
146 probe_interval=self._probe_interval,
147 min_backoff=self._min_backoff,
148 max_backoff=self._max_backoff,
149 schema_tables=schema_tables,
150 schema_exclude_columns=schema_ex_col)
153 self._clients[app.name] = app
155 ev = event.EventNewOVSDBConnection(app)
156 self.send_event_to_observers(ev)
160 sock.shutdown(socket.SHUT_RDWR)
167 if ip.valid_ipv6(self._address):
169 (self._address, self._port), family=socket.AF_INET6)
171 server = hub.listen((self._address, self._port))
172 key = self.CONF.ovsdb.mngr_privkey or self.CONF.ctl_privkey
173 cert = self.CONF.ovsdb.mngr_cert or self.CONF.ctl_cert
175 if key is not None and cert is not None:
176 ssl_kwargs = dict(keyfile=key, certfile=cert, server_side=True)
178 if self.CONF.ca_certs is not None:
179 ssl_kwargs['cert_reqs'] = ssl.CERT_REQUIRED
180 ssl_kwargs['ca_certs'] = self.CONF.ca_certs
182 server = ssl.wrap_socket(server, **ssl_kwargs)
184 self._server = server
186 if ip.valid_ipv6(self._address):
188 'Listening on [%s]:%s for clients', self._address, self._port)
191 'Listening on %s:%s for clients', self._address, self._port)
192 t = hub.spawn(self._accept, self._server)
193 super(OVSDB, self).start()
197 # NOTE(jkoelker) Attempt to gracefully stop the accept loop
198 self.is_active = False
200 # NOTE(jkoelker) Forceably kill the loop and clear the main_thread
202 hub.kill(self.main_thread)
203 self.main_thread = None
205 # NOTE(jkoelker) Stop all the clients
206 for c in self._clients.values():
209 # NOTE(jkoelker) super will only take care of the event and joining now
210 super(OVSDB, self).stop()
212 @handler.set_ev_cls(event.EventModifyRequest)
213 def modify_request_handler(self, ev):
215 system_id = ev.system_id
216 client_name = client.RemoteOvsdb.instance_name(system_id)
217 remote = self._clients.get(client_name)
220 msg = 'Unknown remote system_id %s' % system_id
221 self.logger.info(msg)
222 rep = event.EventModifyReply(system_id, None, None, msg)
223 return self.reply_to_request(ev, rep)
225 return remote.modify_request_handler(ev)
227 @handler.set_ev_cls(event.EventReadRequest)
228 def read_request_handler(self, ev):
229 system_id = ev.system_id
231 if system_id is None:
232 def done(gt, *args, **kwargs):
233 if gt in self.threads:
234 self.threads.remove(gt)
236 thread = hub.spawn(self._bulk_read_handler, ev)
237 self.threads.append(thread)
238 return thread.link(done)
240 client_name = client.RemoteOvsdb.instance_name(system_id)
241 remote = self._clients.get(client_name)
244 msg = 'Unknown remote system_id %s' % system_id
245 self.logger.info(msg)
246 rep = event.EventReadReply(system_id, None, msg)
247 return self.reply_to_request(ev, rep)
249 return remote.read_request_handler(ev)