backing up
[vsorcdistro/.git] / ryu / build / lib.linux-armv7l-2.7 / ryu / controller / controller.py
1 # Copyright (C) 2011, 2012 Nippon Telegraph and Telephone Corporation.
2 # Copyright (C) 2011, 2012 Isaku Yamahata <yamahata at valinux co jp>
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #    http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16
17 """
18 The main component of OpenFlow controller.
19
20 - Handle connections from switches
21 - Generate and route events to appropriate entities like Ryu applications
22
23 """
24
25 import contextlib
26 import logging
27 import random
28 from socket import IPPROTO_TCP
29 from socket import TCP_NODELAY
30 from socket import SHUT_WR
31 from socket import timeout as SocketTimeout
32 import ssl
33
34 from ryu import cfg
35 from ryu.lib import hub
36 from ryu.lib.hub import StreamServer
37
38 import ryu.base.app_manager
39
40 from ryu.ofproto import ofproto_common
41 from ryu.ofproto import ofproto_parser
42 from ryu.ofproto import ofproto_protocol
43 from ryu.ofproto import ofproto_v1_0
44 from ryu.ofproto import nx_match
45
46 from ryu.controller import ofp_event
47 from ryu.controller.handler import HANDSHAKE_DISPATCHER, DEAD_DISPATCHER
48
49 from ryu.lib.dpid import dpid_to_str
50 from ryu.lib import ip
51
# Module-level logger; the name is spelled out rather than derived from
# __name__.
LOG = logging.getLogger('ryu.controller.controller')

# Defaults for the 'ofp-listen-host' and 'ofp-switch-connect-interval'
# options registered below.
DEFAULT_OFP_HOST = '0.0.0.0'
DEFAULT_OFP_SW_CON_INTERVAL = 1

CONF = cfg.CONF
# Options registered as command line options.
# Note: both listen-port options default to None here; the port numbers
# advertised in their help texts are applied by OpenFlowController.__init__
# when neither port has been configured.
CONF.register_cli_opts([
    cfg.StrOpt('ofp-listen-host', default=DEFAULT_OFP_HOST,
               help='openflow listen host (default %s)' % DEFAULT_OFP_HOST),
    cfg.IntOpt('ofp-tcp-listen-port', default=None,
               help='openflow tcp listen port '
                    '(default: %d)' % ofproto_common.OFP_TCP_PORT),
    cfg.IntOpt('ofp-ssl-listen-port', default=None,
               help='openflow ssl listen port '
                    '(default: %d)' % ofproto_common.OFP_SSL_PORT),
    cfg.StrOpt('ctl-privkey', default=None, help='controller private key'),
    cfg.StrOpt('ctl-cert', default=None, help='controller certificate'),
    cfg.StrOpt('ca-certs', default=None, help='CA certificates'),
    cfg.ListOpt('ofp-switch-address-list', item_type=str, default=[],
                help='list of IP address and port pairs (default empty). '
                     'e.g., "127.0.0.1:6653,[::1]:6653"'),
    cfg.IntOpt('ofp-switch-connect-interval',
               default=DEFAULT_OFP_SW_CON_INTERVAL,
               help='interval in seconds to connect to switches '
                    '(default %d)' % DEFAULT_OFP_SW_CON_INTERVAL),
])
# Options registered through register_opts() rather than register_cli_opts().
# They are read by Datapath.__init__ (socket timeout and echo-request
# keepalive settings).
CONF.register_opts([
    cfg.FloatOpt('socket-timeout',
                 default=5.0,
                 help='Time, in seconds, to await completion of socket operations.'),
    cfg.FloatOpt('echo-request-interval',
                 default=15.0,
                 help='Time, in seconds, between sending echo requests to a datapath.'),
    cfg.IntOpt('maximum-unreplied-echo-requests',
               default=0,
               min=0,
               help='Maximum number of unreplied echo requests before datapath is disconnected.')
])
90
91
def _split_addr(addr):
    """
    Splits a str of IP address and port pair into (host, port).

    Example::

        >>> _split_addr('127.0.0.1:6653')
        ('127.0.0.1', 6653)
        >>> _split_addr('[::1]:6653')
        ('::1', 6653)

    Raises ValueError if invalid format.  Every rejection (missing port,
    unparsable port, invalid address) carries the same
    'Invalid IP address and port pair' message.

    :param addr: A pair of IP address and port.  An IPv6 address must be
                 enclosed in exactly one pair of square brackets.
    :return: IP address and port
    """
    e = ValueError('Invalid IP address and port pair: "%s"' % addr)
    pair = addr.rsplit(':', 1)
    if len(pair) != 2:
        raise e

    host, port = pair
    try:
        # Base 0 keeps accepting prefixed literals such as "0x19fd".
        port = int(port, 0)
    except ValueError:
        # Report a malformed port with the documented message instead of
        # the generic int() one.
        raise e

    if host.startswith('[') and host.endswith(']'):
        # Remove exactly one enclosing pair of brackets.  Stripping every
        # leading '[' / trailing ']' would let input such as
        # "[[::1]]:6653" slip through as a valid address.
        host = host[1:-1]
        if not ip.valid_ipv6(host):
            raise e
    elif not ip.valid_ipv4(host):
        raise e

    return host, port
122
123
class OpenFlowController(object):
    """
    Entry point of the OpenFlow controller.

    Calling an instance starts one client loop per address configured in
    ``ofp-switch-address-list`` and then runs the server loop that listens
    for incoming switch connections.  Every connection, whether accepted or
    initiated, is handed to ``datapath_connection_factory``.
    """

    def __init__(self):
        super(OpenFlowController, self).__init__()
        if not CONF.ofp_tcp_listen_port and not CONF.ofp_ssl_listen_port:
            self.ofp_tcp_listen_port = ofproto_common.OFP_TCP_PORT
            self.ofp_ssl_listen_port = ofproto_common.OFP_SSL_PORT
            # For the backward compatibility, we spawn a server loop
            # listening on the old OpenFlow listen port 6633.
            hub.spawn(self.server_loop,
                      ofproto_common.OFP_TCP_PORT_OLD,
                      ofproto_common.OFP_SSL_PORT_OLD)
        else:
            self.ofp_tcp_listen_port = CONF.ofp_tcp_listen_port
            self.ofp_ssl_listen_port = CONF.ofp_ssl_listen_port

        # Example:
        # self._clients = {
        #     ('127.0.0.1', 6653): <instance of StreamClient>,
        # }
        self._clients = {}

    # entry point
    def __call__(self):
        """
        Start connecting to the configured switches, then serve incoming
        connections.  Does not return while the server loop is running.
        """
        # LOG.debug('call')
        for address in CONF.ofp_switch_address_list:
            addr = tuple(_split_addr(address))
            self.spawn_client_loop(addr)

        self.server_loop(self.ofp_tcp_listen_port,
                         self.ofp_ssl_listen_port)

    def spawn_client_loop(self, addr, interval=None):
        """
        Spawn a loop that connects to the switch at ``addr``.

        :param addr: (host, port) tuple of the switch.
        :param interval: Connect interval in seconds.  A falsy value
                         (None or 0) selects ``ofp-switch-connect-interval``.
        """
        interval = interval or CONF.ofp_switch_connect_interval
        client = hub.StreamClient(addr)
        hub.spawn(client.connect_loop, datapath_connection_factory, interval)
        self._clients[addr] = client

    def stop_client_loop(self, addr):
        """
        Stop the client loop previously spawned for ``addr``; unknown
        addresses are ignored.
        """
        client = self._clients.get(addr, None)
        if client is not None:
            client.stop()

    def server_loop(self, ofp_tcp_listen_port, ofp_ssl_listen_port):
        """
        Listen for switch connections and serve them forever.

        TLS is used only when both ``ctl-privkey`` and ``ctl-cert`` are
        configured; otherwise the server listens for plain TCP.

        :param ofp_tcp_listen_port: Port used for plain TCP.
        :param ofp_ssl_listen_port: Port used for TLS.
        """
        privkey = CONF.ctl_privkey
        cert = CONF.ctl_cert

        if privkey is not None and cert is not None:
            ssl_kwargs = {'keyfile': privkey, 'certfile': cert}

            if not hasattr(ssl, 'SSLContext'):
                # anything less than python 2.7.9 supports only TLSv1
                # or less, thus we choose TLSv1
                ssl_kwargs['ssl_version'] = ssl.PROTOCOL_TLSv1
            else:
                # from 2.7.9 and versions 3.4+ ssl context creation is
                # supported. Protocol_TLS from 2.7.13 and from 3.5.3
                # replaced SSLv23. Functionality is similar.
                if hasattr(ssl, 'PROTOCOL_TLS'):
                    p = 'PROTOCOL_TLS'
                else:
                    p = 'PROTOCOL_SSLv23'

                ssl_ctx = ssl.SSLContext(getattr(ssl, p))
                # Restrict non-safe versions
                ssl_ctx.options |= ssl.OP_NO_SSLv3 | ssl.OP_NO_SSLv2
                ssl_kwargs['ssl_ctx'] = ssl_ctx

            if CONF.ca_certs is not None:
                # With CA certificates configured, the peer must present a
                # certificate.
                ssl_kwargs['cert_reqs'] = ssl.CERT_REQUIRED
                ssl_kwargs['ca_certs'] = CONF.ca_certs

            server = StreamServer((CONF.ofp_listen_host,
                                   ofp_ssl_listen_port),
                                  datapath_connection_factory,
                                  **ssl_kwargs)
        else:
            if privkey is not None or cert is not None:
                # A half-configured TLS setup used to fall back to plain
                # TCP without any hint; make the fallback visible.
                LOG.warning('Only one of ctl-privkey and ctl-cert is set; '
                            'TLS is disabled and plain TCP is served on '
                            'port %s', ofp_tcp_listen_port)
            server = StreamServer((CONF.ofp_listen_host,
                                   ofp_tcp_listen_port),
                                  datapath_connection_factory)

        # LOG.debug('loop')
        server.serve_forever()
206
207
def _deactivate(method):
    """
    Decorator for a method taking only ``self``: run it, then always close
    ``self.socket``.

    The socket is closed whether the method returns or raises; an IOError
    raised by ``close()`` itself is ignored.  The wrapper keeps the wrapped
    method's name and docstring and passes its return value through.

    :param method: Method of an object that has a ``socket`` attribute.
    :return: The wrapping function.
    """
    # Imported locally so this helper does not depend on the module's
    # import block.
    import functools

    @functools.wraps(method)
    def deactivate(self):
        try:
            return method(self)
        finally:
            try:
                self.socket.close()
            except IOError:
                pass

    return deactivate
219
220
class Datapath(ofproto_protocol.ProtocolDesc):
    """
    A class to describe an OpenFlow switch connected to this controller.

    An instance has the following attributes.

    .. tabularcolumns:: |l|L|

    ==================================== ======================================
    Attribute                            Description
    ==================================== ======================================
    id                                   64-bit OpenFlow Datapath ID.
                                         Only available for
                                         ryu.controller.handler.MAIN_DISPATCHER
                                         phase.
    ofproto                              A module which exports OpenFlow
                                         definitions, mainly constants appeared
                                         in the specification, for the
                                         negotiated OpenFlow version.  For
                                         example, ryu.ofproto.ofproto_v1_0 for
                                         OpenFlow 1.0.
    ofproto_parser                       A module which exports OpenFlow wire
                                         message encoder and decoder for the
                                         negotiated OpenFlow version.
                                         For example,
                                         ryu.ofproto.ofproto_v1_0_parser
                                         for OpenFlow 1.0.
    ofproto_parser.OFPxxxx(datapath,...) A callable to prepare an OpenFlow
                                         message for the given switch.  It can
                                         be sent with Datapath.send_msg later.
                                         xxxx is a name of the message.  For
                                         example OFPFlowMod for flow-mod
                                         message.  Arguments depend on the
                                         message.
    set_xid(self, msg)                   Generate an OpenFlow XID and put it
                                         in msg.xid.
    send_msg(self, msg)                  Queue an OpenFlow message to send to
                                         the corresponding switch.  If msg.xid
                                         is None, set_xid is automatically
                                         called on the message before queueing.
    send_packet_out                      deprecated
    send_flow_mod                        deprecated
    send_flow_del                        deprecated
    send_delete_all_flows                deprecated
    send_barrier                         Queue an OpenFlow barrier message to
                                         send to the switch.
    send_nxt_set_flow_format             deprecated
    is_reserved_port                     deprecated
    ==================================== ======================================
    """

    def __init__(self, socket, address):
        """
        :param socket: Connected socket to the switch.  TCP_NODELAY and the
                       configured socket timeout are applied to it here.
        :param address: Address of the peer; used in log messages.
        """
        super(Datapath, self).__init__()

        self.socket = socket
        self.socket.setsockopt(IPPROTO_TCP, TCP_NODELAY, 1)
        self.socket.settimeout(CONF.socket_timeout)
        self.address = address
        self.is_active = True

        # The limit is arbitrary. We need to limit queue size to
        # prevent it from eating memory up.
        self.send_q = hub.Queue(16)
        # One semaphore slot per queue slot: send() acquires before putting,
        # _send_loop() releases after getting.
        self._send_q_sem = hub.BoundedSemaphore(self.send_q.maxsize)

        self.echo_request_interval = CONF.echo_request_interval
        self.max_unreplied_echo_requests = CONF.maximum_unreplied_echo_requests
        # XIDs of echo requests that have not been acknowledged yet.
        self.unreplied_echo_requests = []

        self.xid = random.randint(0, self.ofproto.MAX_XID)
        self.id = None  # datapath_id is unknown yet
        self._ports = None
        self.flow_format = ofproto_v1_0.NXFF_OPENFLOW10
        self.ofp_brick = ryu.base.app_manager.lookup_service_brick('ofp_event')
        self.state = None  # for pylint
        self.set_state(HANDSHAKE_DISPATCHER)

    def _close_write(self):
        # Note: Close only further sends in order to wait for the switch to
        # disconnect this connection.
        try:
            self.socket.shutdown(SHUT_WR)
        except (EOFError, IOError):
            pass

    def close(self):
        """Move to DEAD_DISPATCHER and shut down the sending side."""
        self.set_state(DEAD_DISPATCHER)
        self._close_write()

    def set_state(self, state):
        """
        Change the dispatcher state and notify observers with an
        EventOFPStateChange.  Does nothing if the state is unchanged.
        """
        if self.state == state:
            return
        self.state = state
        ev = ofp_event.EventOFPStateChange(self)
        ev.state = state
        self.ofp_brick.send_event_to_observers(ev, state)

    # Low level socket handling layer
    @_deactivate
    def _recv_loop(self):
        """
        Read from the socket, split the byte stream into OpenFlow messages
        and dispatch an event for each parsed message, until the datapath
        becomes DEAD_DISPATCHER or the connection ends.
        """
        buf = bytearray()
        count = 0
        min_read_len = remaining_read_len = ofproto_common.OFP_HEADER_SIZE

        while self.state != DEAD_DISPATCHER:
            try:
                # Ask for the rest of a partially received message, but
                # never for less than one header.
                read_len = min_read_len
                if remaining_read_len > min_read_len:
                    read_len = remaining_read_len
                ret = self.socket.recv(read_len)
            except SocketTimeout:
                continue
            except ssl.SSLError:
                # eventlet throws SSLError (which is a subclass of IOError)
                # on SSL socket read timeout; re-try the loop in this case.
                # NOTE(review): this retries on every SSLError, not only on
                # timeouts -- confirm a persistent SSL failure cannot make
                # this loop spin.
                continue
            except (EOFError, IOError):
                break

            # An empty read means the peer closed the connection.
            if not ret:
                break

            buf += ret
            buf_len = len(buf)
            while buf_len >= min_read_len:
                (version, msg_type, msg_len, xid) = ofproto_parser.header(buf)
                if msg_len < min_read_len:
                    # Someone isn't playing nicely; log it, and try something sane.
                    LOG.debug("Message with invalid length %s received from switch at address %s",
                              msg_len, self.address)
                    msg_len = min_read_len
                if buf_len < msg_len:
                    # Message is incomplete; go back to reading.
                    remaining_read_len = (msg_len - buf_len)
                    break

                msg = ofproto_parser.msg(
                    self, version, msg_type, msg_len, xid, buf[:msg_len])
                # LOG.debug('queue msg %s cls %s', msg, msg.__class__)
                if msg:
                    ev = ofp_event.ofp_msg_to_ev(msg)
                    self.ofp_brick.send_event_to_observers(ev, self.state)

                    # Collect the handlers registered for the current state
                    # first, then call them.
                    handlers = [
                        handler for handler in self.ofp_brick.get_handlers(ev)
                        if self.state in
                        handler.callers[ev.__class__].dispatchers]
                    for handler in handlers:
                        handler(ev)

                buf = buf[msg_len:]
                buf_len = len(buf)
                remaining_read_len = min_read_len

                # We need to schedule other greenlets. Otherwise, ryu
                # can't accept new switches or handle the existing
                # switches. The limit is arbitrary. We need the better
                # approach in the future.
                count += 1
                if count > 2048:
                    count = 0
                    hub.sleep(0)

    def _send_loop(self):
        """
        Take (buf, close_socket) entries from the send queue and write them
        to the socket until the datapath is dead, an entry asks to close the
        socket, or sending fails.  On exit the queue is retired so that
        send() discards further messages.
        """
        try:
            while self.state != DEAD_DISPATCHER:
                buf, close_socket = self.send_q.get()
                self._send_q_sem.release()
                self.socket.sendall(buf)
                if close_socket:
                    break
        except SocketTimeout:
            LOG.debug("Socket timed out while sending data to switch at address %s",
                      self.address)
        except IOError as ioe:
            # Convert ioe.errno to a string, just in case it was somehow set to None.
            errno = "%s" % ioe.errno
            LOG.debug("Socket error while sending data to switch at address %s: [%s] %s",
                      self.address, errno, ioe.strerror)
        finally:
            q = self.send_q
            # First, clear self.send_q to prevent new references.
            self.send_q = None
            # Now, drain the send_q, releasing the associated semaphore for each entry.
            # This should release all threads waiting to acquire the semaphore.
            # The loop ends on QueueEmpty, not on the value of an entry.
            try:
                while True:
                    q.get(block=False)
                    self._send_q_sem.release()
            except hub.QueueEmpty:
                pass
            # Finally, disallow further sends.
            self._close_write()

    def send(self, buf, close_socket=False):
        """
        Queue raw bytes for sending.

        :param buf: Serialized message.
        :param close_socket: If True, the send loop stops after this entry.
        :return: True if queued, False if the datapath is terminating and
                 the data was discarded.
        """
        self._send_q_sem.acquire()
        # _send_loop() sets send_q to None when it terminates.  Test for
        # exactly that instead of relying on the queue object's truthiness.
        if self.send_q is None:
            self._send_q_sem.release()
            LOG.debug('Datapath in process of terminating; send() to %s discarded.',
                      self.address)
            return False

        self.send_q.put((buf, close_socket))
        return True

    def set_xid(self, msg):
        """
        Generate the next XID (wrapping at ofproto.MAX_XID), store it in
        ``msg`` and return it.
        """
        self.xid += 1
        self.xid &= self.ofproto.MAX_XID
        msg.set_xid(self.xid)
        return self.xid

    def send_msg(self, msg, close_socket=False):
        """
        Serialize ``msg`` and queue it for sending.  An XID is assigned
        first if the message has none.

        :return: Result of send(): True if queued, False if discarded.
        """
        assert isinstance(msg, self.ofproto_parser.MsgBase)
        if msg.xid is None:
            self.set_xid(msg)
        msg.serialize()
        # LOG.debug('send_msg %s', msg)
        return self.send(msg.buf, close_socket=close_socket)

    def _echo_request_loop(self):
        """
        Periodically send echo requests; close the datapath once more than
        ``max_unreplied_echo_requests`` are outstanding or the send queue is
        gone.  Disabled when ``max_unreplied_echo_requests`` is 0.
        """
        if not self.max_unreplied_echo_requests:
            return
        # send_q is None once _send_loop() has terminated.
        while (self.send_q is not None and
               (len(self.unreplied_echo_requests) <= self.max_unreplied_echo_requests)):
            echo_req = self.ofproto_parser.OFPEchoRequest(self)
            self.unreplied_echo_requests.append(self.set_xid(echo_req))
            self.send_msg(echo_req)
            hub.sleep(self.echo_request_interval)
        self.close()

    def acknowledge_echo_reply(self, xid):
        """Forget the outstanding echo request ``xid``; unknown XIDs are ignored."""
        try:
            self.unreplied_echo_requests.remove(xid)
        except ValueError:
            pass

    def serve(self):
        """
        Run the connection: start the send and echo-request loops, send the
        hello message, then run the receive loop in the calling thread.
        When the receive loop ends, the helper loops are killed and the
        datapath is marked inactive.
        """
        send_thr = hub.spawn(self._send_loop)

        # send hello message immediately
        hello = self.ofproto_parser.OFPHello(self)
        self.send_msg(hello)

        echo_thr = hub.spawn(self._echo_request_loop)

        try:
            self._recv_loop()
        finally:
            hub.kill(send_thr)
            hub.kill(echo_thr)
            hub.joinall([send_thr, echo_thr])
            self.is_active = False

    #
    # Utility methods for convenience
    #
    def send_packet_out(self, buffer_id=0xffffffff, in_port=None,
                        actions=None, data=None):
        """Deprecated.  Build and queue an OFPPacketOut message."""
        if in_port is None:
            in_port = self.ofproto.OFPP_NONE
        packet_out = self.ofproto_parser.OFPPacketOut(
            self, buffer_id, in_port, actions, data)
        self.send_msg(packet_out)

    def send_flow_mod(self, rule, cookie, command, idle_timeout, hard_timeout,
                      priority=None, buffer_id=0xffffffff,
                      out_port=None, flags=0, actions=None):
        """
        Deprecated.  Build and queue a flow-mod for ``rule``, using
        OFPFlowMod or NXTFlowMod depending on the rule's flow format.
        """
        if priority is None:
            priority = self.ofproto.OFP_DEFAULT_PRIORITY
        if out_port is None:
            out_port = self.ofproto.OFPP_NONE
        flow_format = rule.flow_format()
        assert (flow_format == ofproto_v1_0.NXFF_OPENFLOW10 or
                flow_format == ofproto_v1_0.NXFF_NXM)
        # Switch the flow format first if the rule needs a newer one than
        # the one currently recorded for this datapath.
        if self.flow_format < flow_format:
            self.send_nxt_set_flow_format(flow_format)
        if flow_format == ofproto_v1_0.NXFF_OPENFLOW10:
            match_tuple = rule.match_tuple()
            match = self.ofproto_parser.OFPMatch(*match_tuple)
            flow_mod = self.ofproto_parser.OFPFlowMod(
                self, match, cookie, command, idle_timeout, hard_timeout,
                priority, buffer_id, out_port, flags, actions)
        else:
            flow_mod = self.ofproto_parser.NXTFlowMod(
                self, cookie, command, idle_timeout, hard_timeout,
                priority, buffer_id, out_port, flags, rule, actions)
        self.send_msg(flow_mod)

    def send_flow_del(self, rule, cookie, out_port=None):
        """Deprecated.  Queue an OFPFC_DELETE flow-mod for ``rule``."""
        self.send_flow_mod(rule=rule, cookie=cookie,
                           command=self.ofproto.OFPFC_DELETE,
                           idle_timeout=0, hard_timeout=0, priority=0,
                           out_port=out_port)

    def send_delete_all_flows(self):
        """Deprecated.  Queue an OFPFC_DELETE flow-mod with an empty rule."""
        rule = nx_match.ClsRule()
        self.send_flow_mod(
            rule=rule, cookie=0, command=self.ofproto.OFPFC_DELETE,
            idle_timeout=0, hard_timeout=0, priority=0, buffer_id=0,
            out_port=self.ofproto.OFPP_NONE, flags=0, actions=None)

    def send_barrier(self):
        """
        Queue an OpenFlow barrier request.

        :return: Result of send_msg().
        """
        barrier_request = self.ofproto_parser.OFPBarrierRequest(self)
        return self.send_msg(barrier_request)

    def send_nxt_set_flow_format(self, flow_format):
        """
        Deprecated.  Record ``flow_format`` and queue NXTSetFlowFormat
        followed by a barrier; does nothing if the format is already set.
        """
        assert (flow_format == ofproto_v1_0.NXFF_OPENFLOW10 or
                flow_format == ofproto_v1_0.NXFF_NXM)
        if self.flow_format == flow_format:
            # Nothing to do
            return
        self.flow_format = flow_format
        set_format = self.ofproto_parser.NXTSetFlowFormat(self, flow_format)
        # FIXME: If NXT_SET_FLOW_FORMAT or NXFF_NXM is not supported by
        # the switch then an error message will be received. It may be
        # handled by setting self.flow_format to
        # ofproto_v1_0.NXFF_OPENFLOW10 but currently isn't.
        self.send_msg(set_format)
        self.send_barrier()

    def is_reserved_port(self, port_no):
        """Deprecated.  Return True if ``port_no`` is above ofproto.OFPP_MAX."""
        return port_no > self.ofproto.OFPP_MAX
545
546
def datapath_connection_factory(socket, address):
    """
    Serve one switch connection.

    Wraps the socket in a Datapath, runs Datapath.serve() and closes the
    datapath afterwards.  Any exception raised while serving is logged
    together with the datapath ID (if already known) and re-raised.

    :param socket: Connected socket to the switch.
    :param address: Address of the peer.
    """
    LOG.debug('connected socket:%s address:%s', socket, address)
    with contextlib.closing(Datapath(socket, address)) as datapath:
        try:
            datapath.serve()
        except:
            # Something went wrong.
            # Especially malicious switch can send malformed packet,
            # the parser raise exception.
            # Can we do anything more graceful?
            # The catch-all is deliberate: the error is only logged here
            # and always propagated to the caller.
            dpid = datapath.id
            # The datapath ID is still None if the failure happened before
            # it became known.
            dpid_str = "%s" % dpid if dpid is None else dpid_to_str(dpid)
            LOG.error("Error in the datapath %s from %s", dpid_str, address)
            raise