backing up
[vsorcdistro/.git] / ryu / build / lib.linux-armv7l-2.7 / ryu / services / protocols / ovsdb / manager.py
diff --git a/ryu/build/lib.linux-armv7l-2.7/ryu/services/protocols/ovsdb/manager.py b/ryu/build/lib.linux-armv7l-2.7/ryu/services/protocols/ovsdb/manager.py
new file mode 100644 (file)
index 0000000..5a5b424
--- /dev/null
@@ -0,0 +1,249 @@
+# Copyright (c) 2014 Rackspace Hosting
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import ssl
+import socket
+
+from ryu import cfg
+from ryu.base import app_manager
+from ryu.lib import hub
+from ryu.lib import ip
+from ryu.services.protocols.ovsdb import client
+from ryu.services.protocols.ovsdb import event
+from ryu.controller import handler
+
+
+opts = (cfg.StrOpt('address', default='0.0.0.0', help='OVSDB address'),
+        cfg.IntOpt('port', default=6640, help='OVSDB port'),
+        cfg.IntOpt('probe-interval', help='OVSDB reconnect probe interval'),
+        cfg.IntOpt('min-backoff',
+                   help=('OVSDB reconnect minimum milliseconds between '
+                         'connection attemps')),
+        cfg.IntOpt('max-backoff',
+                   help=('OVSDB reconnect maximum milliseconds between '
+                         'connection attemps')),
+        cfg.StrOpt('mngr-privkey', default=None, help='manager private key'),
+        cfg.StrOpt('mngr-cert', default=None, help='manager certificate'),
+        cfg.ListOpt('whitelist', default=[],
+                    help='Whitelist of address to allow to connect'),
+        cfg.ListOpt('schema-tables', default=[],
+                    help='Tables in the OVSDB schema to configure'),
+        cfg.ListOpt('schema-exclude-columns', default=[],
+                    help='Table columns in the OVSDB schema to filter out.  '
+                         'Values should be in the format: <table>.<column>.'
+                         'Ex: Bridge.netflow,Interface.statistics')
+        )
+
+cfg.CONF.register_opts(opts, 'ovsdb')
+
+
+class OVSDB(app_manager.RyuApp):
+    _EVENTS = [event.EventNewOVSDBConnection,
+               event.EventModifyRequest,
+               event.EventReadRequest]
+
+    def __init__(self, *args, **kwargs):
+        super(OVSDB, self).__init__(*args, **kwargs)
+        self._address = self.CONF.ovsdb.address
+        self._port = self.CONF.ovsdb.port
+        self._probe_interval = self.CONF.ovsdb.probe_interval
+        self._min_backoff = self.CONF.ovsdb.min_backoff
+        self._max_backoff = self.CONF.ovsdb.max_backoff
+        self._clients = {}
+
+    def _accept(self, server):
+        if self.CONF.ovsdb.whitelist:
+            def check(address):
+                if address in self.CONF.ovsdb.whitelist:
+                    return True
+
+                self.logger.debug('Connection from non-whitelist client '
+                                  '(%s:%s)' % address)
+                return False
+
+        else:
+            def check(address):
+                return True
+
+        while self.is_active:
+            try:
+                # TODO(jkoelker) SSL Certificate Fingerprint check
+                sock, client_address = server.accept()
+
+            except:
+                if self.is_active:
+                    self.logger.exception('Error accepting connection')
+                    continue
+
+            if not check(client_address[0]):
+                sock.shutdown(socket.SHUT_RDWR)
+                sock.close()
+                continue
+
+            if ip.valid_ipv6(client_address[0]):
+                self.logger.debug(
+                    'New connection from [%s]:%s' % client_address[:2])
+            else:
+                self.logger.debug(
+                    'New connection from %s:%s' % client_address[:2])
+            t = hub.spawn(self._start_remote, sock, client_address)
+            self.threads.append(t)
+
+    def _bulk_read_handler(self, ev):
+        results = []
+
+        def done(gt, *args, **kwargs):
+            if gt in self.threads:
+                self.threads.remove(gt)
+
+            results.append(gt.wait())
+
+        threads = []
+        for c in self._clients.values():
+            gt = hub.spawn(c.read_request_handler, ev, bulk=True)
+            threads.append(gt)
+            self.threads.append(gt)
+            gt.link(done)
+
+        hub.joinall(threads)
+        rep = event.EventReadReply(None, results)
+        self.reply_to_request(ev, rep)
+
+    def _proxy_event(self, ev):
+        system_id = ev.system_id
+        client_name = client.RemoteOvsdb.instance_name(system_id)
+
+        if client_name not in self._clients:
+            self.logger.info('Unknown remote system_id %s' % system_id)
+            return
+
+        return self.send_event(client_name, ev)
+
+    def _start_remote(self, sock, client_address):
+        schema_tables = cfg.CONF.ovsdb.schema_tables
+        schema_ex_col = {}
+        if cfg.CONF.ovsdb.schema_exclude_columns:
+            for c in cfg.CONF.ovsdb.schema_exclude_columns:
+                tbl, col = c.split('.')
+                if tbl in schema_ex_col:
+                    schema_ex_col[tbl].append(col)
+                else:
+                    schema_ex_col[tbl] = [col]
+
+        app = client.RemoteOvsdb.factory(sock, client_address,
+                                         probe_interval=self._probe_interval,
+                                         min_backoff=self._min_backoff,
+                                         max_backoff=self._max_backoff,
+                                         schema_tables=schema_tables,
+                                         schema_exclude_columns=schema_ex_col)
+
+        if app:
+            self._clients[app.name] = app
+            app.start()
+            ev = event.EventNewOVSDBConnection(app)
+            self.send_event_to_observers(ev)
+
+        else:
+            try:
+                sock.shutdown(socket.SHUT_RDWR)
+            except:
+                pass
+
+            sock.close()
+
+    def start(self):
+        if ip.valid_ipv6(self._address):
+            server = hub.listen(
+                (self._address, self._port), family=socket.AF_INET6)
+        else:
+            server = hub.listen((self._address, self._port))
+        key = self.CONF.ovsdb.mngr_privkey or self.CONF.ctl_privkey
+        cert = self.CONF.ovsdb.mngr_cert or self.CONF.ctl_cert
+
+        if key is not None and cert is not None:
+            ssl_kwargs = dict(keyfile=key, certfile=cert, server_side=True)
+
+            if self.CONF.ca_certs is not None:
+                ssl_kwargs['cert_reqs'] = ssl.CERT_REQUIRED
+                ssl_kwargs['ca_certs'] = self.CONF.ca_certs
+
+            server = ssl.wrap_socket(server, **ssl_kwargs)
+
+        self._server = server
+
+        if ip.valid_ipv6(self._address):
+            self.logger.info(
+                'Listening on [%s]:%s for clients', self._address, self._port)
+        else:
+            self.logger.info(
+                'Listening on %s:%s for clients', self._address, self._port)
+        t = hub.spawn(self._accept, self._server)
+        super(OVSDB, self).start()
+        return t
+
+    def stop(self):
+        # NOTE(jkoelker) Attempt to gracefully stop the accept loop
+        self.is_active = False
+
+        # NOTE(jkoelker) Forceably kill the loop and clear the main_thread
+        if self.main_thread:
+            hub.kill(self.main_thread)
+            self.main_thread = None
+
+        # NOTE(jkoelker) Stop all the clients
+        for c in self._clients.values():
+            c.stop()
+
+        # NOTE(jkoelker) super will only take care of the event and joining now
+        super(OVSDB, self).stop()
+
+    @handler.set_ev_cls(event.EventModifyRequest)
+    def modify_request_handler(self, ev):
+
+        system_id = ev.system_id
+        client_name = client.RemoteOvsdb.instance_name(system_id)
+        remote = self._clients.get(client_name)
+
+        if not remote:
+            msg = 'Unknown remote system_id %s' % system_id
+            self.logger.info(msg)
+            rep = event.EventModifyReply(system_id, None, None, msg)
+            return self.reply_to_request(ev, rep)
+
+        return remote.modify_request_handler(ev)
+
+    @handler.set_ev_cls(event.EventReadRequest)
+    def read_request_handler(self, ev):
+        system_id = ev.system_id
+
+        if system_id is None:
+            def done(gt, *args, **kwargs):
+                if gt in self.threads:
+                    self.threads.remove(gt)
+
+            thread = hub.spawn(self._bulk_read_handler, ev)
+            self.threads.append(thread)
+            return thread.link(done)
+
+        client_name = client.RemoteOvsdb.instance_name(system_id)
+        remote = self._clients.get(client_name)
+
+        if not remote:
+            msg = 'Unknown remote system_id %s' % system_id
+            self.logger.info(msg)
+            rep = event.EventReadReply(system_id, None, msg)
+            return self.reply_to_request(ev, rep)
+
+        return remote.read_request_handler(ev)