1 # Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
22 from ryu.topology import event
23 from ryu.base import app_manager
24 from ryu.controller import ofp_event
25 from ryu.controller.handler import set_ev_cls
26 from ryu.controller.handler import MAIN_DISPATCHER, DEAD_DISPATCHER
27 from ryu.exception import RyuException
28 from ryu.lib import addrconv, hub
29 from ryu.lib.mac import DONTCARE_STR
30 from ryu.lib.dpid import dpid_to_str, str_to_dpid
31 from ryu.lib.port_no import port_no_to_str
32 from ryu.lib.packet import packet, ethernet
33 from ryu.lib.packet import lldp, ether_types
34 from ryu.ofproto.ether import ETH_TYPE_LLDP
35 from ryu.ofproto.ether import ETH_TYPE_CFM
36 from ryu.ofproto import nx_match
37 from ryu.ofproto import ofproto_v1_0
38 from ryu.ofproto import ofproto_v1_2
39 from ryu.ofproto import ofproto_v1_3
40 from ryu.ofproto import ofproto_v1_4
43 LOG = logging.getLogger(__name__)
48 CONF.register_cli_opts([
49 cfg.BoolOpt('observe-links', default=False,
50 help='observe link discovery events.'),
51 cfg.BoolOpt('install-lldp-flow', default=True,
52 help='link discovery: explicitly install flow entry '
53 'to send lldp packet to controller'),
54 cfg.BoolOpt('explicit-drop', default=True,
55 help='link discovery: explicitly drop lldp packet in')
60 # This is data class passed by EventPortXXX
61 def __init__(self, dpid, ofproto, ofpport):
62 super(Port, self).__init__()
65 self._ofproto = ofproto
66 self._config = ofpport.config
67 self._state = ofpport.state
69 self.port_no = ofpport.port_no
70 self.hw_addr = ofpport.hw_addr
71 self.name = ofpport.name
73 def is_reserved(self):
74 return self.port_no > self._ofproto.OFPP_MAX
77 return (self._state & self._ofproto.OFPPS_LINK_DOWN) > 0 \
78 or (self._config & self._ofproto.OFPPC_PORT_DOWN) > 0
81 # NOTE: OF1.2 has OFPPS_LIVE state
82 # return (self._state & self._ofproto.OFPPS_LIVE) > 0
83 return not self.is_down()
86 return {'dpid': dpid_to_str(self.dpid),
87 'port_no': port_no_to_str(self.port_no),
88 'hw_addr': self.hw_addr,
89 'name': self.name.decode('utf-8')}
91 # for Switch.del_port()
92 def __eq__(self, other):
93 return self.dpid == other.dpid and self.port_no == other.port_no
95 def __ne__(self, other):
96 return not self.__eq__(other)
99 return hash((self.dpid, self.port_no))
102 LIVE_MSG = {False: 'DOWN', True: 'LIVE'}
103 return 'Port<dpid=%s, port_no=%s, %s>' % \
104 (self.dpid, self.port_no, LIVE_MSG[self.is_live()])
107 class Switch(object):
108 # This is data class passed by EventSwitchXXX
109 def __init__(self, dp):
110 super(Switch, self).__init__()
115 def add_port(self, ofpport):
116 port = Port(self.dp.id, self.dp.ofproto, ofpport)
117 if not port.is_reserved():
118 self.ports.append(port)
120 def del_port(self, ofpport):
121 self.ports.remove(Port(ofpport))
124 d = {'dpid': dpid_to_str(self.dp.id),
125 'ports': [port.to_dict() for port in self.ports]}
129 msg = 'Switch<dpid=%s, ' % self.dp.id
130 for port in self.ports:
131 msg += str(port) + ' '
138 # This is data class passed by EventLinkXXX
139 def __init__(self, src, dst):
140 super(Link, self).__init__()
145 d = {'src': self.src.to_dict(),
146 'dst': self.dst.to_dict()}
149 # this type is used for key value of LinkState
150 def __eq__(self, other):
151 return self.src == other.src and self.dst == other.dst
153 def __ne__(self, other):
154 return not self.__eq__(other)
157 return hash((self.src, self.dst))
160 return 'Link: %s to %s' % (self.src, self.dst)
164 # This is data class passed by EventHostXXX
165 def __init__(self, mac, port):
166 super(Host, self).__init__()
173 d = {'mac': self.mac,
176 'port': self.port.to_dict()}
179 def __eq__(self, host):
180 return self.mac == host.mac and self.port == host.port
183 msg = 'Host<mac=%s, port=%s,' % (self.mac, str(self.port))
184 msg += ','.join(self.ipv4)
185 msg += ','.join(self.ipv6)
190 class HostState(dict):
191 # mac address -> Host class
193 super(HostState, self).__init__()
197 self.setdefault(mac, host)
199 def update_ip(self, host, ip_v4=None, ip_v6=None):
208 if ip_v4 is not None:
209 if ip_v4 in host.ipv4:
210 host.ipv4.remove(ip_v4)
211 host.ipv4.append(ip_v4)
213 if ip_v6 is not None:
214 if ip_v6 in host.ipv6:
215 host.ipv6.remove(ip_v6)
216 host.ipv6.append(ip_v6)
218 def get_by_dpid(self, dpid):
223 if host.port.dpid == dpid:
229 class PortState(dict):
230 # dict: int port_no -> OFPPort port
231 # OFPPort is defined in ryu.ofproto.ofproto_v1_X_parser
233 super(PortState, self).__init__()
235 def add(self, port_no, port):
238 def remove(self, port_no):
241 def modify(self, port_no, port):
245 class PortData(object):
246 def __init__(self, is_down, lldp_data):
247 super(PortData, self).__init__()
248 self.is_down = is_down
249 self.lldp_data = lldp_data
250 self.timestamp = None
254 self.timestamp = time.time()
257 def lldp_received(self):
260 def lldp_dropped(self):
263 def clear_timestamp(self):
264 self.timestamp = None
266 def set_down(self, is_down):
267 self.is_down = is_down
270 return 'PortData<live=%s, timestamp=%s, sent=%d>' \
271 % (not self.is_down, self.timestamp, self.sent)
274 class PortDataState(dict):
275 # dict: Port class -> PortData class
276 # slimed down version of OrderedDict as python 2.6 doesn't support it.
282 super(PortDataState, self).__init__()
283 self._root = root = [] # sentinel node
284 root[:] = [root, root, None] # [_PREV, _NEXT, _KEY] doubly linked list
287 def _remove_key(self, key):
288 link_prev, link_next, key = self._map.pop(key)
289 link_prev[self._NEXT] = link_next
290 link_next[self._PREV] = link_prev
292 def _append_key(self, key):
294 last = root[self._PREV]
295 last[self._NEXT] = root[self._PREV] = self._map[key] = [last, root,
298 def _prepend_key(self, key):
300 first = root[self._NEXT]
301 first[self._PREV] = root[self._NEXT] = self._map[key] = [root, first,
304 def _move_last_key(self, key):
305 self._remove_key(key)
306 self._append_key(key)
308 def _move_front_key(self, key):
309 self._remove_key(key)
310 self._prepend_key(key)
312 def add_port(self, port, lldp_data):
314 self._prepend_key(port)
315 self[port] = PortData(port.is_down(), lldp_data)
317 self[port].is_down = port.is_down()
319 def lldp_sent(self, port):
320 port_data = self[port]
321 port_data.lldp_sent()
322 self._move_last_key(port)
325 def lldp_received(self, port):
326 self[port].lldp_received()
328 def move_front(self, port):
329 port_data = self.get(port, None)
330 if port_data is not None:
331 port_data.clear_timestamp()
332 self._move_front_key(port)
334 def set_down(self, port):
335 is_down = port.is_down()
336 port_data = self[port]
337 port_data.set_down(is_down)
338 port_data.clear_timestamp()
340 self._move_front_key(port)
343 def get_port(self, port):
346 def del_port(self, port):
348 self._remove_key(port)
352 curr = root[self._NEXT]
353 while curr is not root:
354 yield curr[self._KEY]
355 curr = curr[self._NEXT]
358 for node in self._map.values():
361 root[:] = [root, root, None]
366 'od.items() -> list of (key, value) pairs in od'
367 return [(key, self[key]) for key in self]
370 'od.iteritems -> an iterator over the (key, value) pairs in od'
375 class LinkState(dict):
376 # dict: Link class -> timestamp
378 super(LinkState, self).__init__()
381 def get_peer(self, src):
382 return self._map.get(src, None)
384 def update_link(self, src, dst):
385 link = Link(src, dst)
387 self[link] = time.time()
390 # return if the reverse link is also up or not
391 rev_link = Link(dst, src)
392 return rev_link in self
394 def link_down(self, link):
396 del self._map[link.src]
398 def rev_link_set_timestamp(self, rev_link, timestamp):
399 # rev_link may or may not in LinkSet
401 self[rev_link] = timestamp
403 def port_deleted(self, src):
404 dst = self.get_peer(src)
408 link = Link(src, dst)
409 rev_link = Link(dst, src)
412 # reverse link might not exist
413 self.pop(rev_link, None)
414 rev_link_dst = self._map.pop(dst, None)
416 return dst, rev_link_dst
419 class LLDPPacket(object):
420 # make a LLDP packet for link discovery.
422 CHASSIS_ID_PREFIX = 'dpid:'
423 CHASSIS_ID_PREFIX_LEN = len(CHASSIS_ID_PREFIX)
424 CHASSIS_ID_FMT = CHASSIS_ID_PREFIX + '%s'
426 PORT_ID_STR = '!I' # uint32_t
429 class LLDPUnknownFormat(RyuException):
433 def lldp_packet(dpid, port_no, dl_addr, ttl):
434 pkt = packet.Packet()
436 dst = lldp.LLDP_MAC_NEAREST_BRIDGE
438 ethertype = ETH_TYPE_LLDP
439 eth_pkt = ethernet.ethernet(dst, src, ethertype)
440 pkt.add_protocol(eth_pkt)
442 tlv_chassis_id = lldp.ChassisID(
443 subtype=lldp.ChassisID.SUB_LOCALLY_ASSIGNED,
444 chassis_id=(LLDPPacket.CHASSIS_ID_FMT %
445 dpid_to_str(dpid)).encode('ascii'))
447 tlv_port_id = lldp.PortID(subtype=lldp.PortID.SUB_PORT_COMPONENT,
449 LLDPPacket.PORT_ID_STR,
452 tlv_ttl = lldp.TTL(ttl=ttl)
455 tlvs = (tlv_chassis_id, tlv_port_id, tlv_ttl, tlv_end)
456 lldp_pkt = lldp.lldp(tlvs)
457 pkt.add_protocol(lldp_pkt)
463 def lldp_parse(data):
464 pkt = packet.Packet(data)
466 eth_pkt = six.next(i)
467 assert type(eth_pkt) == ethernet.ethernet
469 lldp_pkt = six.next(i)
470 if type(lldp_pkt) != lldp.lldp:
471 raise LLDPPacket.LLDPUnknownFormat()
473 tlv_chassis_id = lldp_pkt.tlvs[0]
474 if tlv_chassis_id.subtype != lldp.ChassisID.SUB_LOCALLY_ASSIGNED:
475 raise LLDPPacket.LLDPUnknownFormat(
476 msg='unknown chassis id subtype %d' % tlv_chassis_id.subtype)
477 chassis_id = tlv_chassis_id.chassis_id.decode('utf-8')
478 if not chassis_id.startswith(LLDPPacket.CHASSIS_ID_PREFIX):
479 raise LLDPPacket.LLDPUnknownFormat(
480 msg='unknown chassis id format %s' % chassis_id)
481 src_dpid = str_to_dpid(chassis_id[LLDPPacket.CHASSIS_ID_PREFIX_LEN:])
483 tlv_port_id = lldp_pkt.tlvs[1]
484 if tlv_port_id.subtype != lldp.PortID.SUB_PORT_COMPONENT:
485 raise LLDPPacket.LLDPUnknownFormat(
486 msg='unknown port id subtype %d' % tlv_port_id.subtype)
487 port_id = tlv_port_id.port_id
488 if len(port_id) != LLDPPacket.PORT_ID_SIZE:
489 raise LLDPPacket.LLDPUnknownFormat(
490 msg='unknown port id %d' % port_id)
491 (src_port_no, ) = struct.unpack(LLDPPacket.PORT_ID_STR, port_id)
493 return src_dpid, src_port_no
496 class Switches(app_manager.RyuApp):
497 OFP_VERSIONS = [ofproto_v1_0.OFP_VERSION, ofproto_v1_2.OFP_VERSION,
498 ofproto_v1_3.OFP_VERSION, ofproto_v1_4.OFP_VERSION]
499 _EVENTS = [event.EventSwitchEnter, event.EventSwitchLeave,
500 event.EventSwitchReconnected,
501 event.EventPortAdd, event.EventPortDelete,
502 event.EventPortModify,
503 event.EventLinkAdd, event.EventLinkDelete,
506 DEFAULT_TTL = 120 # unused. ignored.
507 LLDP_PACKET_LEN = len(LLDPPacket.lldp_packet(0, 0, DONTCARE_STR, 0))
509 LLDP_SEND_GUARD = .05
510 LLDP_SEND_PERIOD_PER_PORT = .9
511 TIMEOUT_CHECK_PERIOD = 5.
512 LINK_TIMEOUT = TIMEOUT_CHECK_PERIOD * 2
515 def __init__(self, *args, **kwargs):
516 super(Switches, self).__init__(*args, **kwargs)
518 self.name = 'switches'
519 self.dps = {} # datapath_id => Datapath class
520 self.port_state = {} # datapath_id => ports
521 self.ports = PortDataState() # Port class -> PortData class
522 self.links = LinkState() # Link class -> timestamp
523 self.hosts = HostState() # mac address -> Host class list
524 self.is_active = True
526 self.link_discovery = self.CONF.observe_links
527 if self.link_discovery:
528 self.install_flow = self.CONF.install_lldp_flow
529 self.explicit_drop = self.CONF.explicit_drop
530 self.lldp_event = hub.Event()
531 self.link_event = hub.Event()
532 self.threads.append(hub.spawn(self.lldp_loop))
533 self.threads.append(hub.spawn(self.link_loop))
536 self.is_active = False
537 if self.link_discovery:
538 self.lldp_event.set()
539 self.link_event.set()
540 hub.joinall(self.threads)
542 def _register(self, dp):
543 assert dp.id is not None
546 if dp.id not in self.port_state:
547 self.port_state[dp.id] = PortState()
548 for port in dp.ports.values():
549 self.port_state[dp.id].add(port.port_no, port)
551 def _unregister(self, dp):
552 if dp.id in self.dps:
553 if (self.dps[dp.id] == dp):
555 del self.port_state[dp.id]
557 def _get_switch(self, dpid):
559 switch = Switch(self.dps[dpid])
560 for ofpport in self.port_state[dpid].values():
561 switch.add_port(ofpport)
564 def _get_port(self, dpid, port_no):
565 switch = self._get_switch(dpid)
567 for p in switch.ports:
568 if p.port_no == port_no:
571 def _port_added(self, port):
572 lldp_data = LLDPPacket.lldp_packet(
573 port.dpid, port.port_no, port.hw_addr, self.DEFAULT_TTL)
574 self.ports.add_port(port, lldp_data)
575 # LOG.debug('_port_added dpid=%s, port_no=%s, live=%s',
576 # port.dpid, port.port_no, port.is_live())
578 def _link_down(self, port):
580 dst, rev_link_dst = self.links.port_deleted(port)
582 # LOG.debug('key error. src=%s, dst=%s',
583 # port, self.links.get_peer(port))
585 link = Link(port, dst)
586 self.send_event_to_observers(event.EventLinkDelete(link))
588 rev_link = Link(dst, rev_link_dst)
589 self.send_event_to_observers(event.EventLinkDelete(rev_link))
590 self.ports.move_front(dst)
592 def _is_edge_port(self, port):
593 for link in self.links:
594 if port == link.src or port == link.dst:
599 @set_ev_cls(ofp_event.EventOFPStateChange,
600 [MAIN_DISPATCHER, DEAD_DISPATCHER])
601 def state_change_handler(self, ev):
603 assert dp is not None
606 if ev.state == MAIN_DISPATCHER:
607 dp_multiple_conns = False
608 if dp.id in self.dps:
609 LOG.warning('Multiple connections from %s', dpid_to_str(dp.id))
610 dp_multiple_conns = True
611 (self.dps[dp.id]).close()
614 switch = self._get_switch(dp.id)
615 LOG.debug('register %s', switch)
617 if not dp_multiple_conns:
618 self.send_event_to_observers(event.EventSwitchEnter(switch))
620 evt = event.EventSwitchReconnected(switch)
621 self.send_event_to_observers(evt)
623 if not self.link_discovery:
626 if self.install_flow:
628 ofproto_parser = dp.ofproto_parser
630 # TODO:XXX need other versions
631 if ofproto.OFP_VERSION == ofproto_v1_0.OFP_VERSION:
632 rule = nx_match.ClsRule()
633 rule.set_dl_dst(addrconv.mac.text_to_bin(
634 lldp.LLDP_MAC_NEAREST_BRIDGE))
635 rule.set_dl_type(ETH_TYPE_LLDP)
636 actions = [ofproto_parser.OFPActionOutput(
637 ofproto.OFPP_CONTROLLER, self.LLDP_PACKET_LEN)]
639 rule=rule, cookie=0, command=ofproto.OFPFC_ADD,
640 idle_timeout=0, hard_timeout=0, actions=actions,
642 elif ofproto.OFP_VERSION >= ofproto_v1_2.OFP_VERSION:
643 match = ofproto_parser.OFPMatch(
644 eth_type=ETH_TYPE_LLDP,
645 eth_dst=lldp.LLDP_MAC_NEAREST_BRIDGE)
646 # OFPCML_NO_BUFFER is set so that the LLDP is not
648 parser = ofproto_parser
649 actions = [parser.OFPActionOutput(ofproto.OFPP_CONTROLLER,
650 ofproto.OFPCML_NO_BUFFER
652 inst = [parser.OFPInstructionActions(
653 ofproto.OFPIT_APPLY_ACTIONS, actions)]
654 mod = parser.OFPFlowMod(datapath=dp, match=match,
655 idle_timeout=0, hard_timeout=0,
660 LOG.error('cannot install flow. unsupported version. %x',
661 dp.ofproto.OFP_VERSION)
663 # Do not add ports while dp has multiple connections to controller.
664 if not dp_multiple_conns:
665 for port in switch.ports:
666 if not port.is_reserved():
667 self._port_added(port)
669 self.lldp_event.set()
671 elif ev.state == DEAD_DISPATCHER:
672 # dp.id is None when datapath dies before handshake
676 switch = self._get_switch(dp.id)
680 LOG.debug('unregister %s', switch)
681 evt = event.EventSwitchLeave(switch)
682 self.send_event_to_observers(evt)
684 if not self.link_discovery:
687 for port in switch.ports:
688 if not port.is_reserved():
689 self.ports.del_port(port)
690 self._link_down(port)
691 self.lldp_event.set()
693 @set_ev_cls(ofp_event.EventOFPPortStatus, MAIN_DISPATCHER)
694 def port_status_handler(self, ev):
700 if reason == dp.ofproto.OFPPR_ADD:
701 # LOG.debug('A port was added.' +
702 # '(datapath id = %s, port number = %s)',
703 # dp.id, ofpport.port_no)
704 self.port_state[dp.id].add(ofpport.port_no, ofpport)
705 self.send_event_to_observers(
706 event.EventPortAdd(Port(dp.id, dp.ofproto, ofpport)))
708 if not self.link_discovery:
711 port = self._get_port(dp.id, ofpport.port_no)
712 if port and not port.is_reserved():
713 self._port_added(port)
714 self.lldp_event.set()
716 elif reason == dp.ofproto.OFPPR_DELETE:
717 # LOG.debug('A port was deleted.' +
718 # '(datapath id = %s, port number = %s)',
719 # dp.id, ofpport.port_no)
720 self.send_event_to_observers(
721 event.EventPortDelete(Port(dp.id, dp.ofproto, ofpport)))
723 if not self.link_discovery:
726 port = self._get_port(dp.id, ofpport.port_no)
727 if port and not port.is_reserved():
728 self.ports.del_port(port)
729 self._link_down(port)
730 self.lldp_event.set()
732 self.port_state[dp.id].remove(ofpport.port_no)
735 assert reason == dp.ofproto.OFPPR_MODIFY
736 # LOG.debug('A port was modified.' +
737 # '(datapath id = %s, port number = %s)',
738 # dp.id, ofpport.port_no)
739 self.port_state[dp.id].modify(ofpport.port_no, ofpport)
740 self.send_event_to_observers(
741 event.EventPortModify(Port(dp.id, dp.ofproto, ofpport)))
743 if not self.link_discovery:
746 port = self._get_port(dp.id, ofpport.port_no)
747 if port and not port.is_reserved():
748 if self.ports.set_down(port):
749 self._link_down(port)
750 self.lldp_event.set()
753 def _drop_packet(msg):
754 buffer_id = msg.buffer_id
755 if buffer_id == msg.datapath.ofproto.OFP_NO_BUFFER:
760 if dp.ofproto.OFP_VERSION == ofproto_v1_0.OFP_VERSION:
761 dp.send_packet_out(buffer_id, msg.in_port, [])
762 elif dp.ofproto.OFP_VERSION >= ofproto_v1_2.OFP_VERSION:
763 dp.send_packet_out(buffer_id, msg.match['in_port'], [])
765 LOG.error('cannot drop_packet. unsupported version. %x',
766 dp.ofproto.OFP_VERSION)
768 @set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER)
769 def lldp_packet_in_handler(self, ev):
770 if not self.link_discovery:
775 src_dpid, src_port_no = LLDPPacket.lldp_parse(msg.data)
776 except LLDPPacket.LLDPUnknownFormat:
777 # This handler can receive all the packets which can be
778 # not-LLDP packet. Ignore it silently
781 dst_dpid = msg.datapath.id
782 if msg.datapath.ofproto.OFP_VERSION == ofproto_v1_0.OFP_VERSION:
783 dst_port_no = msg.in_port
784 elif msg.datapath.ofproto.OFP_VERSION >= ofproto_v1_2.OFP_VERSION:
785 dst_port_no = msg.match['in_port']
787 LOG.error('cannot accept LLDP. unsupported version. %x',
788 msg.datapath.ofproto.OFP_VERSION)
790 src = self._get_port(src_dpid, src_port_no)
791 if not src or src.dpid == dst_dpid:
794 self.ports.lldp_received(src)
796 # There are races between EventOFPPacketIn and
797 # EventDPPortAdd. So packet-in event can happend before
798 # port add event. In that case key error can happend.
799 # LOG.debug('lldp_received error', exc_info=True)
802 dst = self._get_port(dst_dpid, dst_port_no)
806 old_peer = self.links.get_peer(src)
807 # LOG.debug("Packet-In")
808 # LOG.debug(" src=%s", src)
809 # LOG.debug(" dst=%s", dst)
810 # LOG.debug(" old_peer=%s", old_peer)
811 if old_peer and old_peer != dst:
812 old_link = Link(src, old_peer)
813 del self.links[old_link]
814 self.send_event_to_observers(event.EventLinkDelete(old_link))
816 link = Link(src, dst)
817 if link not in self.links:
818 self.send_event_to_observers(event.EventLinkAdd(link))
820 # remove hosts if it's not attached to edge port
822 for host in self.hosts.values():
823 if not self._is_edge_port(host.port):
824 host_to_del.append(host.mac)
826 for host_mac in host_to_del:
827 del self.hosts[host_mac]
829 if not self.links.update_link(src, dst):
830 # reverse link is not detected yet.
831 # So schedule the check early because it's very likely it's up
832 self.ports.move_front(dst)
833 self.lldp_event.set()
834 if self.explicit_drop:
835 self._drop_packet(msg)
837 @set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER)
838 def host_discovery_packet_in_handler(self, ev):
840 eth, pkt_type, pkt_data = ethernet.ethernet.parser(msg.data)
842 # ignore lldp and cfm packets
843 if eth.ethertype in (ETH_TYPE_LLDP, ETH_TYPE_CFM):
846 datapath = msg.datapath
850 if msg.datapath.ofproto.OFP_VERSION == ofproto_v1_0.OFP_VERSION:
851 port_no = msg.in_port
853 port_no = msg.match['in_port']
855 port = self._get_port(dpid, port_no)
857 # can't find this port(ex: logic port)
860 # ignore switch-to-switch port
861 if not self._is_edge_port(port):
865 host = Host(host_mac, port)
867 if host_mac not in self.hosts:
869 ev = event.EventHostAdd(host)
870 self.send_event_to_observers(ev)
871 elif self.hosts[host_mac].port != port:
872 # assumes the host is moved to another port
873 ev = event.EventHostMove(src=self.hosts[host_mac], dst=host)
874 self.hosts[host_mac] = host
875 self.send_event_to_observers(ev)
877 # arp packet, update ip address
878 if eth.ethertype == ether_types.ETH_TYPE_ARP:
879 arp_pkt, _, _ = pkt_type.parser(pkt_data)
880 self.hosts.update_ip(host, ip_v4=arp_pkt.src_ip)
882 # ipv4 packet, update ipv4 address
883 elif eth.ethertype == ether_types.ETH_TYPE_IP:
884 ipv4_pkt, _, _ = pkt_type.parser(pkt_data)
885 self.hosts.update_ip(host, ip_v4=ipv4_pkt.src)
887 # ipv6 packet, update ipv6 address
888 elif eth.ethertype == ether_types.ETH_TYPE_IPV6:
889 # TODO: need to handle NDP
890 ipv6_pkt, _, _ = pkt_type.parser(pkt_data)
891 self.hosts.update_ip(host, ip_v6=ipv6_pkt.src)
893 def send_lldp_packet(self, port):
895 port_data = self.ports.lldp_sent(port)
897 # ports can be modified during our sleep in self.lldp_loop()
898 # LOG.debug('send_lld error', exc_info=True)
900 if port_data.is_down:
903 dp = self.dps.get(port.dpid, None)
905 # datapath was already deleted
908 # LOG.debug('lldp sent dpid=%s, port_no=%d', dp.id, port.port_no)
910 if dp.ofproto.OFP_VERSION == ofproto_v1_0.OFP_VERSION:
911 actions = [dp.ofproto_parser.OFPActionOutput(port.port_no)]
912 dp.send_packet_out(actions=actions, data=port_data.lldp_data)
913 elif dp.ofproto.OFP_VERSION >= ofproto_v1_2.OFP_VERSION:
914 actions = [dp.ofproto_parser.OFPActionOutput(port.port_no)]
915 out = dp.ofproto_parser.OFPPacketOut(
916 datapath=dp, in_port=dp.ofproto.OFPP_CONTROLLER,
917 buffer_id=dp.ofproto.OFP_NO_BUFFER, actions=actions,
918 data=port_data.lldp_data)
921 LOG.error('cannot send lldp packet. unsupported version. %x',
922 dp.ofproto.OFP_VERSION)
925 while self.is_active:
926 self.lldp_event.clear()
932 for (key, data) in self.ports.items():
933 if data.timestamp is None:
934 ports_now.append(key)
937 expire = data.timestamp + self.LLDP_SEND_PERIOD_PER_PORT
942 timeout = expire - now
945 for port in ports_now:
946 self.send_lldp_packet(port)
948 self.send_lldp_packet(port)
949 hub.sleep(self.LLDP_SEND_GUARD) # don't burst
951 if timeout is not None and ports:
952 timeout = 0 # We have already slept
953 # LOG.debug('lldp sleep %s', timeout)
954 self.lldp_event.wait(timeout=timeout)
957 while self.is_active:
958 self.link_event.clear()
962 for (link, timestamp) in self.links.items():
963 # LOG.debug('%s timestamp %d (now %d)', link, timestamp, now)
964 if timestamp + self.LINK_TIMEOUT < now:
966 if src in self.ports:
967 port_data = self.ports.get_port(src)
968 # LOG.debug('port_data %s', port_data)
969 if port_data.lldp_dropped() > self.LINK_LLDP_DROP:
973 self.links.link_down(link)
974 # LOG.debug('delete %s', link)
975 self.send_event_to_observers(event.EventLinkDelete(link))
978 rev_link = Link(dst, link.src)
979 if rev_link not in deleted:
980 # It is very likely that the reverse link is also
981 # disconnected. Check it early.
982 expire = now - self.LINK_TIMEOUT
983 self.links.rev_link_set_timestamp(rev_link, expire)
984 if dst in self.ports:
985 self.ports.move_front(dst)
986 self.lldp_event.set()
988 self.link_event.wait(timeout=self.TIMEOUT_CHECK_PERIOD)
990 @set_ev_cls(event.EventSwitchRequest)
991 def switch_request_handler(self, req):
998 for dp in self.dps.values():
999 switches.append(self._get_switch(dp.id))
1000 elif dpid in self.dps:
1001 switches.append(self._get_switch(dpid))
1003 rep = event.EventSwitchReply(req.src, switches)
1004 self.reply_to_request(req, rep)
1006 @set_ev_cls(event.EventLinkRequest)
1007 def link_request_handler(self, req):
1014 links = [link for link in self.links if link.src.dpid == dpid]
1015 rep = event.EventLinkReply(req.src, dpid, links)
1016 self.reply_to_request(req, rep)
1018 @set_ev_cls(event.EventHostRequest)
1019 def host_request_handler(self, req):
1023 for mac in self.hosts:
1024 hosts.append(self.hosts[mac])
1026 hosts = self.hosts.get_by_dpid(dpid)
1028 rep = event.EventHostReply(req.src, dpid, hosts)
1029 self.reply_to_request(req, rep)