1 # Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
16 # vim: tabstop=4 shiftwidth=4 softtabstop=4
23 from struct import pack, unpack_from, pack_into
24 from nose.tools import ok_, eq_, raises
25 from ryu.ofproto import ether
26 from ryu.ofproto import inet
27 from ryu.lib.packet.ethernet import ethernet
28 from ryu.lib.packet.ipv4 import ipv4
29 from ryu.lib.packet.packet import Packet
30 from ryu.lib.packet.packet_utils import checksum
31 from ryu.lib import addrconv
32 from ryu.lib.packet.igmp import igmp
33 from ryu.lib.packet.igmp import igmpv3_query
34 from ryu.lib.packet.igmp import igmpv3_report
35 from ryu.lib.packet.igmp import igmpv3_report_group
36 from ryu.lib.packet.igmp import IGMP_TYPE_QUERY
37 from ryu.lib.packet.igmp import IGMP_TYPE_REPORT_V3
38 from ryu.lib.packet.igmp import MODE_IS_INCLUDE
40 LOG = logging.getLogger(__name__)
43 class Test_igmp(unittest.TestCase):
44 """ Test case for Internet Group Management Protocol
48 self.msgtype = IGMP_TYPE_QUERY
51 self.address = '225.0.0.1'
53 self.buf = pack(igmp._PACK_STR, self.msgtype, self.maxresp,
55 addrconv.ipv4.text_to_bin(self.address))
57 self.g = igmp(self.msgtype, self.maxresp, self.csum,
63 def find_protocol(self, pkt, name):
64 for p in pkt.protocols:
65 if p.protocol_name == name:
69 eq_(self.msgtype, self.g.msgtype)
70 eq_(self.maxresp, self.g.maxresp)
71 eq_(self.csum, self.g.csum)
72 eq_(self.address, self.g.address)
74 def test_parser(self):
75 _res = self.g.parser(self.buf)
76 if type(_res) is tuple:
81 eq_(res.msgtype, self.msgtype)
82 eq_(res.maxresp, self.maxresp)
83 eq_(res.csum, self.csum)
84 eq_(res.address, self.address)
86 def test_serialize(self):
89 buf = self.g.serialize(data, prev)
91 res = unpack_from(igmp._PACK_STR, six.binary_type(buf))
93 eq_(res[0], self.msgtype)
94 eq_(res[1], self.maxresp)
95 eq_(res[2], checksum(self.buf))
96 eq_(res[3], addrconv.ipv4.text_to_bin(self.address))
98 def _build_igmp(self):
99 dl_dst = '11:22:33:44:55:66'
100 dl_src = 'aa:bb:cc:dd:ee:ff'
101 dl_type = ether.ETH_TYPE_IP
102 e = ethernet(dl_dst, dl_src, dl_type)
104 total_length = 20 + igmp._MIN_LEN
105 nw_proto = inet.IPPROTO_IGMP
106 nw_dst = '11.22.33.44'
107 nw_src = '55.66.77.88'
108 i = ipv4(total_length=total_length, src=nw_src, dst=nw_dst,
115 p.add_protocol(self.g)
119 def test_build_igmp(self):
120 p = self._build_igmp()
122 e = self.find_protocol(p, "ethernet")
124 eq_(e.ethertype, ether.ETH_TYPE_IP)
126 i = self.find_protocol(p, "ipv4")
128 eq_(i.proto, inet.IPPROTO_IGMP)
130 g = self.find_protocol(p, "igmp")
133 eq_(g.msgtype, self.msgtype)
134 eq_(g.maxresp, self.maxresp)
135 eq_(g.csum, checksum(self.buf))
136 eq_(g.address, self.address)
138 def test_to_string(self):
139 igmp_values = {'msgtype': repr(self.msgtype),
140 'maxresp': repr(self.maxresp),
141 'csum': repr(self.csum),
142 'address': repr(self.address)}
143 _g_str = ','.join(['%s=%s' % (k, igmp_values[k])
144 for k, v in inspect.getmembers(self.g)
145 if k in igmp_values])
146 g_str = '%s(%s)' % (igmp.__name__, _g_str)
148 eq_(str(self.g), g_str)
149 eq_(repr(self.g), g_str)
152 def test_malformed_igmp(self):
153 m_short_buf = self.buf[1:igmp._MIN_LEN]
154 igmp.parser(m_short_buf)
156 def test_default_args(self):
158 buf = ig.serialize(bytearray(), None)
159 res = unpack_from(igmp._PACK_STR, six.binary_type(buf))
163 eq_(res[3], addrconv.ipv4.text_to_bin('0.0.0.0'))
166 jsondict = self.g.to_jsondict()
167 g = igmp.from_jsondict(jsondict['igmp'])
168 eq_(str(self.g), str(g))
171 class Test_igmpv3_query(unittest.TestCase):
172 """ Test case for Internet Group Management Protocol v3
173 Membership Query Message"""
176 self.msgtype = IGMP_TYPE_QUERY
179 self.address = '225.0.0.1'
186 self.s_qrv = self.s_flg << 3 | self.qrv
188 self.buf = pack(igmpv3_query._PACK_STR, self.msgtype,
189 self.maxresp, self.csum,
190 addrconv.ipv4.text_to_bin(self.address),
191 self.s_qrv, self.qqic, self.num)
193 self.g = igmpv3_query(
194 self.msgtype, self.maxresp, self.csum, self.address,
195 self.s_flg, self.qrv, self.qqic, self.num, self.srcs)
197 def setUp_with_srcs(self):
198 self.srcs = ['192.168.1.1', '192.168.1.2', '192.168.1.3']
199 self.num = len(self.srcs)
200 self.buf = pack(igmpv3_query._PACK_STR, self.msgtype,
201 self.maxresp, self.csum,
202 addrconv.ipv4.text_to_bin(self.address),
203 self.s_qrv, self.qqic, self.num)
204 for src in self.srcs:
205 self.buf += pack('4s', addrconv.ipv4.text_to_bin(src))
206 self.g = igmpv3_query(
207 self.msgtype, self.maxresp, self.csum, self.address,
208 self.s_flg, self.qrv, self.qqic, self.num, self.srcs)
213 def find_protocol(self, pkt, name):
214 for p in pkt.protocols:
215 if p.protocol_name == name:
219 eq_(self.msgtype, self.g.msgtype)
220 eq_(self.maxresp, self.g.maxresp)
221 eq_(self.csum, self.g.csum)
222 eq_(self.address, self.g.address)
223 eq_(self.s_flg, self.g.s_flg)
224 eq_(self.qrv, self.g.qrv)
225 eq_(self.qqic, self.g.qqic)
226 eq_(self.num, self.g.num)
227 eq_(self.srcs, self.g.srcs)
229 def test_init_with_srcs(self):
230 self.setUp_with_srcs()
233 def test_parser(self):
234 _res = self.g.parser(self.buf)
235 if type(_res) is tuple:
240 eq_(res.msgtype, self.msgtype)
241 eq_(res.maxresp, self.maxresp)
242 eq_(res.csum, self.csum)
243 eq_(res.address, self.address)
244 eq_(res.s_flg, self.s_flg)
245 eq_(res.qrv, self.qrv)
246 eq_(res.qqic, self.qqic)
247 eq_(res.num, self.num)
248 eq_(res.srcs, self.srcs)
250 def test_parser_with_srcs(self):
251 self.setUp_with_srcs()
254 def test_serialize(self):
257 buf = self.g.serialize(data, prev)
259 res = unpack_from(igmpv3_query._PACK_STR, six.binary_type(buf))
261 eq_(res[0], self.msgtype)
262 eq_(res[1], self.maxresp)
263 eq_(res[2], checksum(self.buf))
264 eq_(res[3], addrconv.ipv4.text_to_bin(self.address))
265 eq_(res[4], self.s_qrv)
266 eq_(res[5], self.qqic)
267 eq_(res[6], self.num)
269 def test_serialize_with_srcs(self):
270 self.setUp_with_srcs()
273 buf = self.g.serialize(data, prev)
275 res = unpack_from(igmpv3_query._PACK_STR, six.binary_type(buf))
276 (src1, src2, src3) = unpack_from('4s4s4s', six.binary_type(buf),
277 igmpv3_query._MIN_LEN)
279 eq_(res[0], self.msgtype)
280 eq_(res[1], self.maxresp)
281 eq_(res[2], checksum(self.buf))
282 eq_(res[3], addrconv.ipv4.text_to_bin(self.address))
283 eq_(res[4], self.s_qrv)
284 eq_(res[5], self.qqic)
285 eq_(res[6], self.num)
286 eq_(src1, addrconv.ipv4.text_to_bin(self.srcs[0]))
287 eq_(src2, addrconv.ipv4.text_to_bin(self.srcs[1]))
288 eq_(src3, addrconv.ipv4.text_to_bin(self.srcs[2]))
290 def _build_igmp(self):
291 dl_dst = '11:22:33:44:55:66'
292 dl_src = 'aa:bb:cc:dd:ee:ff'
293 dl_type = ether.ETH_TYPE_IP
294 e = ethernet(dl_dst, dl_src, dl_type)
296 total_length = len(ipv4()) + len(self.g)
297 nw_proto = inet.IPPROTO_IGMP
298 nw_dst = '11.22.33.44'
299 nw_src = '55.66.77.88'
300 i = ipv4(total_length=total_length, src=nw_src, dst=nw_dst,
301 proto=nw_proto, ttl=1)
307 p.add_protocol(self.g)
311 def test_build_igmp(self):
312 p = self._build_igmp()
314 e = self.find_protocol(p, "ethernet")
316 eq_(e.ethertype, ether.ETH_TYPE_IP)
318 i = self.find_protocol(p, "ipv4")
320 eq_(i.proto, inet.IPPROTO_IGMP)
322 g = self.find_protocol(p, "igmpv3_query")
325 eq_(g.msgtype, self.msgtype)
326 eq_(g.maxresp, self.maxresp)
327 eq_(g.csum, checksum(self.buf))
328 eq_(g.address, self.address)
329 eq_(g.s_flg, self.s_flg)
331 eq_(g.qqic, self.qqic)
333 eq_(g.srcs, self.srcs)
335 def test_build_igmp_with_srcs(self):
336 self.setUp_with_srcs()
337 self.test_build_igmp()
339 def test_to_string(self):
340 igmp_values = {'msgtype': repr(self.msgtype),
341 'maxresp': repr(self.maxresp),
342 'csum': repr(self.csum),
343 'address': repr(self.address),
344 's_flg': repr(self.s_flg),
345 'qrv': repr(self.qrv),
346 'qqic': repr(self.qqic),
347 'num': repr(self.num),
348 'srcs': repr(self.srcs)}
349 _g_str = ','.join(['%s=%s' % (k, igmp_values[k])
350 for k, v in inspect.getmembers(self.g)
351 if k in igmp_values])
352 g_str = '%s(%s)' % (igmpv3_query.__name__, _g_str)
354 eq_(str(self.g), g_str)
355 eq_(repr(self.g), g_str)
357 def test_to_string_with_srcs(self):
358 self.setUp_with_srcs()
359 self.test_to_string()
362 def test_num_larger_than_srcs(self):
363 self.srcs = ['192.168.1.1', '192.168.1.2', '192.168.1.3']
364 self.num = len(self.srcs) + 1
365 self.buf = pack(igmpv3_query._PACK_STR, self.msgtype,
366 self.maxresp, self.csum,
367 addrconv.ipv4.text_to_bin(self.address),
368 self.s_qrv, self.qqic, self.num)
369 for src in self.srcs:
370 self.buf += pack('4s', addrconv.ipv4.text_to_bin(src))
371 self.g = igmpv3_query(
372 self.msgtype, self.maxresp, self.csum, self.address,
373 self.s_flg, self.qrv, self.qqic, self.num, self.srcs)
377 def test_num_smaller_than_srcs(self):
378 self.srcs = ['192.168.1.1', '192.168.1.2', '192.168.1.3']
379 self.num = len(self.srcs) - 1
380 self.buf = pack(igmpv3_query._PACK_STR, self.msgtype,
381 self.maxresp, self.csum,
382 addrconv.ipv4.text_to_bin(self.address),
383 self.s_qrv, self.qqic, self.num)
384 for src in self.srcs:
385 self.buf += pack('4s', addrconv.ipv4.text_to_bin(src))
386 self.g = igmpv3_query(
387 self.msgtype, self.maxresp, self.csum, self.address,
388 self.s_flg, self.qrv, self.qqic, self.num, self.srcs)
391 def test_default_args(self):
392 prev = ipv4(proto=inet.IPPROTO_IGMP)
394 prev.serialize(g, None)
395 buf = g.serialize(bytearray(), prev)
396 res = unpack_from(igmpv3_query._PACK_STR, six.binary_type(buf))
398 pack_into('!H', buf, 2, 0)
400 eq_(res[0], IGMP_TYPE_QUERY)
402 eq_(res[2], checksum(buf))
403 eq_(res[3], addrconv.ipv4.text_to_bin('0.0.0.0'))
409 prev = ipv4(proto=inet.IPPROTO_IGMP)
410 srcs = ['192.168.1.1', '192.168.1.2', '192.168.1.3']
411 g = igmpv3_query(srcs=srcs)
412 prev.serialize(g, None)
413 buf = g.serialize(bytearray(), prev)
414 res = unpack_from(igmpv3_query._PACK_STR, six.binary_type(buf))
416 pack_into('!H', buf, 2, 0)
418 eq_(res[0], IGMP_TYPE_QUERY)
420 eq_(res[2], checksum(buf))
421 eq_(res[3], addrconv.ipv4.text_to_bin('0.0.0.0'))
424 eq_(res[6], len(srcs))
426 res = unpack_from('4s4s4s', six.binary_type(buf), igmpv3_query._MIN_LEN)
428 eq_(res[0], addrconv.ipv4.text_to_bin(srcs[0]))
429 eq_(res[1], addrconv.ipv4.text_to_bin(srcs[1]))
430 eq_(res[2], addrconv.ipv4.text_to_bin(srcs[2]))
433 jsondict = self.g.to_jsondict()
434 g = igmpv3_query.from_jsondict(jsondict['igmpv3_query'])
435 eq_(str(self.g), str(g))
437 def test_json_with_srcs(self):
438 self.setUp_with_srcs()
442 class Test_igmpv3_report(unittest.TestCase):
443 """ Test case for Internet Group Management Protocol v3
444 Membership Report Message"""
447 self.msgtype = IGMP_TYPE_REPORT_V3
452 self.buf = pack(igmpv3_report._PACK_STR, self.msgtype,
453 self.csum, self.record_num)
455 self.g = igmpv3_report(
456 self.msgtype, self.csum, self.record_num, self.records)
458 def setUp_with_records(self):
459 self.record1 = igmpv3_report_group(
460 MODE_IS_INCLUDE, 0, 0, '225.0.0.1')
461 self.record2 = igmpv3_report_group(
462 MODE_IS_INCLUDE, 0, 2, '225.0.0.2',
463 ['172.16.10.10', '172.16.10.27'])
464 self.record3 = igmpv3_report_group(
465 MODE_IS_INCLUDE, 1, 0, '225.0.0.3', [], b'abc\x00')
466 self.record4 = igmpv3_report_group(
467 MODE_IS_INCLUDE, 2, 2, '225.0.0.4',
468 ['172.16.10.10', '172.16.10.27'], b'abcde\x00\x00\x00')
469 self.records = [self.record1, self.record2, self.record3,
471 self.record_num = len(self.records)
472 self.buf = pack(igmpv3_report._PACK_STR, self.msgtype,
473 self.csum, self.record_num)
474 self.buf += self.record1.serialize()
475 self.buf += self.record2.serialize()
476 self.buf += self.record3.serialize()
477 self.buf += self.record4.serialize()
478 self.g = igmpv3_report(
479 self.msgtype, self.csum, self.record_num, self.records)
484 def find_protocol(self, pkt, name):
485 for p in pkt.protocols:
486 if p.protocol_name == name:
490 eq_(self.msgtype, self.g.msgtype)
491 eq_(self.csum, self.g.csum)
492 eq_(self.record_num, self.g.record_num)
493 eq_(self.records, self.g.records)
495 def test_init_with_records(self):
496 self.setUp_with_records()
499 def test_parser(self):
500 _res = self.g.parser(six.binary_type(self.buf))
501 if type(_res) is tuple:
506 eq_(res.msgtype, self.msgtype)
507 eq_(res.csum, self.csum)
508 eq_(res.record_num, self.record_num)
509 eq_(repr(res.records), repr(self.records))
511 def test_parser_with_records(self):
512 self.setUp_with_records()
515 def test_serialize(self):
518 buf = self.g.serialize(data, prev)
520 res = unpack_from(igmpv3_report._PACK_STR, six.binary_type(buf))
522 eq_(res[0], self.msgtype)
523 eq_(res[1], checksum(self.buf))
524 eq_(res[2], self.record_num)
526 def test_serialize_with_records(self):
527 self.setUp_with_records()
530 buf = six.binary_type(self.g.serialize(data, prev))
532 res = unpack_from(igmpv3_report._PACK_STR, buf)
533 offset = igmpv3_report._MIN_LEN
534 rec1 = igmpv3_report_group.parser(buf[offset:])
536 rec2 = igmpv3_report_group.parser(buf[offset:])
538 rec3 = igmpv3_report_group.parser(buf[offset:])
540 rec4 = igmpv3_report_group.parser(buf[offset:])
542 eq_(res[0], self.msgtype)
543 eq_(res[1], checksum(self.buf))
544 eq_(res[2], self.record_num)
545 eq_(repr(rec1), repr(self.record1))
546 eq_(repr(rec2), repr(self.record2))
547 eq_(repr(rec3), repr(self.record3))
548 eq_(repr(rec4), repr(self.record4))
550 def _build_igmp(self):
551 dl_dst = '11:22:33:44:55:66'
552 dl_src = 'aa:bb:cc:dd:ee:ff'
553 dl_type = ether.ETH_TYPE_IP
554 e = ethernet(dl_dst, dl_src, dl_type)
556 total_length = len(ipv4()) + len(self.g)
557 nw_proto = inet.IPPROTO_IGMP
558 nw_dst = '11.22.33.44'
559 nw_src = '55.66.77.88'
560 i = ipv4(total_length=total_length, src=nw_src, dst=nw_dst,
561 proto=nw_proto, ttl=1)
567 p.add_protocol(self.g)
571 def test_build_igmp(self):
572 p = self._build_igmp()
574 e = self.find_protocol(p, "ethernet")
576 eq_(e.ethertype, ether.ETH_TYPE_IP)
578 i = self.find_protocol(p, "ipv4")
580 eq_(i.proto, inet.IPPROTO_IGMP)
582 g = self.find_protocol(p, "igmpv3_report")
585 eq_(g.msgtype, self.msgtype)
586 eq_(g.csum, checksum(self.buf))
587 eq_(g.record_num, self.record_num)
588 eq_(g.records, self.records)
590 def test_build_igmp_with_records(self):
591 self.setUp_with_records()
592 self.test_build_igmp()
594 def test_to_string(self):
595 igmp_values = {'msgtype': repr(self.msgtype),
596 'csum': repr(self.csum),
597 'record_num': repr(self.record_num),
598 'records': repr(self.records)}
599 _g_str = ','.join(['%s=%s' % (k, igmp_values[k])
600 for k, v in inspect.getmembers(self.g)
601 if k in igmp_values])
602 g_str = '%s(%s)' % (igmpv3_report.__name__, _g_str)
604 eq_(str(self.g), g_str)
605 eq_(repr(self.g), g_str)
607 def test_to_string_with_records(self):
608 self.setUp_with_records()
609 self.test_to_string()
612 def test_record_num_larger_than_records(self):
613 self.record1 = igmpv3_report_group(
614 MODE_IS_INCLUDE, 0, 0, '225.0.0.1')
615 self.record2 = igmpv3_report_group(
616 MODE_IS_INCLUDE, 0, 2, '225.0.0.2',
617 ['172.16.10.10', '172.16.10.27'])
618 self.record3 = igmpv3_report_group(
619 MODE_IS_INCLUDE, 1, 0, '225.0.0.3', [], b'abc\x00')
620 self.record4 = igmpv3_report_group(
621 MODE_IS_INCLUDE, 1, 2, '225.0.0.4',
622 ['172.16.10.10', '172.16.10.27'], b'abc\x00')
623 self.records = [self.record1, self.record2, self.record3,
625 self.record_num = len(self.records) + 1
626 self.buf = pack(igmpv3_report._PACK_STR, self.msgtype,
627 self.csum, self.record_num)
628 self.buf += self.record1.serialize()
629 self.buf += self.record2.serialize()
630 self.buf += self.record3.serialize()
631 self.buf += self.record4.serialize()
632 self.g = igmpv3_report(
633 self.msgtype, self.csum, self.record_num, self.records)
637 def test_record_num_smaller_than_records(self):
638 self.record1 = igmpv3_report_group(
639 MODE_IS_INCLUDE, 0, 0, '225.0.0.1')
640 self.record2 = igmpv3_report_group(
641 MODE_IS_INCLUDE, 0, 2, '225.0.0.2',
642 ['172.16.10.10', '172.16.10.27'])
643 self.record3 = igmpv3_report_group(
644 MODE_IS_INCLUDE, 1, 0, '225.0.0.3', [], b'abc\x00')
645 self.record4 = igmpv3_report_group(
646 MODE_IS_INCLUDE, 1, 2, '225.0.0.4',
647 ['172.16.10.10', '172.16.10.27'], b'abc\x00')
648 self.records = [self.record1, self.record2, self.record3,
650 self.record_num = len(self.records) - 1
651 self.buf = pack(igmpv3_report._PACK_STR, self.msgtype,
652 self.csum, self.record_num)
653 self.buf += self.record1.serialize()
654 self.buf += self.record2.serialize()
655 self.buf += self.record3.serialize()
656 self.buf += self.record4.serialize()
657 self.g = igmpv3_report(
658 self.msgtype, self.csum, self.record_num, self.records)
661 def test_default_args(self):
662 prev = ipv4(proto=inet.IPPROTO_IGMP)
664 prev.serialize(g, None)
665 buf = g.serialize(bytearray(), prev)
666 res = unpack_from(igmpv3_report._PACK_STR, six.binary_type(buf))
668 pack_into('!H', buf, 2, 0)
670 eq_(res[0], IGMP_TYPE_REPORT_V3)
671 eq_(res[1], checksum(buf))
674 # records without record_num
675 prev = ipv4(proto=inet.IPPROTO_IGMP)
676 record1 = igmpv3_report_group(
677 MODE_IS_INCLUDE, 0, 0, '225.0.0.1')
678 record2 = igmpv3_report_group(
679 MODE_IS_INCLUDE, 0, 2, '225.0.0.2',
680 ['172.16.10.10', '172.16.10.27'])
681 record3 = igmpv3_report_group(
682 MODE_IS_INCLUDE, 1, 0, '225.0.0.3', [], b'abc\x00')
683 record4 = igmpv3_report_group(
684 MODE_IS_INCLUDE, 1, 2, '225.0.0.4',
685 ['172.16.10.10', '172.16.10.27'], b'abc\x00')
686 records = [record1, record2, record3, record4]
687 g = igmpv3_report(records=records)
688 prev.serialize(g, None)
689 buf = g.serialize(bytearray(), prev)
690 res = unpack_from(igmpv3_report._PACK_STR, six.binary_type(buf))
692 pack_into('!H', buf, 2, 0)
694 eq_(res[0], IGMP_TYPE_REPORT_V3)
695 eq_(res[1], checksum(buf))
696 eq_(res[2], len(records))
699 jsondict = self.g.to_jsondict()
700 g = igmpv3_report.from_jsondict(jsondict['igmpv3_report'])
701 eq_(str(self.g), str(g))
703 def test_json_with_records(self):
704 self.setUp_with_records()
708 class Test_igmpv3_report_group(unittest.TestCase):
709 """Test case for Group Records of
710 Internet Group Management Protocol v3 Membership Report Message"""
713 self.type_ = MODE_IS_INCLUDE
716 self.address = '225.0.0.1'
720 self.buf = pack(igmpv3_report_group._PACK_STR, self.type_,
721 self.aux_len, self.num,
722 addrconv.ipv4.text_to_bin(self.address))
724 self.g = igmpv3_report_group(
725 self.type_, self.aux_len, self.num, self.address,
728 def setUp_with_srcs(self):
729 self.srcs = ['192.168.1.1', '192.168.1.2', '192.168.1.3']
730 self.num = len(self.srcs)
731 self.buf = pack(igmpv3_report_group._PACK_STR, self.type_,
732 self.aux_len, self.num,
733 addrconv.ipv4.text_to_bin(self.address))
734 for src in self.srcs:
735 self.buf += pack('4s', addrconv.ipv4.text_to_bin(src))
736 self.g = igmpv3_report_group(
737 self.type_, self.aux_len, self.num, self.address,
740 def setUp_with_aux(self):
741 self.aux = b'\x01\x02\x03\x04\x05\x00\x00\x00'
742 self.aux_len = len(self.aux) // 4
743 self.buf = pack(igmpv3_report_group._PACK_STR, self.type_,
744 self.aux_len, self.num,
745 addrconv.ipv4.text_to_bin(self.address))
747 self.g = igmpv3_report_group(
748 self.type_, self.aux_len, self.num, self.address,
751 def setUp_with_srcs_and_aux(self):
752 self.srcs = ['192.168.1.1', '192.168.1.2', '192.168.1.3']
753 self.num = len(self.srcs)
754 self.aux = b'\x01\x02\x03\x04\x05\x00\x00\x00'
755 self.aux_len = len(self.aux) // 4
756 self.buf = pack(igmpv3_report_group._PACK_STR, self.type_,
757 self.aux_len, self.num,
758 addrconv.ipv4.text_to_bin(self.address))
759 for src in self.srcs:
760 self.buf += pack('4s', addrconv.ipv4.text_to_bin(src))
762 self.g = igmpv3_report_group(
763 self.type_, self.aux_len, self.num, self.address,
770 eq_(self.type_, self.g.type_)
771 eq_(self.aux_len, self.g.aux_len)
772 eq_(self.num, self.g.num)
773 eq_(self.address, self.g.address)
774 eq_(self.srcs, self.g.srcs)
775 eq_(self.aux, self.g.aux)
777 def test_init_with_srcs(self):
778 self.setUp_with_srcs()
781 def test_init_with_aux(self):
782 self.setUp_with_aux()
785 def test_init_with_srcs_and_aux(self):
786 self.setUp_with_srcs_and_aux()
789 def test_parser(self):
790 _res = self.g.parser(self.buf)
791 if type(_res) is tuple:
796 eq_(res.type_, self.type_)
797 eq_(res.aux_len, self.aux_len)
798 eq_(res.num, self.num)
799 eq_(res.address, self.address)
800 eq_(res.srcs, self.srcs)
801 eq_(res.aux, self.aux)
803 def test_parser_with_srcs(self):
804 self.setUp_with_srcs()
807 def test_parser_with_aux(self):
808 self.setUp_with_aux()
811 def test_parser_with_srcs_and_aux(self):
812 self.setUp_with_srcs_and_aux()
815 def test_serialize(self):
816 buf = self.g.serialize()
817 res = unpack_from(igmpv3_report_group._PACK_STR, six.binary_type(buf))
819 eq_(res[0], self.type_)
820 eq_(res[1], self.aux_len)
821 eq_(res[2], self.num)
822 eq_(res[3], addrconv.ipv4.text_to_bin(self.address))
824 def test_serialize_with_srcs(self):
825 self.setUp_with_srcs()
826 buf = self.g.serialize()
827 res = unpack_from(igmpv3_report_group._PACK_STR, six.binary_type(buf))
828 (src1, src2, src3) = unpack_from('4s4s4s', six.binary_type(buf),
829 igmpv3_report_group._MIN_LEN)
830 eq_(res[0], self.type_)
831 eq_(res[1], self.aux_len)
832 eq_(res[2], self.num)
833 eq_(res[3], addrconv.ipv4.text_to_bin(self.address))
834 eq_(src1, addrconv.ipv4.text_to_bin(self.srcs[0]))
835 eq_(src2, addrconv.ipv4.text_to_bin(self.srcs[1]))
836 eq_(src3, addrconv.ipv4.text_to_bin(self.srcs[2]))
838 def test_serialize_with_aux(self):
839 self.setUp_with_aux()
840 buf = self.g.serialize()
841 res = unpack_from(igmpv3_report_group._PACK_STR, six.binary_type(buf))
842 (aux, ) = unpack_from('%ds' % (self.aux_len * 4), six.binary_type(buf),
843 igmpv3_report_group._MIN_LEN)
844 eq_(res[0], self.type_)
845 eq_(res[1], self.aux_len)
846 eq_(res[2], self.num)
847 eq_(res[3], addrconv.ipv4.text_to_bin(self.address))
850 def test_serialize_with_srcs_and_aux(self):
851 self.setUp_with_srcs_and_aux()
852 buf = self.g.serialize()
853 res = unpack_from(igmpv3_report_group._PACK_STR, six.binary_type(buf))
854 (src1, src2, src3) = unpack_from('4s4s4s', six.binary_type(buf),
855 igmpv3_report_group._MIN_LEN)
856 (aux, ) = unpack_from('%ds' % (self.aux_len * 4), six.binary_type(buf),
857 igmpv3_report_group._MIN_LEN + 12)
858 eq_(res[0], self.type_)
859 eq_(res[1], self.aux_len)
860 eq_(res[2], self.num)
861 eq_(res[3], addrconv.ipv4.text_to_bin(self.address))
862 eq_(src1, addrconv.ipv4.text_to_bin(self.srcs[0]))
863 eq_(src2, addrconv.ipv4.text_to_bin(self.srcs[1]))
864 eq_(src3, addrconv.ipv4.text_to_bin(self.srcs[2]))
867 def test_to_string(self):
868 igmp_values = {'type_': repr(self.type_),
869 'aux_len': repr(self.aux_len),
870 'num': repr(self.num),
871 'address': repr(self.address),
872 'srcs': repr(self.srcs),
873 'aux': repr(self.aux)}
874 _g_str = ','.join(['%s=%s' % (k, igmp_values[k])
875 for k, v in inspect.getmembers(self.g)
876 if k in igmp_values])
877 g_str = '%s(%s)' % (igmpv3_report_group.__name__, _g_str)
879 eq_(str(self.g), g_str)
880 eq_(repr(self.g), g_str)
882 def test_to_string_with_srcs(self):
883 self.setUp_with_srcs()
884 self.test_to_string()
886 def test_to_string_with_aux(self):
887 self.setUp_with_aux()
888 self.test_to_string()
890 def test_to_string_with_srcs_and_aux(self):
891 self.setUp_with_srcs_and_aux()
892 self.test_to_string()
897 def test_len_with_srcs(self):
898 self.setUp_with_srcs()
901 def test_len_with_aux(self):
902 self.setUp_with_aux()
905 def test_len_with_srcs_and_aux(self):
906 self.setUp_with_srcs_and_aux()
910 def test_num_larger_than_srcs(self):
911 self.srcs = ['192.168.1.1', '192.168.1.2', '192.168.1.3']
912 self.num = len(self.srcs) + 1
913 self.buf = pack(igmpv3_report_group._PACK_STR, self.type_,
914 self.aux_len, self.num,
915 addrconv.ipv4.text_to_bin(self.address))
916 for src in self.srcs:
917 self.buf += pack('4s', addrconv.ipv4.text_to_bin(src))
918 self.g = igmpv3_report_group(
919 self.type_, self.aux_len, self.num, self.address,
924 def test_num_smaller_than_srcs(self):
925 self.srcs = ['192.168.1.1', '192.168.1.2', '192.168.1.3']
926 self.num = len(self.srcs) - 1
927 self.buf = pack(igmpv3_report_group._PACK_STR, self.type_,
928 self.aux_len, self.num,
929 addrconv.ipv4.text_to_bin(self.address))
930 for src in self.srcs:
931 self.buf += pack('4s', addrconv.ipv4.text_to_bin(src))
932 self.g = igmpv3_report_group(
933 self.type_, self.aux_len, self.num, self.address,
938 def test_aux_len_larger_than_aux(self):
939 self.aux = b'\x01\x02\x03\x04\x05\x00\x00\x00'
940 self.aux_len = len(self.aux) // 4 + 1
941 self.buf = pack(igmpv3_report_group._PACK_STR, self.type_,
942 self.aux_len, self.num,
943 addrconv.ipv4.text_to_bin(self.address))
945 self.g = igmpv3_report_group(
946 self.type_, self.aux_len, self.num, self.address,
951 def test_aux_len_smaller_than_aux(self):
952 self.aux = b'\x01\x02\x03\x04\x05\x00\x00\x00'
953 self.aux_len = len(self.aux) // 4 - 1
954 self.buf = pack(igmpv3_report_group._PACK_STR, self.type_,
955 self.aux_len, self.num,
956 addrconv.ipv4.text_to_bin(self.address))
958 self.g = igmpv3_report_group(
959 self.type_, self.aux_len, self.num, self.address,
963 def test_default_args(self):
964 rep = igmpv3_report_group()
965 buf = rep.serialize()
966 res = unpack_from(igmpv3_report_group._PACK_STR, six.binary_type(buf))
971 eq_(res[3], addrconv.ipv4.text_to_bin('0.0.0.0'))
974 srcs = ['192.168.1.1', '192.168.1.2', '192.168.1.3']
975 rep = igmpv3_report_group(srcs=srcs)
976 buf = rep.serialize()
977 res = unpack_from(igmpv3_report_group._PACK_STR, six.binary_type(buf))
981 eq_(res[2], len(srcs))
982 eq_(res[3], addrconv.ipv4.text_to_bin('0.0.0.0'))
984 res = unpack_from('4s4s4s', six.binary_type(buf),
985 igmpv3_report_group._MIN_LEN)
987 eq_(res[0], addrconv.ipv4.text_to_bin(srcs[0]))
988 eq_(res[1], addrconv.ipv4.text_to_bin(srcs[1]))
989 eq_(res[2], addrconv.ipv4.text_to_bin(srcs[2]))
991 # aux without aux_len
993 rep = igmpv3_report_group(aux=aux)
994 buf = rep.serialize()
995 res = unpack_from(igmpv3_report_group._PACK_STR, six.binary_type(buf))
1000 eq_(res[3], addrconv.ipv4.text_to_bin('0.0.0.0'))
1001 eq_(buf[igmpv3_report_group._MIN_LEN:], b'abcde\x00\x00\x00')