backing up
diff --git a/ryu/build/lib.linux-armv7l-2.7/ryu/controller/controller.py b/ryu/build/lib.linux-armv7l-2.7/ryu/controller/controller.py
new file mode 100644 (file)
index 0000000..b3d2d35
--- /dev/null
@@ -0,0 +1,562 @@
+# Copyright (C) 2011, 2012 Nippon Telegraph and Telephone Corporation.
+# Copyright (C) 2011, 2012 Isaku Yamahata <yamahata at valinux co jp>
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+The main component of the OpenFlow controller.
+
+- Handle connections from switches
+- Generate and route events to appropriate entities like Ryu applications
+
+"""
+
+import contextlib
+import logging
+import random
+from socket import IPPROTO_TCP
+from socket import TCP_NODELAY
+from socket import SHUT_WR
+from socket import timeout as SocketTimeout
+import ssl
+
+from ryu import cfg
+from ryu.lib import hub
+from ryu.lib.hub import StreamServer
+
+import ryu.base.app_manager
+
+from ryu.ofproto import ofproto_common
+from ryu.ofproto import ofproto_parser
+from ryu.ofproto import ofproto_protocol
+from ryu.ofproto import ofproto_v1_0
+from ryu.ofproto import nx_match
+
+from ryu.controller import ofp_event
+from ryu.controller.handler import HANDSHAKE_DISPATCHER, DEAD_DISPATCHER
+
+from ryu.lib.dpid import dpid_to_str
+from ryu.lib import ip
+
+LOG = logging.getLogger('ryu.controller.controller')
+
+DEFAULT_OFP_HOST = '0.0.0.0'
+DEFAULT_OFP_SW_CON_INTERVAL = 1
+
+CONF = cfg.CONF
+CONF.register_cli_opts([
+    cfg.StrOpt('ofp-listen-host', default=DEFAULT_OFP_HOST,
+               help='openflow listen host (default %s)' % DEFAULT_OFP_HOST),
+    cfg.IntOpt('ofp-tcp-listen-port', default=None,
+               help='openflow tcp listen port '
+                    '(default: %d)' % ofproto_common.OFP_TCP_PORT),
+    cfg.IntOpt('ofp-ssl-listen-port', default=None,
+               help='openflow ssl listen port '
+                    '(default: %d)' % ofproto_common.OFP_SSL_PORT),
+    cfg.StrOpt('ctl-privkey', default=None, help='controller private key'),
+    cfg.StrOpt('ctl-cert', default=None, help='controller certificate'),
+    cfg.StrOpt('ca-certs', default=None, help='CA certificates'),
+    cfg.ListOpt('ofp-switch-address-list', item_type=str, default=[],
+                help='list of IP address and port pairs (default empty). '
+                     'e.g., "127.0.0.1:6653,[::1]:6653"'),
+    cfg.IntOpt('ofp-switch-connect-interval',
+               default=DEFAULT_OFP_SW_CON_INTERVAL,
+               help='interval in seconds to connect to switches '
+                    '(default %d)' % DEFAULT_OFP_SW_CON_INTERVAL),
+])
+CONF.register_opts([
+    cfg.FloatOpt('socket-timeout',
+                 default=5.0,
+                 help='Time, in seconds, to await completion of socket operations.'),
+    cfg.FloatOpt('echo-request-interval',
+                 default=15.0,
+                 help='Time, in seconds, between sending echo requests to a datapath.'),
+    cfg.IntOpt('maximum-unreplied-echo-requests',
+               default=0,
+               min=0,
+               help='Maximum number of unreplied echo requests before datapath is disconnected.')
+])
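+
+# The CLI options above are consumed by ryu-manager; a possible invocation
+# (addresses, ports and the application name below are only illustrative):
+#
+#     ryu-manager --ofp-listen-host 0.0.0.0 \
+#                 --ofp-tcp-listen-port 6653 \
+#                 --ofp-switch-address-list "192.0.2.10:6653,[2001:db8::1]:6653" \
+#                 your_app.py
+#
+# The non-CLI options (socket-timeout, echo-request-interval,
+# maximum-unreplied-echo-requests) are meant to be set via a Ryu
+# configuration file rather than on the command line.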
+
+
+def _split_addr(addr):
+    """
+    Splits a string containing an IP address and port pair into (host, port).
+
+    Example::
+
+        >>> _split_addr('127.0.0.1:6653')
+        ('127.0.0.1', 6653)
+        >>> _split_addr('[::1]:6653')
+        ('::1', 6653)
+
+    Raises ValueError if the format is invalid.
+
+    :param addr: A pair of IP address and port.
+    :return: IP address and port
+    """
+    e = ValueError('Invalid IP address and port pair: "%s"' % addr)
+    pair = addr.rsplit(':', 1)
+    if len(pair) != 2:
+        raise e
+
+    addr, port = pair
+    if addr.startswith('[') and addr.endswith(']'):
+        addr = addr.lstrip('[').rstrip(']')
+        if not ip.valid_ipv6(addr):
+            raise e
+    elif not ip.valid_ipv4(addr):
+        raise e
+
+    return addr, int(port, 0)
+
+
+class OpenFlowController(object):
+    def __init__(self):
+        super(OpenFlowController, self).__init__()
+        if not CONF.ofp_tcp_listen_port and not CONF.ofp_ssl_listen_port:
+            self.ofp_tcp_listen_port = ofproto_common.OFP_TCP_PORT
+            self.ofp_ssl_listen_port = ofproto_common.OFP_SSL_PORT
+            # For backward compatibility, we spawn a server loop
+            # listening on the old OpenFlow listen port 6633.
+            hub.spawn(self.server_loop,
+                      ofproto_common.OFP_TCP_PORT_OLD,
+                      ofproto_common.OFP_SSL_PORT_OLD)
+        else:
+            self.ofp_tcp_listen_port = CONF.ofp_tcp_listen_port
+            self.ofp_ssl_listen_port = CONF.ofp_ssl_listen_port
+
+        # Example:
+        # self._clients = {
+        #     ('127.0.0.1', 6653): <instance of StreamClient>,
+        # }
+        self._clients = {}
+
+    # entry point
+    def __call__(self):
+        # LOG.debug('call')
+        for address in CONF.ofp_switch_address_list:
+            addr = tuple(_split_addr(address))
+            self.spawn_client_loop(addr)
+
+        self.server_loop(self.ofp_tcp_listen_port,
+                         self.ofp_ssl_listen_port)
+
+    def spawn_client_loop(self, addr, interval=None):
+        interval = interval or CONF.ofp_switch_connect_interval
+        client = hub.StreamClient(addr)
+        hub.spawn(client.connect_loop, datapath_connection_factory, interval)
+        self._clients[addr] = client
+
+    def stop_client_loop(self, addr):
+        client = self._clients.get(addr, None)
+        if client is not None:
+            client.stop()
+
+    def server_loop(self, ofp_tcp_listen_port, ofp_ssl_listen_port):
+        if CONF.ctl_privkey is not None and CONF.ctl_cert is not None:
+            if not hasattr(ssl, 'SSLContext'):
+                # Python earlier than 2.7.9 supports only TLSv1 or
+                # lower, so we choose TLSv1.
+                ssl_args = {'ssl_version': ssl.PROTOCOL_TLSv1}
+            else:
+                # SSLContext creation is supported from Python 2.7.9
+                # and 3.4+. PROTOCOL_TLS, available from 2.7.13 and
+                # 3.5.3, replaced PROTOCOL_SSLv23; the functionality
+                # is similar.
+                if hasattr(ssl, 'PROTOCOL_TLS'):
+                    p = 'PROTOCOL_TLS'
+                else:
+                    p = 'PROTOCOL_SSLv23'
+
+                ssl_args = {'ssl_ctx': ssl.SSLContext(getattr(ssl, p))}
+                # Restrict non-safe versions
+                ssl_args['ssl_ctx'].options |= ssl.OP_NO_SSLv3 | ssl.OP_NO_SSLv2
+
+            if CONF.ca_certs is not None:
+                server = StreamServer((CONF.ofp_listen_host,
+                                       ofp_ssl_listen_port),
+                                      datapath_connection_factory,
+                                      keyfile=CONF.ctl_privkey,
+                                      certfile=CONF.ctl_cert,
+                                      cert_reqs=ssl.CERT_REQUIRED,
+                                      ca_certs=CONF.ca_certs, **ssl_args)
+            else:
+                server = StreamServer((CONF.ofp_listen_host,
+                                       ofp_ssl_listen_port),
+                                      datapath_connection_factory,
+                                      keyfile=CONF.ctl_privkey,
+                                      certfile=CONF.ctl_cert, **ssl_args)
+        else:
+            server = StreamServer((CONF.ofp_listen_host,
+                                   ofp_tcp_listen_port),
+                                  datapath_connection_factory)
+
+        # LOG.debug('loop')
+        server.serve_forever()
+
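+# OpenFlowController is normally instantiated and driven by the built-in
+# ofp_handler application rather than called directly; a simplified sketch of
+# that pattern (the exact app_manager wiring is omitted):
+#
+#     controller = OpenFlowController()
+#     hub.spawn(controller)  # __call__() runs client loops and the server loop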
+
+def _deactivate(method):
+    def deactivate(self):
+        try:
+            method(self)
+        finally:
+            try:
+                self.socket.close()
+            except IOError:
+                pass
+
+    return deactivate
+
+
+class Datapath(ofproto_protocol.ProtocolDesc):
+    """
+    A class to describe an OpenFlow switch connected to this controller.
+
+    An instance has the following attributes.
+
+    .. tabularcolumns:: |l|L|
+
+    ==================================== ======================================
+    Attribute                            Description
+    ==================================== ======================================
+    id                                   64-bit OpenFlow Datapath ID.
+                                         Only available for
+                                         ryu.controller.handler.MAIN_DISPATCHER
+                                         phase.
+    ofproto                              A module which exports OpenFlow
+                                         definitions, mainly constants that appear
+                                         in the specification, for the
+                                         negotiated OpenFlow version.  For
+                                         example, ryu.ofproto.ofproto_v1_0 for
+                                         OpenFlow 1.0.
+    ofproto_parser                       A module which exports OpenFlow wire
+                                         message encoder and decoder for the
+                                         negotiated OpenFlow version.
+                                         For example,
+                                         ryu.ofproto.ofproto_v1_0_parser
+                                         for OpenFlow 1.0.
+    ofproto_parser.OFPxxxx(datapath,...) A callable to prepare an OpenFlow
+                                         message for the given switch.  It can
+                                         be sent with Datapath.send_msg later.
+                                         xxxx is a name of the message.  For
+                                         example OFPFlowMod for flow-mod
+                                         message.  Arguments depend on the
+                                         message.
+    set_xid(self, msg)                   Generate an OpenFlow XID and put it
+                                         in msg.xid.
+    send_msg(self, msg)                  Queue an OpenFlow message to send to
+                                         the corresponding switch.  If msg.xid
+                                         is None, set_xid is automatically
+                                         called on the message before queueing.
+    send_packet_out                      deprecated
+    send_flow_mod                        deprecated
+    send_flow_del                        deprecated
+    send_delete_all_flows                deprecated
+    send_barrier                         Queue an OpenFlow barrier message to
+                                         send to the switch.
+    send_nxt_set_flow_format             deprecated
+    is_reserved_port                     deprecated
+    ==================================== ======================================
+    """
+
+    def __init__(self, socket, address):
+        super(Datapath, self).__init__()
+
+        self.socket = socket
+        self.socket.setsockopt(IPPROTO_TCP, TCP_NODELAY, 1)
+        self.socket.settimeout(CONF.socket_timeout)
+        self.address = address
+        self.is_active = True
+
+        # The limit is arbitrary. We need to limit the queue size to
+        # prevent it from eating up memory.
+        self.send_q = hub.Queue(16)
+        self._send_q_sem = hub.BoundedSemaphore(self.send_q.maxsize)
+
+        self.echo_request_interval = CONF.echo_request_interval
+        self.max_unreplied_echo_requests = CONF.maximum_unreplied_echo_requests
+        self.unreplied_echo_requests = []
+
+        self.xid = random.randint(0, self.ofproto.MAX_XID)
+        self.id = None  # datapath_id is unknown yet
+        self._ports = None
+        self.flow_format = ofproto_v1_0.NXFF_OPENFLOW10
+        self.ofp_brick = ryu.base.app_manager.lookup_service_brick('ofp_event')
+        self.state = None  # for pylint
+        self.set_state(HANDSHAKE_DISPATCHER)
+
+    def _close_write(self):
+        # Note: Close only further sends in order to wait for the switch to
+        # disconnect this connection.
+        try:
+            self.socket.shutdown(SHUT_WR)
+        except (EOFError, IOError):
+            pass
+
+    def close(self):
+        self.set_state(DEAD_DISPATCHER)
+        self._close_write()
+
+    def set_state(self, state):
+        if self.state == state:
+            return
+        self.state = state
+        ev = ofp_event.EventOFPStateChange(self)
+        ev.state = state
+        self.ofp_brick.send_event_to_observers(ev, state)
+
+    # Low level socket handling layer
+    @_deactivate
+    def _recv_loop(self):
+        buf = bytearray()
+        count = 0
+        min_read_len = remaining_read_len = ofproto_common.OFP_HEADER_SIZE
+
+        while self.state != DEAD_DISPATCHER:
+            try:
+                read_len = min_read_len
+                if remaining_read_len > min_read_len:
+                    read_len = remaining_read_len
+                ret = self.socket.recv(read_len)
+            except SocketTimeout:
+                continue
+            except ssl.SSLError:
+                # eventlet throws SSLError (which is a subclass of IOError)
+                # on SSL socket read timeout; re-try the loop in this case.
+                continue
+            except (EOFError, IOError):
+                break
+
+            if not ret:
+                break
+
+            buf += ret
+            buf_len = len(buf)
+            while buf_len >= min_read_len:
+                (version, msg_type, msg_len, xid) = ofproto_parser.header(buf)
+                if msg_len < min_read_len:
+                    # Someone isn't playing nicely; log it, and try something sane.
+                    LOG.debug("Message with invalid length %s received from switch at address %s",
+                              msg_len, self.address)
+                    msg_len = min_read_len
+                if buf_len < msg_len:
+                    remaining_read_len = (msg_len - buf_len)
+                    break
+
+                msg = ofproto_parser.msg(
+                    self, version, msg_type, msg_len, xid, buf[:msg_len])
+                # LOG.debug('queue msg %s cls %s', msg, msg.__class__)
+                if msg:
+                    ev = ofp_event.ofp_msg_to_ev(msg)
+                    self.ofp_brick.send_event_to_observers(ev, self.state)
+
+                    def dispatchers(x):
+                        return x.callers[ev.__class__].dispatchers
+
+                    handlers = [handler for handler in
+                                self.ofp_brick.get_handlers(ev) if
+                                self.state in dispatchers(handler)]
+                    for handler in handlers:
+                        handler(ev)
+
+                buf = buf[msg_len:]
+                buf_len = len(buf)
+                remaining_read_len = min_read_len
+
+                # We need to schedule other greenlets; otherwise, Ryu
+                # can't accept new switches or handle the existing
+                # ones. The limit is arbitrary; we need a better
+                # approach in the future.
+                count += 1
+                if count > 2048:
+                    count = 0
+                    hub.sleep(0)
+
+    def _send_loop(self):
+        try:
+            while self.state != DEAD_DISPATCHER:
+                buf, close_socket = self.send_q.get()
+                self._send_q_sem.release()
+                self.socket.sendall(buf)
+                if close_socket:
+                    break
+        except SocketTimeout:
+            LOG.debug("Socket timed out while sending data to switch at address %s",
+                      self.address)
+        except IOError as ioe:
+            # Convert ioe.errno to a string, just in case it was somehow set to None.
+            errno = "%s" % ioe.errno
+            LOG.debug("Socket error while sending data to switch at address %s: [%s] %s",
+                      self.address, errno, ioe.strerror)
+        finally:
+            q = self.send_q
+            # First, clear self.send_q to prevent new references.
+            self.send_q = None
+            # Now, drain the send_q, releasing the associated semaphore for each entry.
+            # This should release all threads waiting to acquire the semaphore.
+            try:
+                while q.get(block=False):
+                    self._send_q_sem.release()
+            except hub.QueueEmpty:
+                pass
+            # Finally, disallow further sends.
+            self._close_write()
+
+    def send(self, buf, close_socket=False):
+        msg_enqueued = False
+        self._send_q_sem.acquire()
+        if self.send_q:
+            self.send_q.put((buf, close_socket))
+            msg_enqueued = True
+        else:
+            self._send_q_sem.release()
+        if not msg_enqueued:
+            LOG.debug('Datapath in process of terminating; send() to %s discarded.',
+                      self.address)
+        return msg_enqueued
+
+    def set_xid(self, msg):
+        self.xid += 1
+        self.xid &= self.ofproto.MAX_XID
+        msg.set_xid(self.xid)
+        return self.xid
+
+    def send_msg(self, msg, close_socket=False):
+        assert isinstance(msg, self.ofproto_parser.MsgBase)
+        if msg.xid is None:
+            self.set_xid(msg)
+        msg.serialize()
+        # LOG.debug('send_msg %s', msg)
+        return self.send(msg.buf, close_socket=close_socket)
+
+    def _echo_request_loop(self):
+        if not self.max_unreplied_echo_requests:
+            return
+        while (self.send_q and
+               (len(self.unreplied_echo_requests) <= self.max_unreplied_echo_requests)):
+            echo_req = self.ofproto_parser.OFPEchoRequest(self)
+            self.unreplied_echo_requests.append(self.set_xid(echo_req))
+            self.send_msg(echo_req)
+            hub.sleep(self.echo_request_interval)
+        self.close()
+
+    def acknowledge_echo_reply(self, xid):
+        try:
+            self.unreplied_echo_requests.remove(xid)
+        except ValueError:
+            pass
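+
+    # Note: echo replies are acknowledged from outside this class; the
+    # built-in ofp_handler application is expected to call
+    # acknowledge_echo_reply() with the xid of each OFPEchoReply it
+    # receives, roughly:
+    #
+    #     @set_ev_handler(ofp_event.EventOFPEchoReply, MAIN_DISPATCHER)
+    #     def echo_reply_handler(self, ev):
+    #         ev.msg.datapath.acknowledge_echo_reply(ev.msg.xid)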
+
+    def serve(self):
+        send_thr = hub.spawn(self._send_loop)
+
+        # send hello message immediately
+        hello = self.ofproto_parser.OFPHello(self)
+        self.send_msg(hello)
+
+        echo_thr = hub.spawn(self._echo_request_loop)
+
+        try:
+            self._recv_loop()
+        finally:
+            hub.kill(send_thr)
+            hub.kill(echo_thr)
+            hub.joinall([send_thr, echo_thr])
+            self.is_active = False
+
+    #
+    # Utility methods for convenience
+    #
+    def send_packet_out(self, buffer_id=0xffffffff, in_port=None,
+                        actions=None, data=None):
+        if in_port is None:
+            in_port = self.ofproto.OFPP_NONE
+        packet_out = self.ofproto_parser.OFPPacketOut(
+            self, buffer_id, in_port, actions, data)
+        self.send_msg(packet_out)
+
+    def send_flow_mod(self, rule, cookie, command, idle_timeout, hard_timeout,
+                      priority=None, buffer_id=0xffffffff,
+                      out_port=None, flags=0, actions=None):
+        if priority is None:
+            priority = self.ofproto.OFP_DEFAULT_PRIORITY
+        if out_port is None:
+            out_port = self.ofproto.OFPP_NONE
+        flow_format = rule.flow_format()
+        assert (flow_format == ofproto_v1_0.NXFF_OPENFLOW10 or
+                flow_format == ofproto_v1_0.NXFF_NXM)
+        if self.flow_format < flow_format:
+            self.send_nxt_set_flow_format(flow_format)
+        if flow_format == ofproto_v1_0.NXFF_OPENFLOW10:
+            match_tuple = rule.match_tuple()
+            match = self.ofproto_parser.OFPMatch(*match_tuple)
+            flow_mod = self.ofproto_parser.OFPFlowMod(
+                self, match, cookie, command, idle_timeout, hard_timeout,
+                priority, buffer_id, out_port, flags, actions)
+        else:
+            flow_mod = self.ofproto_parser.NXTFlowMod(
+                self, cookie, command, idle_timeout, hard_timeout,
+                priority, buffer_id, out_port, flags, rule, actions)
+        self.send_msg(flow_mod)
+
+    def send_flow_del(self, rule, cookie, out_port=None):
+        self.send_flow_mod(rule=rule, cookie=cookie,
+                           command=self.ofproto.OFPFC_DELETE,
+                           idle_timeout=0, hard_timeout=0, priority=0,
+                           out_port=out_port)
+
+    def send_delete_all_flows(self):
+        rule = nx_match.ClsRule()
+        self.send_flow_mod(
+            rule=rule, cookie=0, command=self.ofproto.OFPFC_DELETE,
+            idle_timeout=0, hard_timeout=0, priority=0, buffer_id=0,
+            out_port=self.ofproto.OFPP_NONE, flags=0, actions=None)
+
+    def send_barrier(self):
+        barrier_request = self.ofproto_parser.OFPBarrierRequest(self)
+        return self.send_msg(barrier_request)
+
+    def send_nxt_set_flow_format(self, flow_format):
+        assert (flow_format == ofproto_v1_0.NXFF_OPENFLOW10 or
+                flow_format == ofproto_v1_0.NXFF_NXM)
+        if self.flow_format == flow_format:
+            # Nothing to do
+            return
+        self.flow_format = flow_format
+        set_format = self.ofproto_parser.NXTSetFlowFormat(self, flow_format)
+        # FIXME: If NXT_SET_FLOW_FORMAT or NXFF_NXM is not supported by
+        # the switch then an error message will be received. It may be
+        # handled by setting self.flow_format to
+        # ofproto_v1_0.NXFF_OPENFLOW10 but currently isn't.
+        self.send_msg(set_format)
+        self.send_barrier()
+
+    def is_reserved_port(self, port_no):
+        return port_no > self.ofproto.OFPP_MAX
+
+
+def datapath_connection_factory(socket, address):
+    LOG.debug('connected socket:%s address:%s', socket, address)
+    with contextlib.closing(Datapath(socket, address)) as datapath:
+        try:
+            datapath.serve()
+        except:
+            # Something went wrong.
+            # In particular, a malicious switch can send a malformed
+            # packet that makes the parser raise an exception.
+            # Can we do anything more graceful?
+            if datapath.id is None:
+                dpid_str = "%s" % datapath.id
+            else:
+                dpid_str = dpid_to_str(datapath.id)
+            LOG.error("Error in the datapath %s from %s", dpid_str, address)
+            raise