1 # Copyright (C) 2011, 2012 Nippon Telegraph and Telephone Corporation.
2 # Copyright (C) 2011, 2012 Isaku Yamahata <yamahata at valinux co jp>
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
18 The main component of OpenFlow controller.
20 - Handle connections from switches
21 - Generate and route events to appropriate entities like Ryu applications
28 from socket import IPPROTO_TCP
29 from socket import TCP_NODELAY
30 from socket import SHUT_WR
31 from socket import timeout as SocketTimeout
35 from ryu.lib import hub
36 from ryu.lib.hub import StreamServer
38 import ryu.base.app_manager
40 from ryu.ofproto import ofproto_common
41 from ryu.ofproto import ofproto_parser
42 from ryu.ofproto import ofproto_protocol
43 from ryu.ofproto import ofproto_v1_0
44 from ryu.ofproto import nx_match
46 from ryu.controller import ofp_event
47 from ryu.controller.handler import HANDSHAKE_DISPATCHER, DEAD_DISPATCHER
49 from ryu.lib.dpid import dpid_to_str
50 from ryu.lib import ip
52 LOG = logging.getLogger('ryu.controller.controller')
54 DEFAULT_OFP_HOST = '0.0.0.0'
55 DEFAULT_OFP_SW_CON_INTERVAL = 1
58 CONF.register_cli_opts([
59 cfg.StrOpt('ofp-listen-host', default=DEFAULT_OFP_HOST,
60 help='openflow listen host (default %s)' % DEFAULT_OFP_HOST),
61 cfg.IntOpt('ofp-tcp-listen-port', default=None,
62 help='openflow tcp listen port '
63 '(default: %d)' % ofproto_common.OFP_TCP_PORT),
64 cfg.IntOpt('ofp-ssl-listen-port', default=None,
65 help='openflow ssl listen port '
66 '(default: %d)' % ofproto_common.OFP_SSL_PORT),
67 cfg.StrOpt('ctl-privkey', default=None, help='controller private key'),
68 cfg.StrOpt('ctl-cert', default=None, help='controller certificate'),
69 cfg.StrOpt('ca-certs', default=None, help='CA certificates'),
70 cfg.ListOpt('ofp-switch-address-list', item_type=str, default=[],
71 help='list of IP address and port pairs (default empty). '
72 'e.g., "127.0.0.1:6653,[::1]:6653"'),
73 cfg.IntOpt('ofp-switch-connect-interval',
74 default=DEFAULT_OFP_SW_CON_INTERVAL,
75 help='interval in seconds to connect to switches '
76 '(default %d)' % DEFAULT_OFP_SW_CON_INTERVAL),
79 cfg.FloatOpt('socket-timeout',
81 help='Time, in seconds, to await completion of socket operations.'),
82 cfg.FloatOpt('echo-request-interval',
84 help='Time, in seconds, between sending echo requests to a datapath.'),
85 cfg.IntOpt('maximum-unreplied-echo-requests',
88 help='Maximum number of unreplied echo requests before datapath is disconnected.')
92 def _split_addr(addr):
94 Splits a str of IP address and port pair into (host, port).
98 >>> _split_addr('127.0.0.1:6653')
100 >>> _split_addr('[::1]:6653')
103 Raises ValueError if invalid format.
105 :param addr: A pair of IP address and port.
106 :return: IP address and port
108 e = ValueError('Invalid IP address and port pair: "%s"' % addr)
109 pair = addr.rsplit(':', 1)
114 if addr.startswith('[') and addr.endswith(']'):
115 addr = addr.lstrip('[').rstrip(']')
116 if not ip.valid_ipv6(addr):
118 elif not ip.valid_ipv4(addr):
121 return addr, int(port, 0)
124 class OpenFlowController(object):
126 super(OpenFlowController, self).__init__()
127 if not CONF.ofp_tcp_listen_port and not CONF.ofp_ssl_listen_port:
128 self.ofp_tcp_listen_port = ofproto_common.OFP_TCP_PORT
129 self.ofp_ssl_listen_port = ofproto_common.OFP_SSL_PORT
130 # For the backward compatibility, we spawn a server loop
131 # listening on the old OpenFlow listen port 6633.
132 hub.spawn(self.server_loop,
133 ofproto_common.OFP_TCP_PORT_OLD,
134 ofproto_common.OFP_SSL_PORT_OLD)
136 self.ofp_tcp_listen_port = CONF.ofp_tcp_listen_port
137 self.ofp_ssl_listen_port = CONF.ofp_ssl_listen_port
141 # ('127.0.0.1', 6653): <instance of StreamClient>,
148 for address in CONF.ofp_switch_address_list:
149 addr = tuple(_split_addr(address))
150 self.spawn_client_loop(addr)
152 self.server_loop(self.ofp_tcp_listen_port,
153 self.ofp_ssl_listen_port)
155 def spawn_client_loop(self, addr, interval=None):
156 interval = interval or CONF.ofp_switch_connect_interval
157 client = hub.StreamClient(addr)
158 hub.spawn(client.connect_loop, datapath_connection_factory, interval)
159 self._clients[addr] = client
161 def stop_client_loop(self, addr):
162 client = self._clients.get(addr, None)
163 if client is not None:
166 def server_loop(self, ofp_tcp_listen_port, ofp_ssl_listen_port):
167 if CONF.ctl_privkey is not None and CONF.ctl_cert is not None:
168 if not hasattr(ssl, 'SSLContext'):
169 # anything less than python 2.7.9 supports only TLSv1
170 # or less, thus we choose TLSv1
171 ssl_args = {'ssl_version': ssl.PROTOCOL_TLSv1}
173 # from 2.7.9 and versions 3.4+ ssl context creation is
174 # supported. Protocol_TLS from 2.7.13 and from 3.5.3
175 # replaced SSLv23. Functionality is similar.
176 if hasattr(ssl, 'PROTOCOL_TLS'):
179 p = 'PROTOCOL_SSLv23'
181 ssl_args = {'ssl_ctx': ssl.SSLContext(getattr(ssl, p))}
182 # Restrict non-safe versions
183 ssl_args['ssl_ctx'].options |= ssl.OP_NO_SSLv3 | ssl.OP_NO_SSLv2
185 if CONF.ca_certs is not None:
186 server = StreamServer((CONF.ofp_listen_host,
187 ofp_ssl_listen_port),
188 datapath_connection_factory,
189 keyfile=CONF.ctl_privkey,
190 certfile=CONF.ctl_cert,
191 cert_reqs=ssl.CERT_REQUIRED,
192 ca_certs=CONF.ca_certs, **ssl_args)
194 server = StreamServer((CONF.ofp_listen_host,
195 ofp_ssl_listen_port),
196 datapath_connection_factory,
197 keyfile=CONF.ctl_privkey,
198 certfile=CONF.ctl_cert, **ssl_args)
200 server = StreamServer((CONF.ofp_listen_host,
201 ofp_tcp_listen_port),
202 datapath_connection_factory)
205 server.serve_forever()
208 def _deactivate(method):
209 def deactivate(self):
221 class Datapath(ofproto_protocol.ProtocolDesc):
223 A class to describe an OpenFlow switch connected to this controller.
225 An instance has the following attributes.
227 .. tabularcolumns:: |l|L|
229 ==================================== ======================================
230 Attribute Description
231 ==================================== ======================================
232 id 64-bit OpenFlow Datapath ID.
234 ryu.controller.handler.MAIN_DISPATCHER
236 ofproto A module which exports OpenFlow
237 definitions, mainly constants appeared
238 in the specification, for the
239 negotiated OpenFlow version. For
240 example, ryu.ofproto.ofproto_v1_0 for
242 ofproto_parser A module which exports OpenFlow wire
243 message encoder and decoder for the
244 negotiated OpenFlow version.
246 ryu.ofproto.ofproto_v1_0_parser
248 ofproto_parser.OFPxxxx(datapath,...) A callable to prepare an OpenFlow
249 message for the given switch. It can
250 be sent with Datapath.send_msg later.
251 xxxx is a name of the message. For
252 example OFPFlowMod for flow-mod
253 message. Arguemnts depend on the
255 set_xid(self, msg) Generate an OpenFlow XID and put it
257 send_msg(self, msg) Queue an OpenFlow message to send to
258 the corresponding switch. If msg.xid
259 is None, set_xid is automatically
260 called on the message before queueing.
261 send_packet_out deprecated
262 send_flow_mod deprecated
263 send_flow_del deprecated
264 send_delete_all_flows deprecated
265 send_barrier Queue an OpenFlow barrier message to
267 send_nxt_set_flow_format deprecated
268 is_reserved_port deprecated
269 ==================================== ======================================
272 def __init__(self, socket, address):
273 super(Datapath, self).__init__()
276 self.socket.setsockopt(IPPROTO_TCP, TCP_NODELAY, 1)
277 self.socket.settimeout(CONF.socket_timeout)
278 self.address = address
279 self.is_active = True
281 # The limit is arbitrary. We need to limit queue size to
282 # prevent it from eating memory up.
283 self.send_q = hub.Queue(16)
284 self._send_q_sem = hub.BoundedSemaphore(self.send_q.maxsize)
286 self.echo_request_interval = CONF.echo_request_interval
287 self.max_unreplied_echo_requests = CONF.maximum_unreplied_echo_requests
288 self.unreplied_echo_requests = []
290 self.xid = random.randint(0, self.ofproto.MAX_XID)
291 self.id = None # datapath_id is unknown yet
293 self.flow_format = ofproto_v1_0.NXFF_OPENFLOW10
294 self.ofp_brick = ryu.base.app_manager.lookup_service_brick('ofp_event')
295 self.state = None # for pylint
296 self.set_state(HANDSHAKE_DISPATCHER)
298 def _close_write(self):
299 # Note: Close only further sends in order to wait for the switch to
300 # disconnect this connection.
302 self.socket.shutdown(SHUT_WR)
303 except (EOFError, IOError):
307 self.set_state(DEAD_DISPATCHER)
310 def set_state(self, state):
311 if self.state == state:
314 ev = ofp_event.EventOFPStateChange(self)
316 self.ofp_brick.send_event_to_observers(ev, state)
318 # Low level socket handling layer
320 def _recv_loop(self):
323 min_read_len = remaining_read_len = ofproto_common.OFP_HEADER_SIZE
325 while self.state != DEAD_DISPATCHER:
327 read_len = min_read_len
328 if remaining_read_len > min_read_len:
329 read_len = remaining_read_len
330 ret = self.socket.recv(read_len)
331 except SocketTimeout:
334 # eventlet throws SSLError (which is a subclass of IOError)
335 # on SSL socket read timeout; re-try the loop in this case.
337 except (EOFError, IOError):
345 while buf_len >= min_read_len:
346 (version, msg_type, msg_len, xid) = ofproto_parser.header(buf)
347 if msg_len < min_read_len:
348 # Someone isn't playing nicely; log it, and try something sane.
349 LOG.debug("Message with invalid length %s received from switch at address %s",
350 msg_len, self.address)
351 msg_len = min_read_len
352 if buf_len < msg_len:
353 remaining_read_len = (msg_len - buf_len)
356 msg = ofproto_parser.msg(
357 self, version, msg_type, msg_len, xid, buf[:msg_len])
358 # LOG.debug('queue msg %s cls %s', msg, msg.__class__)
360 ev = ofp_event.ofp_msg_to_ev(msg)
361 self.ofp_brick.send_event_to_observers(ev, self.state)
364 return x.callers[ev.__class__].dispatchers
366 handlers = [handler for handler in
367 self.ofp_brick.get_handlers(ev) if
368 self.state in dispatchers(handler)]
369 for handler in handlers:
374 remaining_read_len = min_read_len
376 # We need to schedule other greenlets. Otherwise, ryu
377 # can't accept new switches or handle the existing
378 # switches. The limit is arbitrary. We need the better
379 # approach in the future.
385 def _send_loop(self):
387 while self.state != DEAD_DISPATCHER:
388 buf, close_socket = self.send_q.get()
389 self._send_q_sem.release()
390 self.socket.sendall(buf)
393 except SocketTimeout:
394 LOG.debug("Socket timed out while sending data to switch at address %s",
396 except IOError as ioe:
397 # Convert ioe.errno to a string, just in case it was somehow set to None.
398 errno = "%s" % ioe.errno
399 LOG.debug("Socket error while sending data to switch at address %s: [%s] %s",
400 self.address, errno, ioe.strerror)
403 # First, clear self.send_q to prevent new references.
405 # Now, drain the send_q, releasing the associated semaphore for each entry.
406 # This should release all threads waiting to acquire the semaphore.
408 while q.get(block=False):
409 self._send_q_sem.release()
410 except hub.QueueEmpty:
412 # Finally, disallow further sends.
415 def send(self, buf, close_socket=False):
417 self._send_q_sem.acquire()
419 self.send_q.put((buf, close_socket))
422 self._send_q_sem.release()
424 LOG.debug('Datapath in process of terminating; send() to %s discarded.',
428 def set_xid(self, msg):
430 self.xid &= self.ofproto.MAX_XID
431 msg.set_xid(self.xid)
434 def send_msg(self, msg, close_socket=False):
435 assert isinstance(msg, self.ofproto_parser.MsgBase)
439 # LOG.debug('send_msg %s', msg)
440 return self.send(msg.buf, close_socket=close_socket)
442 def _echo_request_loop(self):
443 if not self.max_unreplied_echo_requests:
445 while (self.send_q and
446 (len(self.unreplied_echo_requests) <= self.max_unreplied_echo_requests)):
447 echo_req = self.ofproto_parser.OFPEchoRequest(self)
448 self.unreplied_echo_requests.append(self.set_xid(echo_req))
449 self.send_msg(echo_req)
450 hub.sleep(self.echo_request_interval)
453 def acknowledge_echo_reply(self, xid):
455 self.unreplied_echo_requests.remove(xid)
460 send_thr = hub.spawn(self._send_loop)
462 # send hello message immediately
463 hello = self.ofproto_parser.OFPHello(self)
466 echo_thr = hub.spawn(self._echo_request_loop)
473 hub.joinall([send_thr, echo_thr])
474 self.is_active = False
477 # Utility methods for convenience
479 def send_packet_out(self, buffer_id=0xffffffff, in_port=None,
480 actions=None, data=None):
482 in_port = self.ofproto.OFPP_NONE
483 packet_out = self.ofproto_parser.OFPPacketOut(
484 self, buffer_id, in_port, actions, data)
485 self.send_msg(packet_out)
487 def send_flow_mod(self, rule, cookie, command, idle_timeout, hard_timeout,
488 priority=None, buffer_id=0xffffffff,
489 out_port=None, flags=0, actions=None):
491 priority = self.ofproto.OFP_DEFAULT_PRIORITY
493 out_port = self.ofproto.OFPP_NONE
494 flow_format = rule.flow_format()
495 assert (flow_format == ofproto_v1_0.NXFF_OPENFLOW10 or
496 flow_format == ofproto_v1_0.NXFF_NXM)
497 if self.flow_format < flow_format:
498 self.send_nxt_set_flow_format(flow_format)
499 if flow_format == ofproto_v1_0.NXFF_OPENFLOW10:
500 match_tuple = rule.match_tuple()
501 match = self.ofproto_parser.OFPMatch(*match_tuple)
502 flow_mod = self.ofproto_parser.OFPFlowMod(
503 self, match, cookie, command, idle_timeout, hard_timeout,
504 priority, buffer_id, out_port, flags, actions)
506 flow_mod = self.ofproto_parser.NXTFlowMod(
507 self, cookie, command, idle_timeout, hard_timeout,
508 priority, buffer_id, out_port, flags, rule, actions)
509 self.send_msg(flow_mod)
511 def send_flow_del(self, rule, cookie, out_port=None):
512 self.send_flow_mod(rule=rule, cookie=cookie,
513 command=self.ofproto.OFPFC_DELETE,
514 idle_timeout=0, hard_timeout=0, priority=0,
517 def send_delete_all_flows(self):
518 rule = nx_match.ClsRule()
520 rule=rule, cookie=0, command=self.ofproto.OFPFC_DELETE,
521 idle_timeout=0, hard_timeout=0, priority=0, buffer_id=0,
522 out_port=self.ofproto.OFPP_NONE, flags=0, actions=None)
524 def send_barrier(self):
525 barrier_request = self.ofproto_parser.OFPBarrierRequest(self)
526 return self.send_msg(barrier_request)
528 def send_nxt_set_flow_format(self, flow_format):
529 assert (flow_format == ofproto_v1_0.NXFF_OPENFLOW10 or
530 flow_format == ofproto_v1_0.NXFF_NXM)
531 if self.flow_format == flow_format:
534 self.flow_format = flow_format
535 set_format = self.ofproto_parser.NXTSetFlowFormat(self, flow_format)
536 # FIXME: If NXT_SET_FLOW_FORMAT or NXFF_NXM is not supported by
537 # the switch then an error message will be received. It may be
538 # handled by setting self.flow_format to
539 # ofproto_v1_0.NXFF_OPENFLOW10 but currently isn't.
540 self.send_msg(set_format)
543 def is_reserved_port(self, port_no):
544 return port_no > self.ofproto.OFPP_MAX
547 def datapath_connection_factory(socket, address):
548 LOG.debug('connected socket:%s address:%s', socket, address)
549 with contextlib.closing(Datapath(socket, address)) as datapath:
553 # Something went wrong.
554 # Especially malicious switch can send malformed packet,
555 # the parser raise exception.
556 # Can we do anything more graceful?
557 if datapath.id is None:
558 dpid_str = "%s" % datapath.id
560 dpid_str = dpid_to_str(datapath.id)
561 LOG.error("Error in the datapath %s from %s", dpid_str, address)