backing up
[vsorcdistro/.git] / ryu / build / lib.linux-armv7l-2.7 / ryu / lib / igmplib.py
1 # Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #    http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 import logging
17 import struct
18
19 from ryu.base import app_manager
20 from ryu.controller import event
21 from ryu.controller import ofp_event
22 from ryu.controller.handler import DEAD_DISPATCHER
23 from ryu.controller.handler import MAIN_DISPATCHER
24 from ryu.controller.handler import set_ev_cls
25 from ryu.ofproto import ether
26 from ryu.ofproto import inet
27 from ryu.ofproto import ofproto_v1_0
28 from ryu.ofproto import ofproto_v1_2
29 from ryu.ofproto import ofproto_v1_3
30 from ryu.lib import addrconv
31 from ryu.lib import hub
32 from ryu.lib.dpid import dpid_to_str
33 from ryu.lib.packet import packet
34 from ryu.lib.packet import ethernet
35 from ryu.lib.packet import ipv4
36 from ryu.lib.packet import igmp
37
38
39 class EventPacketIn(event.EventBase):
40     """a PacketIn event class using except IGMP."""
41
42     def __init__(self, msg):
43         """initialization."""
44         super(EventPacketIn, self).__init__()
45         self.msg = msg
46
47
48 MG_GROUP_ADDED = 1
49 MG_MEMBER_CHANGED = 2
50 MG_GROUP_REMOVED = 3
51
52
53 class EventMulticastGroupStateChanged(event.EventBase):
54     """a event class that notifies the changes of the statuses of the
55     multicast groups."""
56
57     def __init__(self, reason, address, src, dsts):
58         """
59         ========= =====================================================
60         Attribute Description
61         ========= =====================================================
62         reason    why the event occurs. use one of MG_*.
63         address   a multicast group address.
64         src       a port number in which a querier exists.
65         dsts      a list of port numbers in which the members exist.
66         ========= =====================================================
67         """
68         super(EventMulticastGroupStateChanged, self).__init__()
69         self.reason = reason
70         self.address = address
71         self.src = src
72         self.dsts = dsts
73
74
75 class IgmpLib(app_manager.RyuApp):
76     """IGMP snooping library."""
77
78     # -------------------------------------------------------------------
79     # PUBLIC METHODS
80     # -------------------------------------------------------------------
81     def __init__(self):
82         """initialization."""
83         super(IgmpLib, self).__init__()
84         self.name = 'igmplib'
85         self._querier = IgmpQuerier()
86         self._snooper = IgmpSnooper(self.send_event_to_observers)
87
88     def set_querier_mode(self, dpid, server_port):
89         """set a datapath id and server port number to the instance
90         of IgmpQuerier.
91
92         ============ ==================================================
93         Attribute    Description
94         ============ ==================================================
95         dpid         the datapath id that will operate as a querier.
96         server_port  the port number linked to the multicasting server.
97         ============ ==================================================
98         """
99         self._querier.set_querier_mode(dpid, server_port)
100
101     # -------------------------------------------------------------------
102     # PUBLIC METHODS ( EVENT HANDLERS )
103     # -------------------------------------------------------------------
104     @set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER)
105     def packet_in_handler(self, evt):
106         """PacketIn event handler. when the received packet was IGMP,
107         proceed it. otherwise, send a event."""
108         msg = evt.msg
109         dpid = msg.datapath.id
110
111         req_pkt = packet.Packet(msg.data)
112         req_igmp = req_pkt.get_protocol(igmp.igmp)
113         if req_igmp:
114             if self._querier.dpid == dpid:
115                 self._querier.packet_in_handler(req_igmp, msg)
116             else:
117                 self._snooper.packet_in_handler(req_pkt, req_igmp, msg)
118         else:
119             self.send_event_to_observers(EventPacketIn(msg))
120
121     @set_ev_cls(ofp_event.EventOFPStateChange,
122                 [MAIN_DISPATCHER, DEAD_DISPATCHER])
123     def state_change_handler(self, evt):
124         """StateChange event handler."""
125         datapath = evt.datapath
126         assert datapath is not None
127         if datapath.id == self._querier.dpid:
128             if evt.state == MAIN_DISPATCHER:
129                 self._querier.start_loop(datapath)
130             elif evt.state == DEAD_DISPATCHER:
131                 self._querier.stop_loop()
132
133
134 class IgmpBase(object):
135     """IGMP abstract class library."""
136
137     # -------------------------------------------------------------------
138     # PUBLIC METHODS
139     # -------------------------------------------------------------------
140     def __init__(self):
141         self._set_flow_func = {
142             ofproto_v1_0.OFP_VERSION: self._set_flow_entry_v1_0,
143             ofproto_v1_2.OFP_VERSION: self._set_flow_entry_v1_2,
144             ofproto_v1_3.OFP_VERSION: self._set_flow_entry_v1_2,
145         }
146         self._del_flow_func = {
147             ofproto_v1_0.OFP_VERSION: self._del_flow_entry_v1_0,
148             ofproto_v1_2.OFP_VERSION: self._del_flow_entry_v1_2,
149             ofproto_v1_3.OFP_VERSION: self._del_flow_entry_v1_2,
150         }
151
152     # -------------------------------------------------------------------
153     # PROTECTED METHODS ( RELATED TO OPEN FLOW PROTOCOL )
154     # -------------------------------------------------------------------
155     def _set_flow_entry_v1_0(self, datapath, actions, in_port, dst,
156                              src=None):
157         ofproto = datapath.ofproto
158         parser = datapath.ofproto_parser
159
160         match = parser.OFPMatch(
161             dl_type=ether.ETH_TYPE_IP, in_port=in_port,
162             nw_src=self._ipv4_text_to_int(src),
163             nw_dst=self._ipv4_text_to_int(dst))
164         mod = parser.OFPFlowMod(
165             datapath=datapath, match=match, cookie=0,
166             command=ofproto.OFPFC_ADD, actions=actions)
167         datapath.send_msg(mod)
168
169     def _set_flow_entry_v1_2(self, datapath, actions, in_port, dst,
170                              src=None):
171         ofproto = datapath.ofproto
172         parser = datapath.ofproto_parser
173
174         match = parser.OFPMatch(
175             eth_type=ether.ETH_TYPE_IP, in_port=in_port, ipv4_dst=dst)
176         if src is not None:
177             match.append_field(ofproto.OXM_OF_IPV4_SRC, src)
178         inst = [parser.OFPInstructionActions(
179             ofproto.OFPIT_APPLY_ACTIONS, actions)]
180         mod = parser.OFPFlowMod(
181             datapath=datapath, command=ofproto.OFPFC_ADD,
182             priority=65535, match=match, instructions=inst)
183         datapath.send_msg(mod)
184
185     def _set_flow_entry(self, datapath, actions, in_port, dst, src=None):
186         """set a flow entry."""
187         set_flow = self._set_flow_func.get(datapath.ofproto.OFP_VERSION)
188         assert set_flow
189         set_flow(datapath, actions, in_port, dst, src)
190
191     def _del_flow_entry_v1_0(self, datapath, in_port, dst, src=None):
192         ofproto = datapath.ofproto
193         parser = datapath.ofproto_parser
194
195         match = parser.OFPMatch(
196             dl_type=ether.ETH_TYPE_IP, in_port=in_port,
197             nw_src=self._ipv4_text_to_int(src),
198             nw_dst=self._ipv4_text_to_int(dst))
199         mod = parser.OFPFlowMod(
200             datapath=datapath, match=match, cookie=0,
201             command=ofproto.OFPFC_DELETE)
202         datapath.send_msg(mod)
203
204     def _del_flow_entry_v1_2(self, datapath, in_port, dst, src=None):
205         ofproto = datapath.ofproto
206         parser = datapath.ofproto_parser
207
208         match = parser.OFPMatch(
209             eth_type=ether.ETH_TYPE_IP, in_port=in_port, ipv4_dst=dst)
210         if src is not None:
211             match.append_field(ofproto.OXM_OF_IPV4_SRC, src)
212         mod = parser.OFPFlowMod(
213             datapath=datapath, command=ofproto.OFPFC_DELETE,
214             out_port=ofproto.OFPP_ANY, out_group=ofproto.OFPG_ANY,
215             match=match)
216         datapath.send_msg(mod)
217
218     def _del_flow_entry(self, datapath, in_port, dst, src=None):
219         """remove a flow entry."""
220         del_flow = self._del_flow_func.get(datapath.ofproto.OFP_VERSION)
221         assert del_flow
222         del_flow(datapath, in_port, dst, src)
223
224     def _do_packet_out(self, datapath, data, in_port, actions):
225         """send a packet."""
226         ofproto = datapath.ofproto
227         parser = datapath.ofproto_parser
228
229         out = parser.OFPPacketOut(
230             datapath=datapath, buffer_id=ofproto.OFP_NO_BUFFER,
231             data=data, in_port=in_port, actions=actions)
232         datapath.send_msg(out)
233
234     # -------------------------------------------------------------------
235     # PROTECTED METHODS ( OTHERS )
236     # -------------------------------------------------------------------
237     def _ipv4_text_to_int(self, ip_text):
238         """convert ip v4 string to integer."""
239         if ip_text is None:
240             return None
241         assert isinstance(ip_text, str)
242         return struct.unpack('!I', addrconv.ipv4.text_to_bin(ip_text))[0]
243
244
245 class IgmpQuerier(IgmpBase):
246     """IGMP querier emulation class library.
247
248     this querier is a simplified implementation, and is not based on RFC,
249     for example as following points:
250     - ignore some constant values
251     - does not send a specific QUERY in response to LEAVE
252     - and so on
253     """
254
255     # -------------------------------------------------------------------
256     # PUBLIC METHODS
257     # -------------------------------------------------------------------
258     def __init__(self):
259         """initialization."""
260         super(IgmpQuerier, self).__init__()
261         self.name = "IgmpQuerier"
262         self.logger = logging.getLogger(self.name)
263         self.dpid = None
264         self.server_port = None
265
266         self._datapath = None
267         self._querier_thread = None
268
269         # the structure of self._macst
270         #
271         # +-------+------------------+
272         # | group | port: True/False |
273         # |       +------------------+
274         # |       |...               |
275         # +-------+------------------+
276         # | ...                      |
277         # +--------------------------+
278         #
279         # group       multicast address.
280         # port        a port number which connect to the group member.
281         #             the value indicates that whether a flow entry
282         #             was registered.
283         self._mcast = {}
284
285         self._set_logger()
286
287     def set_querier_mode(self, dpid, server_port):
288         """set the datapath to work as a querier. note that you can set
289         up only the one querier. when you called this method several
290         times, only the last one becomes effective."""
291         self.dpid = dpid
292         self.server_port = server_port
293         if self._querier_thread:
294             hub.kill(self._querier_thread)
295             self._querier_thread = None
296
297     def packet_in_handler(self, req_igmp, msg):
298         """the process when the querier received IGMP."""
299         ofproto = msg.datapath.ofproto
300         if ofproto.OFP_VERSION == ofproto_v1_0.OFP_VERSION:
301             in_port = msg.in_port
302         else:
303             in_port = msg.match['in_port']
304         if (igmp.IGMP_TYPE_REPORT_V1 == req_igmp.msgtype or
305                 igmp.IGMP_TYPE_REPORT_V2 == req_igmp.msgtype):
306             self._do_report(req_igmp, in_port, msg)
307         elif igmp.IGMP_TYPE_LEAVE == req_igmp.msgtype:
308             self._do_leave(req_igmp, in_port, msg)
309
310     def start_loop(self, datapath):
311         """start QUERY thread."""
312         self._datapath = datapath
313         self._querier_thread = hub.spawn(self._send_query)
314         self.logger.info("started a querier.")
315
316     def stop_loop(self):
317         """stop QUERY thread."""
318         hub.kill(self._querier_thread)
319         self._querier_thread = None
320         self._datapath = None
321         self.logger.info("stopped a querier.")
322
323     # -------------------------------------------------------------------
324     # PRIVATE METHODS ( RELATED TO IGMP )
325     # -------------------------------------------------------------------
326     def _send_query(self):
327         """ send a QUERY message periodically."""
328         timeout = 60
329         ofproto = self._datapath.ofproto
330         parser = self._datapath.ofproto_parser
331         if ofproto_v1_0.OFP_VERSION == ofproto.OFP_VERSION:
332             send_port = ofproto.OFPP_NONE
333         else:
334             send_port = ofproto.OFPP_ANY
335
336         # create a general query.
337         res_igmp = igmp.igmp(
338             msgtype=igmp.IGMP_TYPE_QUERY,
339             maxresp=igmp.QUERY_RESPONSE_INTERVAL * 10,
340             csum=0,
341             address='0.0.0.0')
342         res_ipv4 = ipv4.ipv4(
343             total_length=len(ipv4.ipv4()) + len(res_igmp),
344             proto=inet.IPPROTO_IGMP, ttl=1,
345             src='0.0.0.0',
346             dst=igmp.MULTICAST_IP_ALL_HOST)
347         res_ether = ethernet.ethernet(
348             dst=igmp.MULTICAST_MAC_ALL_HOST,
349             src=self._datapath.ports[ofproto.OFPP_LOCAL].hw_addr,
350             ethertype=ether.ETH_TYPE_IP)
351         res_pkt = packet.Packet()
352         res_pkt.add_protocol(res_ether)
353         res_pkt.add_protocol(res_ipv4)
354         res_pkt.add_protocol(res_igmp)
355         res_pkt.serialize()
356
357         flood = [parser.OFPActionOutput(ofproto.OFPP_FLOOD)]
358
359         while True:
360             # reset reply status.
361             for status in self._mcast.values():
362                 for port in status.keys():
363                     status[port] = False
364
365             # send a general query to the host that sent this message.
366             self._do_packet_out(
367                 self._datapath, res_pkt.data, send_port, flood)
368             hub.sleep(igmp.QUERY_RESPONSE_INTERVAL)
369
370             # QUERY timeout expired.
371             del_groups = []
372             for group, status in self._mcast.items():
373                 del_ports = []
374                 actions = []
375                 for port in status.keys():
376                     if not status[port]:
377                         del_ports.append(port)
378                     else:
379                         actions.append(parser.OFPActionOutput(port))
380                 if len(actions) and len(del_ports):
381                     self._set_flow_entry(
382                         self._datapath, actions, self.server_port, group)
383                 if not len(actions):
384                     self._del_flow_entry(
385                         self._datapath, self.server_port, group)
386                     del_groups.append(group)
387                 if len(del_ports):
388                     for port in del_ports:
389                         self._del_flow_entry(self._datapath, port, group)
390                 for port in del_ports:
391                     del status[port]
392             for group in del_groups:
393                 del self._mcast[group]
394
395             rest_time = timeout - igmp.QUERY_RESPONSE_INTERVAL
396             hub.sleep(rest_time)
397
398     def _do_report(self, report, in_port, msg):
399         """the process when the querier received a REPORT message."""
400         datapath = msg.datapath
401         ofproto = datapath.ofproto
402         parser = datapath.ofproto_parser
403
404         if ofproto.OFP_VERSION == ofproto_v1_0.OFP_VERSION:
405             size = 65535
406         else:
407             size = ofproto.OFPCML_MAX
408
409         update = False
410         self._mcast.setdefault(report.address, {})
411         if in_port not in self._mcast[report.address]:
412             update = True
413         self._mcast[report.address][in_port] = True
414
415         if update:
416             actions = []
417             for port in self._mcast[report.address]:
418                 actions.append(parser.OFPActionOutput(port))
419             self._set_flow_entry(
420                 datapath, actions, self.server_port, report.address)
421             self._set_flow_entry(
422                 datapath,
423                 [parser.OFPActionOutput(ofproto.OFPP_CONTROLLER, size)],
424                 in_port, report.address)
425
426     def _do_leave(self, leave, in_port, msg):
427         """the process when the querier received a LEAVE message."""
428         datapath = msg.datapath
429         parser = datapath.ofproto_parser
430
431         self._mcast.setdefault(leave.address, {})
432         if in_port in self._mcast[leave.address]:
433             self._del_flow_entry(
434                 datapath, in_port, leave.address)
435             del self._mcast[leave.address][in_port]
436             actions = []
437             for port in self._mcast[leave.address]:
438                 actions.append(parser.OFPActionOutput(port))
439             if len(actions):
440                 self._set_flow_entry(
441                     datapath, actions, self.server_port, leave.address)
442             else:
443                 self._del_flow_entry(
444                     datapath, self.server_port, leave.address)
445
446     # -------------------------------------------------------------------
447     # PRIVATE METHODS ( OTHERS )
448     # -------------------------------------------------------------------
449     def _set_logger(self):
450         """change log format."""
451         self.logger.propagate = False
452         hdl = logging.StreamHandler()
453         fmt_str = '[querier][%(levelname)s] %(message)s'
454         hdl.setFormatter(logging.Formatter(fmt_str))
455         self.logger.addHandler(hdl)
456
457
458 class IgmpSnooper(IgmpBase):
459     """IGMP snooping class library."""
460
461     # -------------------------------------------------------------------
462     # PUBLIC METHODS
463     # -------------------------------------------------------------------
464     def __init__(self, send_event):
465         """initialization."""
466         super(IgmpSnooper, self).__init__()
467         self.name = "IgmpSnooper"
468         self.logger = logging.getLogger(self.name)
469         self._send_event = send_event
470
471         # the structure of self._to_querier
472         #
473         # +------+--------------+
474         # | dpid | 'port': port |
475         # |      +--------------+
476         # |      | 'ip': ip     |
477         # |      +--------------+
478         # |      | 'mac': mac   |
479         # +------+--------------+
480         # | ...                 |
481         # +---------------------+
482         #
483         # dpid        datapath id.
484         # port        a port number which connect to the querier.
485         # ip          IP address of the querier.
486         # mac         MAC address of the querier.
487         self._to_querier = {}
488
489         # the structure of self._to_hosts
490         #
491         # +------+-------+---------------------------------+
492         # | dpid | group | 'replied': True/False           |
493         # |      |       +---------------------------------+
494         # |      |       | 'leave': leave                  |
495         # |      |       +-----------+--------+------------+
496         # |      |       | 'ports'   | portno | 'out': out |
497         # |      |       |           |        +------------+
498         # |      |       |           |        | 'in': in   |
499         # |      |       |           +--------+------------+
500         # |      |       |           | ...                 |
501         # |      +-------+-----------+---------------------+
502         # |      | ...                                     |
503         # +------+-----------------------------------------+
504         # | ...                                            |
505         # +------------------------------------------------+
506         #
507         # dpid        datapath id.
508         # group       multicast address.
509         # replied     the value indicates whether a REPORT message was
510         #             replied.
511         # leave       a LEAVE message.
512         # portno      a port number which has joined to the multicast
513         #             group.
514         # out         the value indicates whether a flow entry for the
515         #             packet outputted to the port was registered.
516         # in          the value indicates whether a flow entry for the
517         #             packet inputted from the port was registered.
518         self._to_hosts = {}
519
520         self._set_logger()
521
522     def packet_in_handler(self, req_pkt, req_igmp, msg):
523         """the process when the snooper received IGMP."""
524         dpid = msg.datapath.id
525         ofproto = msg.datapath.ofproto
526         if ofproto.OFP_VERSION == ofproto_v1_0.OFP_VERSION:
527             in_port = msg.in_port
528         else:
529             in_port = msg.match['in_port']
530
531         log = "SW=%s PORT=%d IGMP received. " % (
532             dpid_to_str(dpid), in_port)
533         self.logger.debug(str(req_igmp))
534         if igmp.IGMP_TYPE_QUERY == req_igmp.msgtype:
535             self.logger.info(log + "[QUERY]")
536             (req_ipv4, ) = req_pkt.get_protocols(ipv4.ipv4)
537             (req_eth, ) = req_pkt.get_protocols(ethernet.ethernet)
538             self._do_query(req_igmp, req_ipv4, req_eth, in_port, msg)
539         elif (igmp.IGMP_TYPE_REPORT_V1 == req_igmp.msgtype or
540               igmp.IGMP_TYPE_REPORT_V2 == req_igmp.msgtype):
541             self.logger.info(log + "[REPORT]")
542             self._do_report(req_igmp, in_port, msg)
543         elif igmp.IGMP_TYPE_LEAVE == req_igmp.msgtype:
544             self.logger.info(log + "[LEAVE]")
545             self._do_leave(req_igmp, in_port, msg)
546         elif igmp.IGMP_TYPE_REPORT_V3 == req_igmp.msgtype:
547             self.logger.info(log + "V3 is not supported yet.")
548             self._do_flood(in_port, msg)
549         else:
550             self.logger.info(log + "[unknown type:%d]",
551                              req_igmp.msgtype)
552             self._do_flood(in_port, msg)
553
554     # -------------------------------------------------------------------
555     # PRIVATE METHODS ( RELATED TO IGMP )
556     # -------------------------------------------------------------------
557     def _do_query(self, query, iph, eth, in_port, msg):
558         """the process when the snooper received a QUERY message."""
559         datapath = msg.datapath
560         dpid = datapath.id
561         ofproto = datapath.ofproto
562         parser = datapath.ofproto_parser
563
564         # learn the querier.
565         self._to_querier[dpid] = {
566             'port': in_port,
567             'ip': iph.src,
568             'mac': eth.src
569         }
570
571         # set the timeout time.
572         timeout = igmp.QUERY_RESPONSE_INTERVAL
573         if query.maxresp:
574             timeout = query.maxresp / 10
575
576         self._to_hosts.setdefault(dpid, {})
577         if query.address == '0.0.0.0':
578             # general query. reset all reply status.
579             for group in self._to_hosts[dpid].values():
580                 group['replied'] = False
581                 group['leave'] = None
582         else:
583             # specific query. reset the reply status of the specific
584             # group.
585             group = self._to_hosts[dpid].get(query.address)
586             if group:
587                 group['replied'] = False
588                 group['leave'] = None
589
590         actions = [parser.OFPActionOutput(ofproto.OFPP_FLOOD)]
591         self._do_packet_out(
592             datapath, msg.data, in_port, actions)
593
594         # wait for REPORT messages.
595         hub.spawn(self._do_timeout_for_query, timeout, datapath)
596
597     def _do_report(self, report, in_port, msg):
598         """the process when the snooper received a REPORT message."""
599         datapath = msg.datapath
600         dpid = datapath.id
601         ofproto = datapath.ofproto
602         parser = datapath.ofproto_parser
603
604         if ofproto.OFP_VERSION == ofproto_v1_0.OFP_VERSION:
605             size = 65535
606         else:
607             size = ofproto.OFPCML_MAX
608
609         # check whether the querier port has been specified.
610         outport = None
611         value = self._to_querier.get(dpid)
612         if value:
613             outport = value['port']
614
615         # send a event when the multicast group address is new.
616         self._to_hosts.setdefault(dpid, {})
617         if not self._to_hosts[dpid].get(report.address):
618             self._send_event(
619                 EventMulticastGroupStateChanged(
620                     MG_GROUP_ADDED, report.address, outport, []))
621             self._to_hosts[dpid].setdefault(
622                 report.address,
623                 {'replied': False, 'leave': None, 'ports': {}})
624
625         # set a flow entry from a host to the controller when
626         # a host sent a REPORT message.
627         if not self._to_hosts[dpid][report.address]['ports'].get(
628                 in_port):
629             self._to_hosts[dpid][report.address]['ports'][
630                 in_port] = {'out': False, 'in': False}
631             self._set_flow_entry(
632                 datapath,
633                 [parser.OFPActionOutput(ofproto.OFPP_CONTROLLER, size)],
634                 in_port, report.address)
635
636         if not self._to_hosts[dpid][report.address]['ports'][
637                 in_port]['out']:
638             self._to_hosts[dpid][report.address]['ports'][
639                 in_port]['out'] = True
640
641         if not outport:
642             self.logger.info("no querier exists.")
643             return
644
645         # set a flow entry from a multicast server to hosts.
646         if not self._to_hosts[dpid][report.address]['ports'][
647                 in_port]['in']:
648             actions = []
649             ports = []
650             for port in self._to_hosts[dpid][report.address]['ports']:
651                 actions.append(parser.OFPActionOutput(port))
652                 ports.append(port)
653             self._send_event(
654                 EventMulticastGroupStateChanged(
655                     MG_MEMBER_CHANGED, report.address, outport, ports))
656             self._set_flow_entry(
657                 datapath, actions, outport, report.address)
658             self._to_hosts[dpid][report.address]['ports'][
659                 in_port]['in'] = True
660
661         # send a REPORT message to the querier if this message arrived
662         # first after a QUERY message was sent.
663         if not self._to_hosts[dpid][report.address]['replied']:
664             actions = [parser.OFPActionOutput(outport, size)]
665             self._do_packet_out(datapath, msg.data, in_port, actions)
666             self._to_hosts[dpid][report.address]['replied'] = True
667
668     def _do_leave(self, leave, in_port, msg):
669         """the process when the snooper received a LEAVE message."""
670         datapath = msg.datapath
671         dpid = datapath.id
672         ofproto = datapath.ofproto
673         parser = datapath.ofproto_parser
674
675         # check whether the querier port has been specified.
676         if not self._to_querier.get(dpid):
677             self.logger.info("no querier exists.")
678             return
679
680         # save this LEAVE message and reset the condition of the port
681         # that received this message.
682         self._to_hosts.setdefault(dpid, {})
683         self._to_hosts[dpid].setdefault(
684             leave.address,
685             {'replied': False, 'leave': None, 'ports': {}})
686         self._to_hosts[dpid][leave.address]['leave'] = msg
687         self._to_hosts[dpid][leave.address]['ports'][in_port] = {
688             'out': False, 'in': False}
689
690         # create a specific query.
691         timeout = igmp.LAST_MEMBER_QUERY_INTERVAL
692         res_igmp = igmp.igmp(
693             msgtype=igmp.IGMP_TYPE_QUERY,
694             maxresp=timeout * 10,
695             csum=0,
696             address=leave.address)
697         res_ipv4 = ipv4.ipv4(
698             total_length=len(ipv4.ipv4()) + len(res_igmp),
699             proto=inet.IPPROTO_IGMP, ttl=1,
700             src=self._to_querier[dpid]['ip'],
701             dst=igmp.MULTICAST_IP_ALL_HOST)
702         res_ether = ethernet.ethernet(
703             dst=igmp.MULTICAST_MAC_ALL_HOST,
704             src=self._to_querier[dpid]['mac'],
705             ethertype=ether.ETH_TYPE_IP)
706         res_pkt = packet.Packet()
707         res_pkt.add_protocol(res_ether)
708         res_pkt.add_protocol(res_ipv4)
709         res_pkt.add_protocol(res_igmp)
710         res_pkt.serialize()
711
712         # send a specific query to the host that sent this message.
713         actions = [parser.OFPActionOutput(ofproto.OFPP_IN_PORT)]
714         self._do_packet_out(datapath, res_pkt.data, in_port, actions)
715
716         # wait for REPORT messages.
717         hub.spawn(self._do_timeout_for_leave, timeout, datapath,
718                   leave.address, in_port)
719
720     def _do_flood(self, in_port, msg):
721         """the process when the snooper received a message of the
722         outside for processing. """
723         datapath = msg.datapath
724         ofproto = datapath.ofproto
725         parser = datapath.ofproto_parser
726
727         actions = [parser.OFPActionOutput(ofproto.OFPP_FLOOD)]
728         self._do_packet_out(datapath, msg.data, in_port, actions)
729
730     def _do_timeout_for_query(self, timeout, datapath):
731         """the process when the QUERY from the querier timeout expired."""
732         dpid = datapath.id
733
734         hub.sleep(timeout)
735         outport = self._to_querier[dpid]['port']
736
737         remove_dsts = []
738         for dst in self._to_hosts[dpid]:
739             if not self._to_hosts[dpid][dst]['replied']:
740                 # if no REPORT message sent from any members of
741                 # the group, remove flow entries about the group and
742                 # send a LEAVE message if exists.
743                 self._remove_multicast_group(datapath, outport, dst)
744                 remove_dsts.append(dst)
745
746         for dst in remove_dsts:
747             del self._to_hosts[dpid][dst]
748
749     def _do_timeout_for_leave(self, timeout, datapath, dst, in_port):
750         """the process when the QUERY from the switch timeout expired."""
751         parser = datapath.ofproto_parser
752         dpid = datapath.id
753
754         hub.sleep(timeout)
755         outport = self._to_querier[dpid]['port']
756
757         if self._to_hosts[dpid][dst]['ports'][in_port]['out']:
758             return
759
760         del self._to_hosts[dpid][dst]['ports'][in_port]
761         self._del_flow_entry(datapath, in_port, dst)
762         actions = []
763         ports = []
764         for port in self._to_hosts[dpid][dst]['ports']:
765             actions.append(parser.OFPActionOutput(port))
766             ports.append(port)
767
768         if len(actions):
769             self._send_event(
770                 EventMulticastGroupStateChanged(
771                     MG_MEMBER_CHANGED, dst, outport, ports))
772             self._set_flow_entry(
773                 datapath, actions, outport, dst)
774             self._to_hosts[dpid][dst]['leave'] = None
775         else:
776             self._remove_multicast_group(datapath, outport, dst)
777             del self._to_hosts[dpid][dst]
778
779     def _remove_multicast_group(self, datapath, outport, dst):
780         """remove flow entries about the group and send a LEAVE message
781         if exists."""
782         ofproto = datapath.ofproto
783         parser = datapath.ofproto_parser
784         dpid = datapath.id
785
786         self._send_event(
787             EventMulticastGroupStateChanged(
788                 MG_GROUP_REMOVED, dst, outport, []))
789         self._del_flow_entry(datapath, outport, dst)
790         for port in self._to_hosts[dpid][dst]['ports']:
791             self._del_flow_entry(datapath, port, dst)
792         leave = self._to_hosts[dpid][dst]['leave']
793         if leave:
794             if ofproto.OFP_VERSION == ofproto_v1_0.OFP_VERSION:
795                 in_port = leave.in_port
796             else:
797                 in_port = leave.match['in_port']
798             actions = [parser.OFPActionOutput(outport)]
799             self._do_packet_out(
800                 datapath, leave.data, in_port, actions)
801
802     # -------------------------------------------------------------------
803     # PRIVATE METHODS ( OTHERS )
804     # -------------------------------------------------------------------
805     def _set_logger(self):
806         """change log format."""
807         self.logger.propagate = False
808         hdl = logging.StreamHandler()
809         fmt_str = '[snoop][%(levelname)s] %(message)s'
810         hdl.setFormatter(logging.Formatter(fmt_str))
811         self.logger.addHandler(hdl)