# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import struct

from ryu.base import app_manager
from ryu.controller import event
from ryu.controller import ofp_event
from ryu.controller.handler import DEAD_DISPATCHER
from ryu.controller.handler import MAIN_DISPATCHER
from ryu.controller.handler import set_ev_cls
from ryu.ofproto import ether
from ryu.ofproto import inet
from ryu.ofproto import ofproto_v1_0
from ryu.ofproto import ofproto_v1_2
from ryu.ofproto import ofproto_v1_3
from ryu.lib import addrconv
from ryu.lib import hub
from ryu.lib.dpid import dpid_to_str
from ryu.lib.packet import packet
from ryu.lib.packet import ethernet
from ryu.lib.packet import ipv4
from ryu.lib.packet import igmp
class EventPacketIn(event.EventBase):
    """a PacketIn event class for packets other than IGMP.

    ========= =====================================================
    Attribute Description
    ========= =====================================================
    msg       the PacketIn message given to the constructor.
    ========= =====================================================
    """

    def __init__(self, msg):
        """keep the PacketIn message so that observers can read it."""
        super(EventPacketIn, self).__init__()
        self.msg = msg
class EventMulticastGroupStateChanged(event.EventBase):
    """a event class that notifies the changes of the statuses of the
    multicast groups."""

    def __init__(self, reason, address, src, dsts):
        """
        ========= =====================================================
        Attribute Description
        ========= =====================================================
        reason    why the event occurs. use one of MG_*.
        address   a multicast group address.
        src       a port number in which a querier exists.
        dsts      a list of port numbers in which the members exist.
        ========= =====================================================
        """
        super(EventMulticastGroupStateChanged, self).__init__()
        # every documented attribute is stored under the same name as
        # the constructor argument.
        self.reason = reason
        self.address = address
        self.src = src
        self.dsts = dsts
class IgmpLib(app_manager.RyuApp):
    """IGMP snooping library.

    dispatches IGMP PacketIn messages either to the querier emulation
    or to the snooper, and re-publishes every other PacketIn message as
    EventPacketIn.
    """

    # -------------------------------------------------------------------
    # PUBLIC METHODS
    # -------------------------------------------------------------------
    def __init__(self):
        """initialization."""
        super(IgmpLib, self).__init__()
        # NOTE(review): the statement between the super() call and the
        # querier creation was lost in extraction. it is reconstructed
        # as the application name; presumably observers locate this
        # application by that name -- confirm against the upstream file.
        self.name = 'igmplib'
        self._querier = IgmpQuerier()
        # the snooper publishes its events through this application.
        self._snooper = IgmpSnooper(self.send_event_to_observers)

    def set_querier_mode(self, dpid, server_port):
        """set a datapath id and server port number to the instance
        of IgmpQuerier.

        ============ ==================================================
        Attribute    Description
        ============ ==================================================
        dpid         the datapath id that will operate as a querier.
        server_port  the port number linked to the multicasting server.
        ============ ==================================================
        """
        self._querier.set_querier_mode(dpid, server_port)

    # -------------------------------------------------------------------
    # PUBLIC METHODS ( EVENT HANDLERS )
    # -------------------------------------------------------------------
    @set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER)
    def packet_in_handler(self, evt):
        """PacketIn event handler. when the received packet was IGMP,
        proceed it. otherwise, send a event."""
        msg = evt.msg
        dpid = msg.datapath.id

        req_pkt = packet.Packet(msg.data)
        req_igmp = req_pkt.get_protocol(igmp.igmp)
        if not req_igmp:
            # not IGMP: hand the message over to the observers.
            self.send_event_to_observers(EventPacketIn(msg))
        elif self._querier.dpid == dpid:
            # the datapath configured by set_querier_mode() answers as
            # a querier.
            self._querier.packet_in_handler(req_igmp, msg)
        else:
            # every other datapath only snoops.
            self._snooper.packet_in_handler(req_pkt, req_igmp, msg)

    @set_ev_cls(ofp_event.EventOFPStateChange,
                [MAIN_DISPATCHER, DEAD_DISPATCHER])
    def state_change_handler(self, evt):
        """StateChange event handler. start the QUERY thread when the
        querier datapath enters MAIN_DISPATCHER and stop it when the
        datapath enters DEAD_DISPATCHER."""
        datapath = evt.datapath
        assert datapath is not None
        if datapath.id == self._querier.dpid:
            if evt.state == MAIN_DISPATCHER:
                self._querier.start_loop(datapath)
            elif evt.state == DEAD_DISPATCHER:
                self._querier.stop_loop()
class IgmpBase(object):
    """IGMP abstract class library.

    provides the OpenFlow helpers (flow entry add / delete, packet out)
    shared by the querier and the snooper.
    """

    # -------------------------------------------------------------------
    # PUBLIC METHODS
    # -------------------------------------------------------------------
    def __init__(self):
        """build the dispatch tables from the OpenFlow version to the
        implementation. OpenFlow 1.3 reuses the 1.2 implementation."""
        self._set_flow_func = {
            ofproto_v1_0.OFP_VERSION: self._set_flow_entry_v1_0,
            ofproto_v1_2.OFP_VERSION: self._set_flow_entry_v1_2,
            ofproto_v1_3.OFP_VERSION: self._set_flow_entry_v1_2,
        }
        self._del_flow_func = {
            ofproto_v1_0.OFP_VERSION: self._del_flow_entry_v1_0,
            ofproto_v1_2.OFP_VERSION: self._del_flow_entry_v1_2,
            ofproto_v1_3.OFP_VERSION: self._del_flow_entry_v1_2,
        }

    # -------------------------------------------------------------------
    # PROTECTED METHODS ( RELATED TO OPEN FLOW PROTOCOL )
    # -------------------------------------------------------------------
    def _set_flow_entry_v1_0(self, datapath, actions, in_port, dst,
                             src=None):
        """add a flow entry using the OpenFlow 1.0 match fields.

        dst and src are IPv4 addresses in text form; src may be None.
        """
        ofproto = datapath.ofproto
        parser = datapath.ofproto_parser

        match = parser.OFPMatch(
            dl_type=ether.ETH_TYPE_IP, in_port=in_port,
            nw_src=self._ipv4_text_to_int(src),
            nw_dst=self._ipv4_text_to_int(dst))
        mod = parser.OFPFlowMod(
            datapath=datapath, match=match, cookie=0,
            command=ofproto.OFPFC_ADD, actions=actions)
        datapath.send_msg(mod)

    def _set_flow_entry_v1_2(self, datapath, actions, in_port, dst,
                             src=None):
        """add a flow entry using the OpenFlow 1.2 / 1.3 match fields.

        the source address is matched only when src is given.
        """
        ofproto = datapath.ofproto
        parser = datapath.ofproto_parser

        match = parser.OFPMatch(
            eth_type=ether.ETH_TYPE_IP, in_port=in_port, ipv4_dst=dst)
        if src is not None:
            match.append_field(ofproto.OXM_OF_IPV4_SRC, src)
        inst = [parser.OFPInstructionActions(
            ofproto.OFPIT_APPLY_ACTIONS, actions)]
        mod = parser.OFPFlowMod(
            datapath=datapath, command=ofproto.OFPFC_ADD,
            priority=65535, match=match, instructions=inst)
        datapath.send_msg(mod)

    def _set_flow_entry(self, datapath, actions, in_port, dst, src=None):
        """set a flow entry with the implementation that matches the
        OpenFlow version of the datapath."""
        version = datapath.ofproto.OFP_VERSION
        set_flow = self._set_flow_func.get(version)
        # fail with a readable message rather than by calling None.
        assert set_flow, "unsupported OpenFlow version: %r" % (version,)
        set_flow(datapath, actions, in_port, dst, src)

    def _del_flow_entry_v1_0(self, datapath, in_port, dst, src=None):
        """remove a flow entry using the OpenFlow 1.0 match fields."""
        ofproto = datapath.ofproto
        parser = datapath.ofproto_parser

        match = parser.OFPMatch(
            dl_type=ether.ETH_TYPE_IP, in_port=in_port,
            nw_src=self._ipv4_text_to_int(src),
            nw_dst=self._ipv4_text_to_int(dst))
        mod = parser.OFPFlowMod(
            datapath=datapath, match=match, cookie=0,
            command=ofproto.OFPFC_DELETE)
        datapath.send_msg(mod)

    def _del_flow_entry_v1_2(self, datapath, in_port, dst, src=None):
        """remove a flow entry using the OpenFlow 1.2 / 1.3 match
        fields. the source address is matched only when src is given."""
        ofproto = datapath.ofproto
        parser = datapath.ofproto_parser

        match = parser.OFPMatch(
            eth_type=ether.ETH_TYPE_IP, in_port=in_port, ipv4_dst=dst)
        if src is not None:
            match.append_field(ofproto.OXM_OF_IPV4_SRC, src)
        mod = parser.OFPFlowMod(
            datapath=datapath, command=ofproto.OFPFC_DELETE,
            out_port=ofproto.OFPP_ANY, out_group=ofproto.OFPG_ANY,
            match=match)
        datapath.send_msg(mod)

    def _del_flow_entry(self, datapath, in_port, dst, src=None):
        """remove a flow entry with the implementation that matches the
        OpenFlow version of the datapath."""
        version = datapath.ofproto.OFP_VERSION
        del_flow = self._del_flow_func.get(version)
        # fail with a readable message rather than by calling None.
        assert del_flow, "unsupported OpenFlow version: %r" % (version,)
        del_flow(datapath, in_port, dst, src)

    def _do_packet_out(self, datapath, data, in_port, actions):
        """send a packet. the payload is always carried in the message
        itself (OFP_NO_BUFFER)."""
        ofproto = datapath.ofproto
        parser = datapath.ofproto_parser

        out = parser.OFPPacketOut(
            datapath=datapath, buffer_id=ofproto.OFP_NO_BUFFER,
            data=data, in_port=in_port, actions=actions)
        datapath.send_msg(out)

    # -------------------------------------------------------------------
    # PROTECTED METHODS ( OTHERS )
    # -------------------------------------------------------------------
    def _ipv4_text_to_int(self, ip_text):
        """convert ip v4 string to integer. None is passed through
        unchanged."""
        # the OpenFlow 1.0 helpers call this with src=None whenever the
        # caller does not restrict the source address.
        if ip_text is None:
            return None
        assert isinstance(ip_text, str)
        return struct.unpack('!I', addrconv.ipv4.text_to_bin(ip_text))[0]
class IgmpQuerier(IgmpBase):
    """IGMP querier emulation class library.

    this querier is a simplified implementation, and is not based on RFC,
    for example as following points:
    - ignore some constant values
    - does not send a specific QUERY in response to LEAVE
    """

    # -------------------------------------------------------------------
    # PUBLIC METHODS
    # -------------------------------------------------------------------
    def __init__(self):
        """initialization."""
        super(IgmpQuerier, self).__init__()
        self.name = "IgmpQuerier"
        self.logger = logging.getLogger(self.name)
        self.dpid = None
        self.server_port = None

        self._datapath = None
        self._querier_thread = None

        # the structure of self._mcast
        #
        # +-------+------------------+
        # | group | port: True/False |
        # |       +------------------+
        # |       | ...              |
        # +-------+------------------+
        # | ...                      |
        # +--------------------------+
        #
        # group       multicast address.
        # port        a port number which connect to the group member.
        #             the value is set to False before every general
        #             QUERY and back to True when a REPORT arrives on
        #             the port.
        self._mcast = {}

        self._set_logger()

    def set_querier_mode(self, dpid, server_port):
        """set the datapath to work as a querier. note that you can set
        up only the one querier. when you called this method several
        times, only the last one becomes effective."""
        self.dpid = dpid
        self.server_port = server_port
        if self._querier_thread:
            hub.kill(self._querier_thread)
            self._querier_thread = None

    def packet_in_handler(self, req_igmp, msg):
        """the process when the querier received IGMP. only REPORT
        (v1 / v2) and LEAVE messages are handled; anything else is
        ignored."""
        ofproto = msg.datapath.ofproto
        if ofproto.OFP_VERSION == ofproto_v1_0.OFP_VERSION:
            in_port = msg.in_port
        else:
            in_port = msg.match['in_port']
        if (igmp.IGMP_TYPE_REPORT_V1 == req_igmp.msgtype or
                igmp.IGMP_TYPE_REPORT_V2 == req_igmp.msgtype):
            self._do_report(req_igmp, in_port, msg)
        elif igmp.IGMP_TYPE_LEAVE == req_igmp.msgtype:
            self._do_leave(req_igmp, in_port, msg)

    def start_loop(self, datapath):
        """start QUERY thread."""
        # do not leak a thread that is still running from an earlier
        # call.
        if self._querier_thread:
            hub.kill(self._querier_thread)
        self._datapath = datapath
        self._querier_thread = hub.spawn(self._send_query)
        self.logger.info("started a querier.")

    def stop_loop(self):
        """stop QUERY thread."""
        # set_querier_mode() may already have killed and cleared the
        # thread; passing None to hub.kill() would fail.
        if self._querier_thread:
            hub.kill(self._querier_thread)
        self._querier_thread = None
        self._datapath = None
        self.logger.info("stopped a querier.")

    # -------------------------------------------------------------------
    # PRIVATE METHODS ( RELATED TO IGMP )
    # -------------------------------------------------------------------
    def _send_query(self):
        """send a QUERY message periodically.

        this method never returns; it is the body of the thread that
        start_loop() spawns. every cycle floods a general QUERY, waits
        for the response interval, expires the members that did not
        answer and then sleeps for the rest of the cycle.
        """
        # NOTE(review): the assignment of the cycle length was lost in
        # extraction and is reconstructed as 60 seconds -- confirm.
        timeout = 60
        ofproto = self._datapath.ofproto
        parser = self._datapath.ofproto_parser
        if ofproto_v1_0.OFP_VERSION == ofproto.OFP_VERSION:
            send_port = ofproto.OFPP_NONE
        else:
            send_port = ofproto.OFPP_ANY

        res_pkt = self._build_general_query(ofproto)
        flood = [parser.OFPActionOutput(ofproto.OFPP_FLOOD)]

        while True:
            # reset reply status.
            for status in self._mcast.values():
                for port in status:
                    status[port] = False

            # flood a general query and wait for the REPORT messages.
            self._do_packet_out(
                self._datapath, res_pkt.data, send_port, flood)
            hub.sleep(igmp.QUERY_RESPONSE_INTERVAL)

            # QUERY timeout expired.
            self._expire_silent_members(parser)

            rest_time = timeout - igmp.QUERY_RESPONSE_INTERVAL
            hub.sleep(rest_time)

    def _build_general_query(self, ofproto):
        """create the serialized general QUERY packet."""
        # the group address '0.0.0.0' marks a general query.
        res_igmp = igmp.igmp(
            msgtype=igmp.IGMP_TYPE_QUERY,
            maxresp=igmp.QUERY_RESPONSE_INTERVAL * 10,
            csum=0,
            address='0.0.0.0')
        # NOTE(review): the source address line was lost in extraction
        # and is reconstructed as '0.0.0.0' -- confirm.
        res_ipv4 = ipv4.ipv4(
            total_length=len(ipv4.ipv4()) + len(res_igmp),
            proto=inet.IPPROTO_IGMP, ttl=1,
            src='0.0.0.0',
            dst=igmp.MULTICAST_IP_ALL_HOST)
        res_ether = ethernet.ethernet(
            dst=igmp.MULTICAST_MAC_ALL_HOST,
            src=self._datapath.ports[ofproto.OFPP_LOCAL].hw_addr,
            ethertype=ether.ETH_TYPE_IP)
        res_pkt = packet.Packet()
        res_pkt.add_protocol(res_ether)
        res_pkt.add_protocol(res_ipv4)
        res_pkt.add_protocol(res_igmp)
        # res_pkt.data is read by the caller, so serialize here.
        res_pkt.serialize()
        return res_pkt

    def _expire_silent_members(self, parser):
        """remove the ports (and the groups without any port left) that
        did not send a REPORT since the last general QUERY."""
        del_groups = []
        # iterate over a snapshot: the table is modified below.
        for group, status in list(self._mcast.items()):
            del_ports = [port for port in status if not status[port]]
            actions = [parser.OFPActionOutput(port)
                       for port in status if status[port]]
            if actions and del_ports:
                # some members left: shrink the output port list.
                self._set_flow_entry(
                    self._datapath, actions, self.server_port, group)
            if not actions:
                # nobody answered: drop the whole group.
                self._del_flow_entry(
                    self._datapath, self.server_port, group)
                del_groups.append(group)
            for port in del_ports:
                self._del_flow_entry(self._datapath, port, group)
                del status[port]
        for group in del_groups:
            self._mcast.pop(group, None)

    def _do_report(self, report, in_port, msg):
        """the process when the querier received a REPORT message."""
        datapath = msg.datapath
        ofproto = datapath.ofproto
        parser = datapath.ofproto_parser

        if ofproto.OFP_VERSION == ofproto_v1_0.OFP_VERSION:
            # NOTE(review): the OpenFlow 1.0 value was lost in
            # extraction and is reconstructed as 65535 -- confirm.
            size = 65535
        else:
            size = ofproto.OFPCML_MAX

        members = self._mcast.setdefault(report.address, {})
        # flow entries are rewritten only when the port is new.
        update = in_port not in members
        members[in_port] = True

        if update:
            actions = [parser.OFPActionOutput(port) for port in members]
            # from the multicast server to all the members.
            self._set_flow_entry(
                datapath, actions, self.server_port, report.address)
            # from the new member to the controller.
            self._set_flow_entry(
                datapath,
                [parser.OFPActionOutput(ofproto.OFPP_CONTROLLER, size)],
                in_port, report.address)

    def _do_leave(self, leave, in_port, msg):
        """the process when the querier received a LEAVE message."""
        datapath = msg.datapath
        parser = datapath.ofproto_parser

        members = self._mcast.setdefault(leave.address, {})
        if in_port in members:
            self._del_flow_entry(
                datapath, in_port, leave.address)
            del members[in_port]
            actions = [parser.OFPActionOutput(port) for port in members]
            if actions:
                # other members remain: shrink the output port list.
                self._set_flow_entry(
                    datapath, actions, self.server_port, leave.address)
            else:
                # the last member left.
                self._del_flow_entry(
                    datapath, self.server_port, leave.address)

    # -------------------------------------------------------------------
    # PRIVATE METHODS ( OTHERS )
    # -------------------------------------------------------------------
    def _set_logger(self):
        """change log format."""
        self.logger.propagate = False
        # logging.getLogger() returns the same object for the same
        # name; do not stack a second handler on it.
        if self.logger.handlers:
            return
        hdl = logging.StreamHandler()
        fmt_str = '[querier][%(levelname)s] %(message)s'
        hdl.setFormatter(logging.Formatter(fmt_str))
        self.logger.addHandler(hdl)
class IgmpSnooper(IgmpBase):
    """IGMP snooping class library."""

    # -------------------------------------------------------------------
    # PUBLIC METHODS
    # -------------------------------------------------------------------
    def __init__(self, send_event):
        """initialization. send_event is called with every
        EventMulticastGroupStateChanged instance this class creates."""
        super(IgmpSnooper, self).__init__()
        self.name = "IgmpSnooper"
        self.logger = logging.getLogger(self.name)
        self._send_event = send_event

        # the structure of self._to_querier
        #
        # +------+--------------+
        # | dpid | 'port': port |
        # |      +--------------+
        # |      | 'ip': ip     |
        # |      +--------------+
        # |      | 'mac': mac   |
        # +------+--------------+
        # | ...                 |
        # +---------------------+
        #
        # dpid        datapath id.
        # port        a port number which connect to the querier.
        # ip          IP address of the querier.
        # mac         MAC address of the querier.
        self._to_querier = {}

        # the structure of self._to_hosts
        #
        # +------+-------+---------------------------------+
        # | dpid | group | 'replied': True/False           |
        # |      |       +---------------------------------+
        # |      |       | 'leave': leave                  |
        # |      |       +-----------+--------+------------+
        # |      |       | 'ports'   | portno | 'out': out |
        # |      |       |           |        +------------+
        # |      |       |           |        | 'in': in   |
        # |      |       |           +--------+------------+
        # |      |       |           | ...                 |
        # |      +-------+-----------+---------------------+
        # |      | ...                                     |
        # +------+-----------------------------------------+
        # | ...                                            |
        # +------------------------------------------------+
        #
        # dpid        datapath id.
        # group       multicast address.
        # replied     the value indicates whether a REPORT message was
        #             forwarded to the querier since the last QUERY.
        # leave       a LEAVE message.
        # portno      a port number which has joined to the multicast
        #             group.
        # out         the value indicates whether a flow entry for the
        #             packet outputted to the port was registered.
        # in          the value indicates whether a flow entry for the
        #             packet inputted from the port was registered.
        self._to_hosts = {}

        self._set_logger()

    def packet_in_handler(self, req_pkt, req_igmp, msg):
        """the process when the snooper received IGMP. QUERY, REPORT
        (v1 / v2) and LEAVE are processed; every other type is
        flooded."""
        dpid = msg.datapath.id
        ofproto = msg.datapath.ofproto
        if ofproto.OFP_VERSION == ofproto_v1_0.OFP_VERSION:
            in_port = msg.in_port
        else:
            in_port = msg.match['in_port']

        log = "SW=%s PORT=%d IGMP received. " % (
            dpid_to_str(dpid), in_port)
        self.logger.debug(str(req_igmp))
        if igmp.IGMP_TYPE_QUERY == req_igmp.msgtype:
            self.logger.info(log + "[QUERY]")
            (req_ipv4, ) = req_pkt.get_protocols(ipv4.ipv4)
            (req_eth, ) = req_pkt.get_protocols(ethernet.ethernet)
            self._do_query(req_igmp, req_ipv4, req_eth, in_port, msg)
        elif (igmp.IGMP_TYPE_REPORT_V1 == req_igmp.msgtype or
              igmp.IGMP_TYPE_REPORT_V2 == req_igmp.msgtype):
            self.logger.info(log + "[REPORT]")
            self._do_report(req_igmp, in_port, msg)
        elif igmp.IGMP_TYPE_LEAVE == req_igmp.msgtype:
            self.logger.info(log + "[LEAVE]")
            self._do_leave(req_igmp, in_port, msg)
        elif igmp.IGMP_TYPE_REPORT_V3 == req_igmp.msgtype:
            self.logger.info(log + "V3 is not supported yet.")
            self._do_flood(in_port, msg)
        else:
            self.logger.info(log + "[unknown type:%d]",
                             req_igmp.msgtype)
            self._do_flood(in_port, msg)

    # -------------------------------------------------------------------
    # PRIVATE METHODS ( RELATED TO IGMP )
    # -------------------------------------------------------------------
    def _do_query(self, query, iph, eth, in_port, msg):
        """the process when the snooper received a QUERY message."""
        datapath = msg.datapath
        dpid = datapath.id
        ofproto = datapath.ofproto
        parser = datapath.ofproto_parser

        # learn the querier from the received QUERY.
        self._to_querier[dpid] = {
            'port': in_port,
            'ip': iph.src,
            'mac': eth.src
        }

        # set the timeout time. maxresp is divided by 10 here, the
        # inverse of the "* 10" applied where this file builds a QUERY.
        timeout = igmp.QUERY_RESPONSE_INTERVAL
        if query.maxresp:
            timeout = query.maxresp / 10

        self._to_hosts.setdefault(dpid, {})
        if query.address == '0.0.0.0':
            # general query. reset all reply status.
            for group in self._to_hosts[dpid].values():
                group['replied'] = False
                group['leave'] = None
        else:
            # specific query. reset the reply status of the specific
            # group.
            group = self._to_hosts[dpid].get(query.address)
            if group:
                group['replied'] = False
                group['leave'] = None

        actions = [parser.OFPActionOutput(ofproto.OFPP_FLOOD)]
        self._do_packet_out(
            datapath, msg.data, in_port, actions)

        # wait for REPORT messages.
        hub.spawn(self._do_timeout_for_query, timeout, datapath)

    def _do_report(self, report, in_port, msg):
        """the process when the snooper received a REPORT message."""
        datapath = msg.datapath
        dpid = datapath.id
        ofproto = datapath.ofproto
        parser = datapath.ofproto_parser

        if ofproto.OFP_VERSION == ofproto_v1_0.OFP_VERSION:
            # NOTE(review): the OpenFlow 1.0 value was lost in
            # extraction and is reconstructed as 65535 -- confirm.
            size = 65535
        else:
            size = ofproto.OFPCML_MAX

        # check whether the querier port has been specified.
        outport = None
        value = self._to_querier.get(dpid)
        if value:
            outport = value['port']

        # send a event when the multicast group address is new.
        self._to_hosts.setdefault(dpid, {})
        if not self._to_hosts[dpid].get(report.address):
            self._send_event(
                EventMulticastGroupStateChanged(
                    MG_GROUP_ADDED, report.address, outport, []))
            self._to_hosts[dpid].setdefault(
                report.address,
                {'replied': False, 'leave': None, 'ports': {}})
        group = self._to_hosts[dpid][report.address]

        # set a flow entry from a host to the controller when
        # a host sent a REPORT message.
        if not group['ports'].get(in_port):
            group['ports'][in_port] = {'out': False, 'in': False}
            self._set_flow_entry(
                datapath,
                [parser.OFPActionOutput(ofproto.OFPP_CONTROLLER, size)],
                in_port, report.address)

        if not group['ports'][in_port]['out']:
            group['ports'][in_port]['out'] = True

        if not outport:
            self.logger.info("no querier exists.")
            return

        # set a flow entry from a multicast server to hosts.
        if not group['ports'][in_port]['in']:
            ports = list(group['ports'])
            actions = [parser.OFPActionOutput(port) for port in ports]
            self._send_event(
                EventMulticastGroupStateChanged(
                    MG_MEMBER_CHANGED, report.address, outport, ports))
            self._set_flow_entry(
                datapath, actions, outport, report.address)
            group['ports'][in_port]['in'] = True

        # send a REPORT message to the querier if this message arrived
        # first after a QUERY message was sent.
        if not group['replied']:
            actions = [parser.OFPActionOutput(outport, size)]
            self._do_packet_out(datapath, msg.data, in_port, actions)
            group['replied'] = True

    def _do_leave(self, leave, in_port, msg):
        """the process when the snooper received a LEAVE message."""
        datapath = msg.datapath
        dpid = datapath.id
        ofproto = datapath.ofproto
        parser = datapath.ofproto_parser

        # check whether the querier port has been specified.
        if not self._to_querier.get(dpid):
            self.logger.info("no querier exists.")
            return

        # save this LEAVE message and reset the condition of the port
        # that received this message.
        self._to_hosts.setdefault(dpid, {})
        self._to_hosts[dpid].setdefault(
            leave.address,
            {'replied': False, 'leave': None, 'ports': {}})
        self._to_hosts[dpid][leave.address]['leave'] = msg
        self._to_hosts[dpid][leave.address]['ports'][in_port] = {
            'out': False, 'in': False}

        # create a specific query.
        timeout = igmp.LAST_MEMBER_QUERY_INTERVAL
        res_igmp = igmp.igmp(
            msgtype=igmp.IGMP_TYPE_QUERY,
            maxresp=timeout * 10,
            csum=0,
            address=leave.address)
        res_ipv4 = ipv4.ipv4(
            total_length=len(ipv4.ipv4()) + len(res_igmp),
            proto=inet.IPPROTO_IGMP, ttl=1,
            src=self._to_querier[dpid]['ip'],
            dst=igmp.MULTICAST_IP_ALL_HOST)
        res_ether = ethernet.ethernet(
            dst=igmp.MULTICAST_MAC_ALL_HOST,
            src=self._to_querier[dpid]['mac'],
            ethertype=ether.ETH_TYPE_IP)
        res_pkt = packet.Packet()
        res_pkt.add_protocol(res_ether)
        res_pkt.add_protocol(res_ipv4)
        res_pkt.add_protocol(res_igmp)
        # res_pkt.data is read below, so serialize first.
        res_pkt.serialize()

        # send a specific query to the host that sent this message.
        actions = [parser.OFPActionOutput(ofproto.OFPP_IN_PORT)]
        self._do_packet_out(datapath, res_pkt.data, in_port, actions)

        # wait for REPORT messages.
        hub.spawn(self._do_timeout_for_leave, timeout, datapath,
                  leave.address, in_port)

    def _do_flood(self, in_port, msg):
        """the process when the snooper received a message of the
        outside for processing. the message is flooded unchanged."""
        datapath = msg.datapath
        ofproto = datapath.ofproto
        parser = datapath.ofproto_parser

        actions = [parser.OFPActionOutput(ofproto.OFPP_FLOOD)]
        self._do_packet_out(datapath, msg.data, in_port, actions)

    def _do_timeout_for_query(self, timeout, datapath):
        """the process when the QUERY from the querier timeout expired."""
        dpid = datapath.id

        hub.sleep(timeout)
        outport = self._to_querier[dpid]['port']

        hosts = self._to_hosts.get(dpid, {})
        # iterate over a snapshot: groups are removed inside the loop
        # and other timers may have changed the table during the sleep.
        for dst in list(hosts):
            group = hosts.get(dst)
            if group is None or group['replied']:
                continue
            # if no REPORT message sent from any members of
            # the group, remove flow entries about the group and
            # send a LEAVE message if exists.
            self._remove_multicast_group(datapath, outport, dst)
            hosts.pop(dst, None)

    def _do_timeout_for_leave(self, timeout, datapath, dst, in_port):
        """the process when the QUERY from the switch timeout expired."""
        parser = datapath.ofproto_parser
        dpid = datapath.id

        hub.sleep(timeout)
        outport = self._to_querier[dpid]['port']

        # the group or the port may have been removed by another timer
        # while this thread was sleeping.
        group = self._to_hosts.get(dpid, {}).get(dst)
        if group is None or in_port not in group['ports']:
            return

        # 'out' is set again when a REPORT arrived on the port, so the
        # host is still a member.
        if group['ports'][in_port]['out']:
            return

        del group['ports'][in_port]
        self._del_flow_entry(datapath, in_port, dst)
        ports = list(group['ports'])
        actions = [parser.OFPActionOutput(port) for port in ports]

        if actions:
            # other members remain: shrink the output port list.
            self._send_event(
                EventMulticastGroupStateChanged(
                    MG_MEMBER_CHANGED, dst, outport, ports))
            self._set_flow_entry(
                datapath, actions, outport, dst)
            group['leave'] = None
        else:
            # the last member left.
            self._remove_multicast_group(datapath, outport, dst)
            del self._to_hosts[dpid][dst]

    def _remove_multicast_group(self, datapath, outport, dst):
        """remove flow entries about the group and send a LEAVE message
        if exists. the entry in self._to_hosts is left for the caller
        to delete."""
        ofproto = datapath.ofproto
        parser = datapath.ofproto_parser
        dpid = datapath.id

        self._send_event(
            EventMulticastGroupStateChanged(
                MG_GROUP_REMOVED, dst, outport, []))
        self._del_flow_entry(datapath, outport, dst)
        for port in self._to_hosts[dpid][dst]['ports']:
            self._del_flow_entry(datapath, port, dst)
        # forward the saved LEAVE message to the querier port.
        leave = self._to_hosts[dpid][dst]['leave']
        if leave:
            if ofproto.OFP_VERSION == ofproto_v1_0.OFP_VERSION:
                in_port = leave.in_port
            else:
                in_port = leave.match['in_port']
            actions = [parser.OFPActionOutput(outport)]
            self._do_packet_out(
                datapath, leave.data, in_port, actions)

    # -------------------------------------------------------------------
    # PRIVATE METHODS ( OTHERS )
    # -------------------------------------------------------------------
    def _set_logger(self):
        """change log format."""
        self.logger.propagate = False
        # logging.getLogger() returns the same object for the same
        # name; do not stack a second handler on it.
        if self.logger.handlers:
            return
        hdl = logging.StreamHandler()
        fmt_str = '[snoop][%(levelname)s] %(message)s'
        hdl.setFormatter(logging.Formatter(fmt_str))
        self.logger.addHandler(hdl)