backing up
[vsorcdistro/.git] / ryu / build / lib.linux-armv7l-2.7 / ryu / services / protocols / ovsdb / manager.py
1 # Copyright (c) 2014 Rackspace Hosting
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #    http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 import ssl
17 import socket
18
19 from ryu import cfg
20 from ryu.base import app_manager
21 from ryu.lib import hub
22 from ryu.lib import ip
23 from ryu.services.protocols.ovsdb import client
24 from ryu.services.protocols.ovsdb import event
25 from ryu.controller import handler
26
27
# Options registered under the ``ovsdb`` configuration group.  Names are
# declared with dashes here; the OVSDB app below reads them back with
# underscores (e.g. ``CONF.ovsdb.probe_interval``).
opts = (cfg.StrOpt('address', default='0.0.0.0', help='OVSDB address'),
        cfg.IntOpt('port', default=6640, help='OVSDB port'),
        cfg.IntOpt('probe-interval', help='OVSDB reconnect probe interval'),
        cfg.IntOpt('min-backoff',
                   help=('OVSDB reconnect minimum milliseconds between '
                         'connection attempts')),
        cfg.IntOpt('max-backoff',
                   help=('OVSDB reconnect maximum milliseconds between '
                         'connection attempts')),
        cfg.StrOpt('mngr-privkey', default=None, help='manager private key'),
        cfg.StrOpt('mngr-cert', default=None, help='manager certificate'),
        cfg.ListOpt('whitelist', default=[],
                    help='Whitelist of address to allow to connect'),
        cfg.ListOpt('schema-tables', default=[],
                    help='Tables in the OVSDB schema to configure'),
        cfg.ListOpt('schema-exclude-columns', default=[],
                    help='Table columns in the OVSDB schema to filter out.  '
                         'Values should be in the format: <table>.<column>. '
                         'Ex: Bridge.netflow,Interface.statistics')
        )

cfg.CONF.register_opts(opts, 'ovsdb')
50
51
class OVSDB(app_manager.RyuApp):
    """OVSDB manager application.

    Listens on the configured address/port, accepts incoming connections,
    wraps each accepted socket in a ``client.RemoteOvsdb`` app and routes
    modify/read request events to the remote matching the event's
    ``system_id``.
    """

    _EVENTS = [event.EventNewOVSDBConnection,
               event.EventModifyRequest,
               event.EventReadRequest]

    def __init__(self, *args, **kwargs):
        """Cache the ``ovsdb`` group options used for new connections."""
        super(OVSDB, self).__init__(*args, **kwargs)
        self._address = self.CONF.ovsdb.address
        self._port = self.CONF.ovsdb.port
        self._probe_interval = self.CONF.ovsdb.probe_interval
        self._min_backoff = self.CONF.ovsdb.min_backoff
        self._max_backoff = self.CONF.ovsdb.max_backoff
        # Maps RemoteOvsdb app name -> RemoteOvsdb app instance.
        self._clients = {}

    @staticmethod
    def _close_socket(sock):
        """Shut down *sock* on a best-effort basis, then close it."""
        try:
            sock.shutdown(socket.SHUT_RDWR)
        except Exception:
            # Deliberate best-effort: the peer may already have gone away,
            # and a failed shutdown must not prevent the close below.
            pass

        sock.close()

    def _schema_exclude_columns(self):
        """Build ``{table: [column, ...]}`` from ``schema-exclude-columns``.

        Each configured entry is expected to look like ``<table>.<column>``.
        Entries that do not split into exactly two parts are skipped with a
        warning instead of raising, so one bad config value cannot abort
        the handling of a new connection.
        """
        excluded = {}
        for entry in cfg.CONF.ovsdb.schema_exclude_columns or ():
            parts = entry.split('.')
            if len(parts) != 2:
                self.logger.warning(
                    'Ignoring malformed schema-exclude-columns entry %r',
                    entry)
                continue

            tbl, col = parts
            excluded.setdefault(tbl, []).append(col)

        return excluded

    def _accept(self, server):
        """Accept connections on *server* while the app is active.

        Connections whose host is not in the configured whitelist (when the
        whitelist is non-empty) are closed immediately.  Every other
        connection is handed to ``_start_remote`` in a new green thread.
        """
        while self.is_active:
            try:
                # TODO(jkoelker) SSL Certificate Fingerprint check
                sock, client_address = server.accept()

            except Exception:
                # Only report failures that are not a side effect of
                # shutting down.  Always restart the iteration: nothing was
                # accepted, so there is no socket to process below.
                if self.is_active:
                    self.logger.exception('Error accepting connection')
                continue

            host, port = client_address[:2]

            whitelist = self.CONF.ovsdb.whitelist
            if whitelist and host not in whitelist:
                self.logger.debug('Connection from non-whitelist client '
                                  '(%s:%s)', host, port)
                self._close_socket(sock)
                continue

            if ip.valid_ipv6(host):
                self.logger.debug('New connection from [%s]:%s', host, port)
            else:
                self.logger.debug('New connection from %s:%s', host, port)

            t = hub.spawn(self._start_remote, sock, client_address)
            self.threads.append(t)

    def _bulk_read_handler(self, ev):
        """Run the read request *ev* on every client and reply with all results.

        One green thread is spawned per known client; the reply is an
        ``EventReadReply`` with system id ``None`` carrying the list of
        per-client results.
        """
        results = []

        def done(gt, *args, **kwargs):
            # Drop the finished thread from the app's bookkeeping and
            # collect its return value.
            if gt in self.threads:
                self.threads.remove(gt)

            results.append(gt.wait())

        threads = []
        for c in self._clients.values():
            gt = hub.spawn(c.read_request_handler, ev, bulk=True)
            threads.append(gt)
            self.threads.append(gt)
            gt.link(done)

        hub.joinall(threads)
        rep = event.EventReadReply(None, results)
        self.reply_to_request(ev, rep)

    def _proxy_event(self, ev):
        """Forward *ev* to the client app registered for ``ev.system_id``.

        Returns the result of ``send_event``, or ``None`` when no client is
        registered for that system id.
        """
        system_id = ev.system_id
        client_name = client.RemoteOvsdb.instance_name(system_id)

        if client_name not in self._clients:
            self.logger.info('Unknown remote system_id %s', system_id)
            return

        return self.send_event(client_name, ev)

    def _start_remote(self, sock, client_address):
        """Create and start a ``RemoteOvsdb`` app for an accepted socket.

        When the factory yields an app it is registered under its name,
        started and announced to observers with ``EventNewOVSDBConnection``.
        Otherwise the socket is closed.
        """
        schema_tables = cfg.CONF.ovsdb.schema_tables
        schema_ex_col = self._schema_exclude_columns()

        app = client.RemoteOvsdb.factory(sock, client_address,
                                         probe_interval=self._probe_interval,
                                         min_backoff=self._min_backoff,
                                         max_backoff=self._max_backoff,
                                         schema_tables=schema_tables,
                                         schema_exclude_columns=schema_ex_col)

        if app:
            self._clients[app.name] = app
            app.start()
            ev = event.EventNewOVSDBConnection(app)
            self.send_event_to_observers(ev)

        else:
            self._close_socket(sock)

    def start(self):
        """Open the listening socket and spawn the accept loop.

        The socket is wrapped with SSL when both a private key and a
        certificate are configured (``ovsdb`` options first, falling back
        to ``ctl_privkey`` / ``ctl_cert``).  Client certificates are
        required when ``ca_certs`` is set.

        Returns the green thread running the accept loop.
        """
        is_ipv6 = ip.valid_ipv6(self._address)

        if is_ipv6:
            server = hub.listen(
                (self._address, self._port), family=socket.AF_INET6)
        else:
            server = hub.listen((self._address, self._port))

        key = self.CONF.ovsdb.mngr_privkey or self.CONF.ctl_privkey
        cert = self.CONF.ovsdb.mngr_cert or self.CONF.ctl_cert

        if key is not None and cert is not None:
            ssl_kwargs = dict(keyfile=key, certfile=cert, server_side=True)

            if self.CONF.ca_certs is not None:
                ssl_kwargs['cert_reqs'] = ssl.CERT_REQUIRED
                ssl_kwargs['ca_certs'] = self.CONF.ca_certs

            # NOTE(review): ssl.wrap_socket is deprecated since Python 3.7
            # and removed in 3.12; kept for compatibility with the Python
            # versions this module targets.  Moving to
            # ssl.SSLContext.wrap_socket needs to be confirmed against the
            # supported interpreter range.
            server = ssl.wrap_socket(server, **ssl_kwargs)

        self._server = server

        if is_ipv6:
            self.logger.info(
                'Listening on [%s]:%s for clients', self._address, self._port)
        else:
            self.logger.info(
                'Listening on %s:%s for clients', self._address, self._port)

        t = hub.spawn(self._accept, self._server)
        super(OVSDB, self).start()
        return t

    def stop(self):
        """Stop the accept loop, every client app, and then this app."""
        # NOTE(jkoelker) Attempt to gracefully stop the accept loop
        self.is_active = False

        # NOTE(jkoelker) Forceably kill the loop and clear the main_thread
        if self.main_thread:
            hub.kill(self.main_thread)
            self.main_thread = None

        # NOTE(jkoelker) Stop all the clients
        # Iterate over a snapshot so that a client registered while another
        # one is being stopped cannot invalidate the iteration.
        for c in list(self._clients.values()):
            c.stop()

        # NOTE(jkoelker) super will only take care of the event and joining now
        super(OVSDB, self).stop()

    @handler.set_ev_cls(event.EventModifyRequest)
    def modify_request_handler(self, ev):
        """Dispatch a modify request to the remote named by ``ev.system_id``.

        Replies with an ``EventModifyReply`` carrying an error message when
        no such remote is registered.
        """
        system_id = ev.system_id
        client_name = client.RemoteOvsdb.instance_name(system_id)
        remote = self._clients.get(client_name)

        if not remote:
            msg = 'Unknown remote system_id %s' % system_id
            self.logger.info(msg)
            rep = event.EventModifyReply(system_id, None, None, msg)
            return self.reply_to_request(ev, rep)

        return remote.modify_request_handler(ev)

    @handler.set_ev_cls(event.EventReadRequest)
    def read_request_handler(self, ev):
        """Dispatch a read request to one remote, or to all of them.

        A ``system_id`` of ``None`` triggers a bulk read, run in its own
        green thread.  Otherwise the request goes to the matching remote,
        or an ``EventReadReply`` with an error message is sent when that
        remote is not registered.
        """
        system_id = ev.system_id

        if system_id is None:
            def done(gt, *args, **kwargs):
                # Drop the finished bulk-read thread from the bookkeeping.
                if gt in self.threads:
                    self.threads.remove(gt)

            thread = hub.spawn(self._bulk_read_handler, ev)
            self.threads.append(thread)
            return thread.link(done)

        client_name = client.RemoteOvsdb.instance_name(system_id)
        remote = self._clients.get(client_name)

        if not remote:
            msg = 'Unknown remote system_id %s' % system_id
            self.logger.info(msg)
            rep = event.EventReadReply(system_id, None, msg)
            return self.reply_to_request(ev, rep)

        return remote.read_request_handler(ev)