1 # Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
18 from ryu.base import app_manager
19 from ryu.controller import event
20 from ryu.controller import ofp_event
21 from ryu.controller.handler import MAIN_DISPATCHER
22 from ryu.controller.handler import set_ev_cls
23 from ryu.ofproto import ether
24 from ryu.ofproto import ofproto_v1_0
25 from ryu.ofproto import ofproto_v1_2
26 from ryu.ofproto import ofproto_v1_3
27 from ryu.lib import addrconv
28 from ryu.lib.dpid import dpid_to_str
29 from ryu.lib.packet import packet
30 from ryu.lib.packet import ethernet
31 from ryu.lib.packet import slow
class EventPacketIn(event.EventBase):
    """PacketIn event for every packet other than LACP.

    LacpLib handles LACP frames itself; any other PacketIn message is
    handed to its observers wrapped in this event.
    """

    def __init__(self, msg):
        """keep the PacketIn message so that observers can read it.

        msg is the ``msg`` attribute of the EventOFPPacketIn that
        LacpLib received.
        """
        super(EventPacketIn, self).__init__()
        self.msg = msg
class EventSlaveStateChanged(event.EventBase):
    """an event class that notifies a change of the status of a
    slave i/f.

    LacpLib sends it with ``enabled=True`` when LACP arrives at a slave
    i/f that was disabled, and with ``enabled=False`` when the LACP flow
    entry of a slave i/f has been removed.
    """

    def __init__(self, datapath, port, enabled):
        """record which slave i/f changed and its new status.

        datapath -- the datapath object the slave i/f belongs to.
        port     -- the port number of the slave i/f.
        enabled  -- True when the slave i/f came up, False otherwise.
        """
        super(EventSlaveStateChanged, self).__init__()
        self.datapath = datapath
        self.port = port
        self.enabled = enabled
55 class LacpLib(app_manager.RyuApp):
56 """LACP exchange library. this works only in a PASSIVE mode."""
58 # -------------------------------------------------------------------
60 # -------------------------------------------------------------------
def __init__(self):
    """initialize the bonding table and the flow installers."""
    super(LacpLib, self).__init__()
    # NOTE(review): application name as used by upstream Ryu's lacplib;
    # confirm against the original file.
    self.name = 'lacplib'
    # one entry per add() call, shaped as
    # {dpid: {port: {'enabled': bool, 'timeout': int}}}
    self._bonds = []
    # flow installer per OpenFlow version; ver1.3 shares the ver1.2 one.
    self._add_flow = {
        ofproto_v1_0.OFP_VERSION: self._add_flow_v1_0,
        ofproto_v1_2.OFP_VERSION: self._add_flow_v1_2,
        ofproto_v1_3.OFP_VERSION: self._add_flow_v1_2,
    }
    self._set_logger()
def add(self, dpid, ports):
    """add a setting of a bonding i/f.
    'add' method takes the corresponding args in this order.

    ========= =====================================================
    Attribute Description
    ========= =====================================================
    dpid      datapath id.

    ports     a list of integer values that means the ports face
              with the slave i/fs.
    ========= =====================================================

    if you want to use multi LAG, call 'add' method more than once.
    """
    assert isinstance(ports, list)
    assert len(ports) >= 2
    # every slave i/f starts disabled with timeout 0; _do_lacp updates
    # both once LACP is received on the port.
    ifs = dict(
        (port, {'enabled': False, 'timeout': 0}) for port in ports)
    self._bonds.append({dpid: ifs})
96 # -------------------------------------------------------------------
97 # PUBLIC METHODS ( EVENT HANDLERS )
98 # -------------------------------------------------------------------
@set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER)
def packet_in_handler(self, evt):
    """PacketIn event handler.

    when the received packet is LACP, process it here. otherwise,
    send an EventPacketIn carrying the message to the observers.
    """
    msg = evt.msg
    req_pkt = packet.Packet(msg.data)
    if slow.lacp not in req_pkt:
        # not LACP: leave the packet to the observing applications.
        self.send_event_to_observers(EventPacketIn(msg))
        return
    (req_lacp, ) = req_pkt.get_protocols(slow.lacp)
    (req_eth, ) = req_pkt.get_protocols(ethernet.ethernet)
    self._do_lacp(req_lacp, req_eth.src, msg)
@set_ev_cls(ofp_event.EventOFPFlowRemoved, MAIN_DISPATCHER)
def flow_removed_handler(self, evt):
    """FlowRemoved event handler.

    when the removed flow entry was for LACP, set the status of the
    slave i/f to disabled, reset its timeout, and send an
    EventSlaveStateChanged to the observers. flow entries of any
    other ether type are ignored.
    """
    msg = evt.msg
    datapath = msg.datapath
    ofproto = datapath.ofproto
    dpid = datapath.id
    match = msg.match
    # ver1.0 exposes the match fields as attributes, later versions
    # as items.
    if ofproto.OFP_VERSION == ofproto_v1_0.OFP_VERSION:
        port = match.in_port
        dl_type = match.dl_type
    else:
        port = match['in_port']
        dl_type = match['eth_type']
    if ether.ETH_TYPE_SLOW != dl_type:
        return
    self.logger.info(
        "SW=%s PORT=%d LACP exchange timeout has occurred.",
        dpid_to_str(dpid), port)
    self._set_slave_enabled(dpid, port, False)
    # timeout 0 makes the next LACP on this port re-enter the flow entry.
    self._set_slave_timeout(dpid, port, 0)
    self.send_event_to_observers(
        EventSlaveStateChanged(datapath, port, False))
137 # -------------------------------------------------------------------
138 # PRIVATE METHODS ( RELATED TO LACP )
139 # -------------------------------------------------------------------
def _do_lacp(self, req_lacp, src, msg):
    """packet-in process when the received packet is LACP.

    req_lacp -- the LACP protocol object taken from the packet.
    src      -- the source MAC address of the received frame.
    msg      -- the PacketIn message.
    """
    datapath = msg.datapath
    dpid = datapath.id
    ofproto = datapath.ofproto
    parser = datapath.ofproto_parser
    # ver1.0 PacketIn carries in_port directly; later versions carry
    # it inside the match.
    if ofproto.OFP_VERSION == ofproto_v1_0.OFP_VERSION:
        port = msg.in_port
    else:
        port = msg.match['in_port']

    self.logger.info("SW=%s PORT=%d LACP received.",
                     dpid_to_str(dpid), port)
    self.logger.debug(str(req_lacp))

    # when LACP arrived at disabled port, update the status of
    # the slave i/f to enabled, and send an event.
    if not self._get_slave_enabled(dpid, port):
        self.logger.info(
            "SW=%s PORT=%d the slave i/f has just been up.",
            dpid_to_str(dpid), port)
        self._set_slave_enabled(dpid, port, True)
        self.send_event_to_observers(
            EventSlaveStateChanged(datapath, port, True))

    # set the idle_timeout time using the actor state of the
    # received packet.
    if req_lacp.actor_state_timeout == req_lacp.LACP_STATE_SHORT_TIMEOUT:
        idle_timeout = req_lacp.SHORT_TIMEOUT_TIME
    else:
        idle_timeout = req_lacp.LONG_TIMEOUT_TIME

    # when the timeout time has changed, update the timeout time of
    # the slave i/f and re-enter a flow entry for the packet from
    # the slave i/f with idle_timeout.
    if idle_timeout != self._get_slave_timeout(dpid, port):
        self.logger.info(
            "SW=%s PORT=%d the timeout time has changed.",
            dpid_to_str(dpid), port)
        self._set_slave_timeout(dpid, port, idle_timeout)
        func = self._add_flow.get(ofproto.OFP_VERSION)
        # an installer must exist for the datapath's OpenFlow version.
        assert func
        func(src, port, idle_timeout, datapath)

    # create a response packet.
    res_pkt = self._create_response(datapath, port, req_lacp)

    # packet-out the response packet through the port it came from.
    actions = [parser.OFPActionOutput(ofproto.OFPP_IN_PORT)]
    out = parser.OFPPacketOut(
        datapath=datapath, buffer_id=ofproto.OFP_NO_BUFFER,
        data=res_pkt.data, in_port=port, actions=actions)
    datapath.send_msg(out)
def _create_response(self, datapath, port, req):
    """create a packet including LACP.

    the ethernet source is the hardware address of `port` on the
    datapath and the destination is the slow protocol multicast
    address. returns the serialized packet object, so that the
    caller can read its `data`.
    """
    src = datapath.ports[port].hw_addr
    res_ether = ethernet.ethernet(
        slow.SLOW_PROTOCOL_MULTICAST, src, ether.ETH_TYPE_SLOW)
    res_lacp = self._create_lacp(datapath, port, req)
    res_pkt = packet.Packet()
    for proto in (res_ether, res_lacp):
        res_pkt.add_protocol(proto)
    # the caller sends res_pkt.data, which is built by serialize().
    res_pkt.serialize()
    return res_pkt
def _create_lacp(self, datapath, port, req):
    """create a LACP packet answering the received LACP `req`.

    the actor system is the hardware address of the datapath's
    OFPP_LOCAL port and the actor activity is always passive. the
    other actor states and the actor key follow the request, and
    the partner fields are copied from the actor fields of the
    request. returns the LACP protocol object.
    """
    actor_system = datapath.ports[datapath.ofproto.OFPP_LOCAL].hw_addr
    res = slow.lacp(
        actor_system_priority=0xffff,
        actor_system=actor_system,
        actor_key=req.actor_key,
        actor_port_priority=0xff,
        # NOTE(review): actor_port follows upstream Ryu's lacplib;
        # confirm against the original file.
        actor_port=port,
        actor_state_activity=req.LACP_STATE_PASSIVE,
        actor_state_timeout=req.actor_state_timeout,
        actor_state_aggregation=req.actor_state_aggregation,
        actor_state_synchronization=req.actor_state_synchronization,
        actor_state_collecting=req.actor_state_collecting,
        actor_state_distributing=req.actor_state_distributing,
        actor_state_defaulted=req.LACP_STATE_OPERATIONAL_PARTNER,
        actor_state_expired=req.LACP_STATE_NOT_EXPIRED,
        partner_system_priority=req.actor_system_priority,
        partner_system=req.actor_system,
        partner_key=req.actor_key,
        partner_port_priority=req.actor_port_priority,
        partner_port=req.actor_port,
        partner_state_activity=req.actor_state_activity,
        partner_state_timeout=req.actor_state_timeout,
        partner_state_aggregation=req.actor_state_aggregation,
        partner_state_synchronization=req.actor_state_synchronization,
        partner_state_collecting=req.actor_state_collecting,
        partner_state_distributing=req.actor_state_distributing,
        partner_state_defaulted=req.actor_state_defaulted,
        partner_state_expired=req.actor_state_expired,
        collector_max_delay=0)
    self.logger.info("SW=%s PORT=%d LACP sent.",
                     dpid_to_str(datapath.id), port)
    self.logger.debug(str(res))
    return res
244 def _get_slave_enabled(self, dpid, port):
245 """get whether a slave i/f at some port of some datapath is
247 slave = self._get_slave(dpid, port)
249 return slave['enabled']
253 def _set_slave_enabled(self, dpid, port, enabled):
254 """set whether a slave i/f at some port of some datapath is
256 slave = self._get_slave(dpid, port)
258 slave['enabled'] = enabled
260 def _get_slave_timeout(self, dpid, port):
261 """get the timeout time at some port of some datapath."""
262 slave = self._get_slave(dpid, port)
264 return slave['timeout']
268 def _set_slave_timeout(self, dpid, port, timeout):
269 """set the timeout time at some port of some datapath."""
270 slave = self._get_slave(dpid, port)
272 slave['timeout'] = timeout
274 def _get_slave(self, dpid, port):
275 """get slave i/f at some port of some datapath."""
277 for bond in self._bonds:
279 if port in bond[dpid]:
280 result = bond[dpid][port]
284 # -------------------------------------------------------------------
285 # PRIVATE METHODS ( RELATED TO OPEN FLOW PROTOCOL )
286 # -------------------------------------------------------------------
def _add_flow_v1_0(self, src, port, timeout, datapath):
    """enter a flow entry for the packet from the slave i/f
    with idle_timeout. for OpenFlow ver1.0.

    the entry matches slow protocol frames from `src` received on
    `port` and outputs them to the controller. it asks for a
    flow-removed message, which flow_removed_handler consumes.
    """
    ofproto = datapath.ofproto
    parser = datapath.ofproto_parser

    # ver1.0 takes the source MAC in binary form.
    match = parser.OFPMatch(
        in_port=port, dl_src=addrconv.mac.text_to_bin(src),
        dl_type=ether.ETH_TYPE_SLOW)
    to_controller = parser.OFPActionOutput(
        ofproto.OFPP_CONTROLLER, 65535)
    mod = parser.OFPFlowMod(
        datapath=datapath, match=match, cookie=0,
        command=ofproto.OFPFC_ADD, idle_timeout=timeout,
        priority=65535, flags=ofproto.OFPFF_SEND_FLOW_REM,
        actions=[to_controller])
    datapath.send_msg(mod)
def _add_flow_v1_2(self, src, port, timeout, datapath):
    """enter a flow entry for the packet from the slave i/f
    with idle_timeout. for OpenFlow ver1.2 and ver1.3.

    the entry matches slow protocol frames from `src` received on
    `port` and outputs them to the controller. it asks for a
    flow-removed message, which flow_removed_handler consumes.
    """
    ofproto = datapath.ofproto
    parser = datapath.ofproto_parser

    match = parser.OFPMatch(
        in_port=port, eth_src=src, eth_type=ether.ETH_TYPE_SLOW)
    to_controller = parser.OFPActionOutput(
        ofproto.OFPP_CONTROLLER, ofproto.OFPCML_MAX)
    # from ver1.2 on, actions are wrapped in an apply-actions
    # instruction.
    inst = [parser.OFPInstructionActions(
        ofproto.OFPIT_APPLY_ACTIONS, [to_controller])]
    mod = parser.OFPFlowMod(
        datapath=datapath, command=ofproto.OFPFC_ADD,
        idle_timeout=timeout, priority=65535,
        flags=ofproto.OFPFF_SEND_FLOW_REM, match=match,
        instructions=inst)
    datapath.send_msg(mod)
324 # -------------------------------------------------------------------
325 # PRIVATE METHODS ( OTHERS )
326 # -------------------------------------------------------------------
327 def _set_logger(self):
328 """change log format."""
329 self.logger.propagate = False
330 hdl = logging.StreamHandler()
331 fmt_str = '[LACP][%(levelname)s] %(message)s'
332 hdl.setFormatter(logging.Formatter(fmt_str))
333 self.logger.addHandler(hdl)