backing up
[vsorcdistro/.git] / ryu / build / lib.linux-armv7l-2.7 / ryu / tests / unit / packet / test_igmp.py
1 # Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #    http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 # vim: tabstop=4 shiftwidth=4 softtabstop=4
17
18 import unittest
19 import inspect
20 import logging
21 import six
22
23 from struct import pack, unpack_from, pack_into
24 from nose.tools import ok_, eq_, raises
25 from ryu.ofproto import ether
26 from ryu.ofproto import inet
27 from ryu.lib.packet.ethernet import ethernet
28 from ryu.lib.packet.ipv4 import ipv4
29 from ryu.lib.packet.packet import Packet
30 from ryu.lib.packet.packet_utils import checksum
31 from ryu.lib import addrconv
32 from ryu.lib.packet.igmp import igmp
33 from ryu.lib.packet.igmp import igmpv3_query
34 from ryu.lib.packet.igmp import igmpv3_report
35 from ryu.lib.packet.igmp import igmpv3_report_group
36 from ryu.lib.packet.igmp import IGMP_TYPE_QUERY
37 from ryu.lib.packet.igmp import IGMP_TYPE_REPORT_V3
38 from ryu.lib.packet.igmp import MODE_IS_INCLUDE
39
40 LOG = logging.getLogger(__name__)
41
42
class Test_igmp(unittest.TestCase):
    """Unit tests for the base ``igmp`` message class.

    ``setUp`` prepares three views of one membership query: the plain
    field values, the same fields packed by hand with ``igmp._PACK_STR``
    (``self.buf``) and the matching ``igmp`` instance (``self.g``).
    """

    def setUp(self):
        self.msgtype = IGMP_TYPE_QUERY
        self.maxresp = 100
        self.csum = 0
        self.address = '225.0.0.1'

        # Reference buffer packed by hand; the checksum field is zero.
        self.buf = pack(igmp._PACK_STR, self.msgtype, self.maxresp,
                        self.csum,
                        addrconv.ipv4.text_to_bin(self.address))

        self.g = igmp(self.msgtype, self.maxresp, self.csum,
                      self.address)

    def tearDown(self):
        pass

    def find_protocol(self, pkt, name):
        """Return the first entry of ``pkt.protocols`` whose
        ``protocol_name`` equals ``name``, or None if there is none."""
        for p in pkt.protocols:
            if p.protocol_name == name:
                return p
        return None

    def test_init(self):
        # The constructor must store every argument unchanged.
        eq_(self.msgtype, self.g.msgtype)
        eq_(self.maxresp, self.g.maxresp)
        eq_(self.csum, self.g.csum)
        eq_(self.address, self.g.address)

    def test_parser(self):
        # parser() may return the message itself or a tuple whose first
        # element is the message; accept both shapes.
        _res = self.g.parser(self.buf)
        res = _res[0] if isinstance(_res, tuple) else _res

        eq_(res.msgtype, self.msgtype)
        eq_(res.maxresp, self.maxresp)
        eq_(res.csum, self.csum)
        eq_(res.address, self.address)

    def test_serialize(self):
        data = bytearray()
        prev = None
        buf = self.g.serialize(data, prev)

        res = unpack_from(igmp._PACK_STR, six.binary_type(buf))

        eq_(res[0], self.msgtype)
        eq_(res[1], self.maxresp)
        # The serialized checksum is compared with checksum() computed
        # over the hand-packed buffer (whose checksum field is zero).
        eq_(res[2], checksum(self.buf))
        eq_(res[3], addrconv.ipv4.text_to_bin(self.address))

    def _build_igmp(self):
        """Return a serialized Packet of ethernet / ipv4 / ``self.g``."""
        dl_dst = '11:22:33:44:55:66'
        dl_src = 'aa:bb:cc:dd:ee:ff'
        dl_type = ether.ETH_TYPE_IP
        e = ethernet(dl_dst, dl_src, dl_type)

        total_length = 20 + igmp._MIN_LEN
        nw_proto = inet.IPPROTO_IGMP
        nw_dst = '11.22.33.44'
        nw_src = '55.66.77.88'
        i = ipv4(total_length=total_length, src=nw_src, dst=nw_dst,
                 proto=nw_proto)

        p = Packet()

        p.add_protocol(e)
        p.add_protocol(i)
        p.add_protocol(self.g)
        p.serialize()
        return p

    def test_build_igmp(self):
        p = self._build_igmp()

        e = self.find_protocol(p, "ethernet")
        ok_(e)
        eq_(e.ethertype, ether.ETH_TYPE_IP)

        i = self.find_protocol(p, "ipv4")
        ok_(i)
        eq_(i.proto, inet.IPPROTO_IGMP)

        g = self.find_protocol(p, "igmp")
        ok_(g)

        eq_(g.msgtype, self.msgtype)
        eq_(g.maxresp, self.maxresp)
        eq_(g.csum, checksum(self.buf))
        eq_(g.address, self.address)

    def test_to_string(self):
        igmp_values = {'msgtype': repr(self.msgtype),
                       'maxresp': repr(self.maxresp),
                       'csum': repr(self.csum),
                       'address': repr(self.address)}
        # Build the expected text from the known attributes, in the
        # order inspect.getmembers() yields them.
        _g_str = ','.join(['%s=%s' % (k, igmp_values[k])
                           for k, _ in inspect.getmembers(self.g)
                           if k in igmp_values])
        g_str = '%s(%s)' % (igmp.__name__, _g_str)

        eq_(str(self.g), g_str)
        eq_(repr(self.g), g_str)

    @raises(Exception)
    def test_malformed_igmp(self):
        # Drop the first byte so the buffer is shorter than _MIN_LEN;
        # parsing it is expected to raise.
        m_short_buf = self.buf[1:igmp._MIN_LEN]
        igmp.parser(m_short_buf)

    def test_default_args(self):
        # An igmp() built without arguments is serialized and its
        # fields checked; the checksum (res[2]) is not checked here.
        ig = igmp()
        buf = ig.serialize(bytearray(), None)
        res = unpack_from(igmp._PACK_STR, six.binary_type(buf))

        eq_(res[0], 0x11)
        eq_(res[1], 0)
        eq_(res[3], addrconv.ipv4.text_to_bin('0.0.0.0'))

    def test_json(self):
        # Round trip through the JSON dict form must keep str() equal.
        jsondict = self.g.to_jsondict()
        g = igmp.from_jsondict(jsondict['igmp'])
        eq_(str(self.g), str(g))
169
170
class Test_igmpv3_query(unittest.TestCase):
    """Unit tests for the IGMPv3 Membership Query class ``igmpv3_query``.

    ``setUp`` builds a query without source addresses; the ``*_with_srcs``
    tests switch to a three-source fixture through ``setUp_with_srcs`` and
    then re-run the corresponding base test.
    """

    def setUp(self):
        self.msgtype = IGMP_TYPE_QUERY
        self.maxresp = 100
        self.csum = 0
        self.address = '225.0.0.1'
        self.s_flg = 0
        self.qrv = 2
        self.qqic = 10
        self.num = 0
        self.srcs = []

        # s_flg and qrv share one packed field: s_flg is shifted left by
        # three bits and OR-ed with qrv.
        self.s_qrv = self.s_flg << 3 | self.qrv

        self._rebuild_fixture()

    def _rebuild_fixture(self):
        """Recreate ``self.buf`` and ``self.g`` from the current fields.

        ``self.buf`` is the header packed with ``igmpv3_query._PACK_STR``
        followed by each address in ``self.srcs`` packed as ``'4s'``;
        ``self.g`` is an ``igmpv3_query`` built from the same values.
        """
        self.buf = pack(igmpv3_query._PACK_STR, self.msgtype,
                        self.maxresp, self.csum,
                        addrconv.ipv4.text_to_bin(self.address),
                        self.s_qrv, self.qqic, self.num)
        for src in self.srcs:
            self.buf += pack('4s', addrconv.ipv4.text_to_bin(src))
        self.g = igmpv3_query(
            self.msgtype, self.maxresp, self.csum, self.address,
            self.s_flg, self.qrv, self.qqic, self.num, self.srcs)

    def setUp_with_srcs(self):
        """Switch the fixture to a query carrying three sources."""
        self.srcs = ['192.168.1.1', '192.168.1.2', '192.168.1.3']
        self.num = len(self.srcs)
        self._rebuild_fixture()

    def tearDown(self):
        pass

    def find_protocol(self, pkt, name):
        """Return the first entry of ``pkt.protocols`` whose
        ``protocol_name`` equals ``name``, or None if there is none."""
        for p in pkt.protocols:
            if p.protocol_name == name:
                return p
        return None

    def test_init(self):
        # The constructor must store every argument unchanged.
        eq_(self.msgtype, self.g.msgtype)
        eq_(self.maxresp, self.g.maxresp)
        eq_(self.csum, self.g.csum)
        eq_(self.address, self.g.address)
        eq_(self.s_flg, self.g.s_flg)
        eq_(self.qrv, self.g.qrv)
        eq_(self.qqic, self.g.qqic)
        eq_(self.num, self.g.num)
        eq_(self.srcs, self.g.srcs)

    def test_init_with_srcs(self):
        self.setUp_with_srcs()
        self.test_init()

    def test_parser(self):
        # parser() may return the message itself or a tuple whose first
        # element is the message; accept both shapes.
        _res = self.g.parser(self.buf)
        res = _res[0] if isinstance(_res, tuple) else _res

        eq_(res.msgtype, self.msgtype)
        eq_(res.maxresp, self.maxresp)
        eq_(res.csum, self.csum)
        eq_(res.address, self.address)
        eq_(res.s_flg, self.s_flg)
        eq_(res.qrv, self.qrv)
        eq_(res.qqic, self.qqic)
        eq_(res.num, self.num)
        eq_(res.srcs, self.srcs)

    def test_parser_with_srcs(self):
        self.setUp_with_srcs()
        self.test_parser()

    def test_serialize(self):
        data = bytearray()
        prev = None
        buf = self.g.serialize(data, prev)

        res = unpack_from(igmpv3_query._PACK_STR, six.binary_type(buf))

        eq_(res[0], self.msgtype)
        eq_(res[1], self.maxresp)
        # Compared with checksum() over the hand-packed buffer, whose
        # checksum field is zero.
        eq_(res[2], checksum(self.buf))
        eq_(res[3], addrconv.ipv4.text_to_bin(self.address))
        eq_(res[4], self.s_qrv)
        eq_(res[5], self.qqic)
        eq_(res[6], self.num)

    def test_serialize_with_srcs(self):
        self.setUp_with_srcs()
        data = bytearray()
        prev = None
        buf = self.g.serialize(data, prev)

        res = unpack_from(igmpv3_query._PACK_STR, six.binary_type(buf))
        # The three source addresses start right after the fixed header.
        (src1, src2, src3) = unpack_from('4s4s4s', six.binary_type(buf),
                                         igmpv3_query._MIN_LEN)

        eq_(res[0], self.msgtype)
        eq_(res[1], self.maxresp)
        eq_(res[2], checksum(self.buf))
        eq_(res[3], addrconv.ipv4.text_to_bin(self.address))
        eq_(res[4], self.s_qrv)
        eq_(res[5], self.qqic)
        eq_(res[6], self.num)
        eq_(src1, addrconv.ipv4.text_to_bin(self.srcs[0]))
        eq_(src2, addrconv.ipv4.text_to_bin(self.srcs[1]))
        eq_(src3, addrconv.ipv4.text_to_bin(self.srcs[2]))

    def _build_igmp(self):
        """Return a serialized Packet of ethernet / ipv4 / ``self.g``."""
        dl_dst = '11:22:33:44:55:66'
        dl_src = 'aa:bb:cc:dd:ee:ff'
        dl_type = ether.ETH_TYPE_IP
        e = ethernet(dl_dst, dl_src, dl_type)

        total_length = len(ipv4()) + len(self.g)
        nw_proto = inet.IPPROTO_IGMP
        nw_dst = '11.22.33.44'
        nw_src = '55.66.77.88'
        i = ipv4(total_length=total_length, src=nw_src, dst=nw_dst,
                 proto=nw_proto, ttl=1)

        p = Packet()

        p.add_protocol(e)
        p.add_protocol(i)
        p.add_protocol(self.g)
        p.serialize()
        return p

    def test_build_igmp(self):
        p = self._build_igmp()

        e = self.find_protocol(p, "ethernet")
        ok_(e)
        eq_(e.ethertype, ether.ETH_TYPE_IP)

        i = self.find_protocol(p, "ipv4")
        ok_(i)
        eq_(i.proto, inet.IPPROTO_IGMP)

        g = self.find_protocol(p, "igmpv3_query")
        ok_(g)

        eq_(g.msgtype, self.msgtype)
        eq_(g.maxresp, self.maxresp)
        eq_(g.csum, checksum(self.buf))
        eq_(g.address, self.address)
        eq_(g.s_flg, self.s_flg)
        eq_(g.qrv, self.qrv)
        eq_(g.qqic, self.qqic)
        eq_(g.num, self.num)
        eq_(g.srcs, self.srcs)

    def test_build_igmp_with_srcs(self):
        self.setUp_with_srcs()
        self.test_build_igmp()

    def test_to_string(self):
        igmp_values = {'msgtype': repr(self.msgtype),
                       'maxresp': repr(self.maxresp),
                       'csum': repr(self.csum),
                       'address': repr(self.address),
                       's_flg': repr(self.s_flg),
                       'qrv': repr(self.qrv),
                       'qqic': repr(self.qqic),
                       'num': repr(self.num),
                       'srcs': repr(self.srcs)}
        # Build the expected text from the known attributes, in the
        # order inspect.getmembers() yields them.
        _g_str = ','.join(['%s=%s' % (k, igmp_values[k])
                           for k, _ in inspect.getmembers(self.g)
                           if k in igmp_values])
        g_str = '%s(%s)' % (igmpv3_query.__name__, _g_str)

        eq_(str(self.g), g_str)
        eq_(repr(self.g), g_str)

    def test_to_string_with_srcs(self):
        self.setUp_with_srcs()
        self.test_to_string()

    @raises(Exception)
    def test_num_larger_than_srcs(self):
        # num claims one more source than the buffer actually carries;
        # running the parser checks is expected to raise.
        self.srcs = ['192.168.1.1', '192.168.1.2', '192.168.1.3']
        self.num = len(self.srcs) + 1
        self._rebuild_fixture()
        self.test_parser()

    @raises(Exception)
    def test_num_smaller_than_srcs(self):
        # num claims one source fewer than the buffer actually carries;
        # running the parser checks is expected to raise.
        self.srcs = ['192.168.1.1', '192.168.1.2', '192.168.1.3']
        self.num = len(self.srcs) - 1
        self._rebuild_fixture()
        self.test_parser()

    def test_default_args(self):
        prev = ipv4(proto=inet.IPPROTO_IGMP)
        g = igmpv3_query()
        prev.serialize(g, None)
        buf = g.serialize(bytearray(), prev)
        res = unpack_from(igmpv3_query._PACK_STR, six.binary_type(buf))
        # Zero the two bytes at offset 2 (presumably the checksum field,
        # given the res[2] comparison below) before recomputing it.
        buf = bytearray(buf)
        pack_into('!H', buf, 2, 0)

        eq_(res[0], IGMP_TYPE_QUERY)
        eq_(res[1], 100)
        eq_(res[2], checksum(buf))
        eq_(res[3], addrconv.ipv4.text_to_bin('0.0.0.0'))
        eq_(res[4], 2)
        eq_(res[5], 0)
        eq_(res[6], 0)

        # srcs without num
        prev = ipv4(proto=inet.IPPROTO_IGMP)
        srcs = ['192.168.1.1', '192.168.1.2', '192.168.1.3']
        g = igmpv3_query(srcs=srcs)
        prev.serialize(g, None)
        buf = g.serialize(bytearray(), prev)
        res = unpack_from(igmpv3_query._PACK_STR, six.binary_type(buf))
        buf = bytearray(buf)
        pack_into('!H', buf, 2, 0)

        eq_(res[0], IGMP_TYPE_QUERY)
        eq_(res[1], 100)
        eq_(res[2], checksum(buf))
        eq_(res[3], addrconv.ipv4.text_to_bin('0.0.0.0'))
        eq_(res[4], 2)
        eq_(res[5], 0)
        # The serialized count must equal the number of sources passed.
        eq_(res[6], len(srcs))

        res = unpack_from('4s4s4s', six.binary_type(buf),
                          igmpv3_query._MIN_LEN)

        eq_(res[0], addrconv.ipv4.text_to_bin(srcs[0]))
        eq_(res[1], addrconv.ipv4.text_to_bin(srcs[1]))
        eq_(res[2], addrconv.ipv4.text_to_bin(srcs[2]))

    def test_json(self):
        # Round trip through the JSON dict form must keep str() equal.
        jsondict = self.g.to_jsondict()
        g = igmpv3_query.from_jsondict(jsondict['igmpv3_query'])
        eq_(str(self.g), str(g))

    def test_json_with_srcs(self):
        self.setUp_with_srcs()
        self.test_json()
440
441
class Test_igmpv3_report(unittest.TestCase):
    """Unit tests for the IGMPv3 Membership Report class ``igmpv3_report``.

    ``setUp`` builds a report without group records; the ``*_with_records``
    tests switch to a four-record fixture through ``setUp_with_records``
    and then re-run the corresponding base test.
    """

    def setUp(self):
        self.msgtype = IGMP_TYPE_REPORT_V3
        self.csum = 0
        self.record_num = 0
        self.records = []

        self._rebuild_fixture()

    def _rebuild_fixture(self):
        """Recreate ``self.buf`` and ``self.g`` from the current fields.

        ``self.buf`` is the header packed with ``igmpv3_report._PACK_STR``
        followed by the ``serialize()`` output of every record in
        ``self.records``; ``self.g`` is built from the same values.
        """
        self.buf = pack(igmpv3_report._PACK_STR, self.msgtype,
                        self.csum, self.record_num)
        for record in self.records:
            self.buf += record.serialize()
        self.g = igmpv3_report(
            self.msgtype, self.csum, self.record_num, self.records)

    @staticmethod
    def _sample_records():
        """Return four new group records: a bare one, one with sources,
        one with auxiliary data and one with both."""
        return [
            igmpv3_report_group(
                MODE_IS_INCLUDE, 0, 0, '225.0.0.1'),
            igmpv3_report_group(
                MODE_IS_INCLUDE, 0, 2, '225.0.0.2',
                ['172.16.10.10', '172.16.10.27']),
            igmpv3_report_group(
                MODE_IS_INCLUDE, 1, 0, '225.0.0.3', [], b'abc\x00'),
            igmpv3_report_group(
                MODE_IS_INCLUDE, 1, 2, '225.0.0.4',
                ['172.16.10.10', '172.16.10.27'], b'abc\x00'),
        ]

    def _setUp_with_miscounted_records(self, delta):
        """Install the sample records with ``record_num`` off by ``delta``
        from the real number of records."""
        records = self._sample_records()
        (self.record1, self.record2,
         self.record3, self.record4) = records
        self.records = records
        self.record_num = len(self.records) + delta
        self._rebuild_fixture()

    def setUp_with_records(self):
        """Switch the fixture to a report carrying four group records."""
        self.record1 = igmpv3_report_group(
            MODE_IS_INCLUDE, 0, 0, '225.0.0.1')
        self.record2 = igmpv3_report_group(
            MODE_IS_INCLUDE, 0, 2, '225.0.0.2',
            ['172.16.10.10', '172.16.10.27'])
        self.record3 = igmpv3_report_group(
            MODE_IS_INCLUDE, 1, 0, '225.0.0.3', [], b'abc\x00')
        # Unlike the sample set, this record carries eight bytes of
        # auxiliary data with aux_len 2.
        self.record4 = igmpv3_report_group(
            MODE_IS_INCLUDE, 2, 2, '225.0.0.4',
            ['172.16.10.10', '172.16.10.27'], b'abcde\x00\x00\x00')
        self.records = [self.record1, self.record2, self.record3,
                        self.record4]
        self.record_num = len(self.records)
        self._rebuild_fixture()

    def tearDown(self):
        pass

    def find_protocol(self, pkt, name):
        """Return the first entry of ``pkt.protocols`` whose
        ``protocol_name`` equals ``name``, or None if there is none."""
        for p in pkt.protocols:
            if p.protocol_name == name:
                return p
        return None

    def test_init(self):
        # The constructor must store every argument unchanged.
        eq_(self.msgtype, self.g.msgtype)
        eq_(self.csum, self.g.csum)
        eq_(self.record_num, self.g.record_num)
        eq_(self.records, self.g.records)

    def test_init_with_records(self):
        self.setUp_with_records()
        self.test_init()

    def test_parser(self):
        # parser() may return the message itself or a tuple whose first
        # element is the message; accept both shapes.
        _res = self.g.parser(six.binary_type(self.buf))
        res = _res[0] if isinstance(_res, tuple) else _res

        eq_(res.msgtype, self.msgtype)
        eq_(res.csum, self.csum)
        eq_(res.record_num, self.record_num)
        # Parsed records are new objects, so compare their repr() text.
        eq_(repr(res.records), repr(self.records))

    def test_parser_with_records(self):
        self.setUp_with_records()
        self.test_parser()

    def test_serialize(self):
        data = bytearray()
        prev = None
        buf = self.g.serialize(data, prev)

        res = unpack_from(igmpv3_report._PACK_STR, six.binary_type(buf))

        eq_(res[0], self.msgtype)
        # Compared with checksum() over the hand-packed buffer, whose
        # checksum field is zero.
        eq_(res[1], checksum(self.buf))
        eq_(res[2], self.record_num)

    def test_serialize_with_records(self):
        self.setUp_with_records()
        data = bytearray()
        prev = None
        buf = six.binary_type(self.g.serialize(data, prev))

        res = unpack_from(igmpv3_report._PACK_STR, buf)
        # Parse the records back one at a time, advancing by the length
        # of each parsed record.
        offset = igmpv3_report._MIN_LEN
        rec1 = igmpv3_report_group.parser(buf[offset:])
        offset += len(rec1)
        rec2 = igmpv3_report_group.parser(buf[offset:])
        offset += len(rec2)
        rec3 = igmpv3_report_group.parser(buf[offset:])
        offset += len(rec3)
        rec4 = igmpv3_report_group.parser(buf[offset:])

        eq_(res[0], self.msgtype)
        eq_(res[1], checksum(self.buf))
        eq_(res[2], self.record_num)
        eq_(repr(rec1), repr(self.record1))
        eq_(repr(rec2), repr(self.record2))
        eq_(repr(rec3), repr(self.record3))
        eq_(repr(rec4), repr(self.record4))

    def _build_igmp(self):
        """Return a serialized Packet of ethernet / ipv4 / ``self.g``."""
        dl_dst = '11:22:33:44:55:66'
        dl_src = 'aa:bb:cc:dd:ee:ff'
        dl_type = ether.ETH_TYPE_IP
        e = ethernet(dl_dst, dl_src, dl_type)

        total_length = len(ipv4()) + len(self.g)
        nw_proto = inet.IPPROTO_IGMP
        nw_dst = '11.22.33.44'
        nw_src = '55.66.77.88'
        i = ipv4(total_length=total_length, src=nw_src, dst=nw_dst,
                 proto=nw_proto, ttl=1)

        p = Packet()

        p.add_protocol(e)
        p.add_protocol(i)
        p.add_protocol(self.g)
        p.serialize()
        return p

    def test_build_igmp(self):
        p = self._build_igmp()

        e = self.find_protocol(p, "ethernet")
        ok_(e)
        eq_(e.ethertype, ether.ETH_TYPE_IP)

        i = self.find_protocol(p, "ipv4")
        ok_(i)
        eq_(i.proto, inet.IPPROTO_IGMP)

        g = self.find_protocol(p, "igmpv3_report")
        ok_(g)

        eq_(g.msgtype, self.msgtype)
        eq_(g.csum, checksum(self.buf))
        eq_(g.record_num, self.record_num)
        eq_(g.records, self.records)

    def test_build_igmp_with_records(self):
        self.setUp_with_records()
        self.test_build_igmp()

    def test_to_string(self):
        igmp_values = {'msgtype': repr(self.msgtype),
                       'csum': repr(self.csum),
                       'record_num': repr(self.record_num),
                       'records': repr(self.records)}
        # Build the expected text from the known attributes, in the
        # order inspect.getmembers() yields them.
        _g_str = ','.join(['%s=%s' % (k, igmp_values[k])
                           for k, _ in inspect.getmembers(self.g)
                           if k in igmp_values])
        g_str = '%s(%s)' % (igmpv3_report.__name__, _g_str)

        eq_(str(self.g), g_str)
        eq_(repr(self.g), g_str)

    def test_to_string_with_records(self):
        self.setUp_with_records()
        self.test_to_string()

    @raises(Exception)
    def test_record_num_larger_than_records(self):
        # record_num claims one more record than the buffer carries;
        # running the parser checks is expected to raise.
        self._setUp_with_miscounted_records(1)
        self.test_parser()

    @raises(Exception)
    def test_record_num_smaller_than_records(self):
        # record_num claims one record fewer than the buffer carries;
        # running the parser checks is expected to raise.
        self._setUp_with_miscounted_records(-1)
        self.test_parser()

    def test_default_args(self):
        prev = ipv4(proto=inet.IPPROTO_IGMP)
        g = igmpv3_report()
        prev.serialize(g, None)
        buf = g.serialize(bytearray(), prev)
        res = unpack_from(igmpv3_report._PACK_STR, six.binary_type(buf))
        # Zero the two bytes at offset 2 (presumably the checksum field,
        # given the res[1] comparison below) before recomputing it.
        buf = bytearray(buf)
        pack_into('!H', buf, 2, 0)

        eq_(res[0], IGMP_TYPE_REPORT_V3)
        eq_(res[1], checksum(buf))
        eq_(res[2], 0)

        # records without record_num
        prev = ipv4(proto=inet.IPPROTO_IGMP)
        records = self._sample_records()
        g = igmpv3_report(records=records)
        prev.serialize(g, None)
        buf = g.serialize(bytearray(), prev)
        res = unpack_from(igmpv3_report._PACK_STR, six.binary_type(buf))
        buf = bytearray(buf)
        pack_into('!H', buf, 2, 0)

        eq_(res[0], IGMP_TYPE_REPORT_V3)
        eq_(res[1], checksum(buf))
        # The serialized count must equal the number of records passed.
        eq_(res[2], len(records))

    def test_json(self):
        # Round trip through the JSON dict form must keep str() equal.
        jsondict = self.g.to_jsondict()
        g = igmpv3_report.from_jsondict(jsondict['igmpv3_report'])
        eq_(str(self.g), str(g))

    def test_json_with_records(self):
        self.setUp_with_records()
        self.test_json()
706
707
class Test_igmpv3_report_group(unittest.TestCase):
    """Unit tests for ``igmpv3_report_group``, the group record carried
    by an IGMPv3 Membership Report.

    ``setUp`` builds a record with neither sources nor auxiliary data;
    the ``setUp_with_*`` methods switch the fixture to a record with
    sources, with auxiliary data, or with both.
    """

    def setUp(self):
        self.type_ = MODE_IS_INCLUDE
        self.aux_len = 0
        self.num = 0
        self.address = '225.0.0.1'
        self.srcs = []
        self.aux = None

        self._rebuild_fixture()

    def _rebuild_fixture(self, with_srcs=False, with_aux=False):
        """Recreate ``self.buf`` and ``self.g`` from the current fields.

        ``self.buf`` always starts with the header packed with
        ``igmpv3_report_group._PACK_STR``.  ``with_srcs`` appends every
        address of ``self.srcs`` packed as ``'4s'`` and ``with_aux``
        appends ``self.aux``, in that order.  ``self.g`` is always built
        from all six fields.
        """
        self.buf = pack(igmpv3_report_group._PACK_STR, self.type_,
                        self.aux_len, self.num,
                        addrconv.ipv4.text_to_bin(self.address))
        if with_srcs:
            for src in self.srcs:
                self.buf += pack('4s', addrconv.ipv4.text_to_bin(src))
        if with_aux:
            self.buf += self.aux
        self.g = igmpv3_report_group(
            self.type_, self.aux_len, self.num, self.address,
            self.srcs, self.aux)

    def setUp_with_srcs(self):
        """Switch the fixture to a record carrying three sources."""
        self.srcs = ['192.168.1.1', '192.168.1.2', '192.168.1.3']
        self.num = len(self.srcs)
        self._rebuild_fixture(with_srcs=True)

    def setUp_with_aux(self):
        """Switch the fixture to a record carrying auxiliary data."""
        self.aux = b'\x01\x02\x03\x04\x05\x00\x00\x00'
        # aux_len is the auxiliary data length divided by four.
        self.aux_len = len(self.aux) // 4
        self._rebuild_fixture(with_aux=True)

    def setUp_with_srcs_and_aux(self):
        """Switch the fixture to a record with sources and aux data."""
        self.srcs = ['192.168.1.1', '192.168.1.2', '192.168.1.3']
        self.num = len(self.srcs)
        self.aux = b'\x01\x02\x03\x04\x05\x00\x00\x00'
        self.aux_len = len(self.aux) // 4
        self._rebuild_fixture(with_srcs=True, with_aux=True)

    def tearDown(self):
        pass

    def test_init(self):
        # The constructor must store every argument unchanged.
        eq_(self.type_, self.g.type_)
        eq_(self.aux_len, self.g.aux_len)
        eq_(self.num, self.g.num)
        eq_(self.address, self.g.address)
        eq_(self.srcs, self.g.srcs)
        eq_(self.aux, self.g.aux)

    def test_init_with_srcs(self):
        self.setUp_with_srcs()
        self.test_init()

    def test_init_with_aux(self):
        self.setUp_with_aux()
        self.test_init()

    def test_init_with_srcs_and_aux(self):
        self.setUp_with_srcs_and_aux()
        self.test_init()

    def test_parser(self):
        # parser() may return the record itself or a tuple whose first
        # element is the record; accept both shapes.
        _res = self.g.parser(self.buf)
        res = _res[0] if isinstance(_res, tuple) else _res

        eq_(res.type_, self.type_)
        eq_(res.aux_len, self.aux_len)
        eq_(res.num, self.num)
        eq_(res.address, self.address)
        eq_(res.srcs, self.srcs)
        eq_(res.aux, self.aux)

    def test_parser_with_srcs(self):
        self.setUp_with_srcs()
        self.test_parser()

    def test_parser_with_aux(self):
        self.setUp_with_aux()
        self.test_parser()

    def test_parser_with_srcs_and_aux(self):
        self.setUp_with_srcs_and_aux()
        self.test_parser()

    def test_serialize(self):
        buf = self.g.serialize()
        res = unpack_from(igmpv3_report_group._PACK_STR,
                          six.binary_type(buf))

        eq_(res[0], self.type_)
        eq_(res[1], self.aux_len)
        eq_(res[2], self.num)
        eq_(res[3], addrconv.ipv4.text_to_bin(self.address))

    def test_serialize_with_srcs(self):
        self.setUp_with_srcs()
        buf = self.g.serialize()
        res = unpack_from(igmpv3_report_group._PACK_STR,
                          six.binary_type(buf))
        # The three source addresses start right after the fixed header.
        (src1, src2, src3) = unpack_from('4s4s4s', six.binary_type(buf),
                                         igmpv3_report_group._MIN_LEN)
        eq_(res[0], self.type_)
        eq_(res[1], self.aux_len)
        eq_(res[2], self.num)
        eq_(res[3], addrconv.ipv4.text_to_bin(self.address))
        eq_(src1, addrconv.ipv4.text_to_bin(self.srcs[0]))
        eq_(src2, addrconv.ipv4.text_to_bin(self.srcs[1]))
        eq_(src3, addrconv.ipv4.text_to_bin(self.srcs[2]))

    def test_serialize_with_aux(self):
        self.setUp_with_aux()
        buf = self.g.serialize()
        res = unpack_from(igmpv3_report_group._PACK_STR,
                          six.binary_type(buf))
        # With no sources, the auxiliary data follows the header directly.
        (aux, ) = unpack_from('%ds' % (self.aux_len * 4),
                              six.binary_type(buf),
                              igmpv3_report_group._MIN_LEN)
        eq_(res[0], self.type_)
        eq_(res[1], self.aux_len)
        eq_(res[2], self.num)
        eq_(res[3], addrconv.ipv4.text_to_bin(self.address))
        eq_(aux, self.aux)

    def test_serialize_with_srcs_and_aux(self):
        self.setUp_with_srcs_and_aux()
        buf = self.g.serialize()
        res = unpack_from(igmpv3_report_group._PACK_STR,
                          six.binary_type(buf))
        (src1, src2, src3) = unpack_from('4s4s4s', six.binary_type(buf),
                                         igmpv3_report_group._MIN_LEN)
        # The auxiliary data sits after the sources, four bytes each.
        aux_offset = igmpv3_report_group._MIN_LEN + 4 * len(self.srcs)
        (aux, ) = unpack_from('%ds' % (self.aux_len * 4),
                              six.binary_type(buf), aux_offset)
        eq_(res[0], self.type_)
        eq_(res[1], self.aux_len)
        eq_(res[2], self.num)
        eq_(res[3], addrconv.ipv4.text_to_bin(self.address))
        eq_(src1, addrconv.ipv4.text_to_bin(self.srcs[0]))
        eq_(src2, addrconv.ipv4.text_to_bin(self.srcs[1]))
        eq_(src3, addrconv.ipv4.text_to_bin(self.srcs[2]))
        eq_(aux, self.aux)

    def test_to_string(self):
        igmp_values = {'type_': repr(self.type_),
                       'aux_len': repr(self.aux_len),
                       'num': repr(self.num),
                       'address': repr(self.address),
                       'srcs': repr(self.srcs),
                       'aux': repr(self.aux)}
        # Build the expected text from the known attributes, in the
        # order inspect.getmembers() yields them.
        _g_str = ','.join(['%s=%s' % (k, igmp_values[k])
                           for k, _ in inspect.getmembers(self.g)
                           if k in igmp_values])
        g_str = '%s(%s)' % (igmpv3_report_group.__name__, _g_str)

        eq_(str(self.g), g_str)
        eq_(repr(self.g), g_str)

    def test_to_string_with_srcs(self):
        self.setUp_with_srcs()
        self.test_to_string()

    def test_to_string_with_aux(self):
        self.setUp_with_aux()
        self.test_to_string()

    def test_to_string_with_srcs_and_aux(self):
        self.setUp_with_srcs_and_aux()
        self.test_to_string()
893
894     def test_len(self):
895         eq_(len(self.g), 8)
896
897     def test_len_with_srcs(self):
898         self.setUp_with_srcs()
899         eq_(len(self.g), 20)
900
901     def test_len_with_aux(self):
902         self.setUp_with_aux()
903         eq_(len(self.g), 16)
904
905     def test_len_with_srcs_and_aux(self):
906         self.setUp_with_srcs_and_aux()
907         eq_(len(self.g), 28)
908
909     @raises
910     def test_num_larger_than_srcs(self):
911         self.srcs = ['192.168.1.1', '192.168.1.2', '192.168.1.3']
912         self.num = len(self.srcs) + 1
913         self.buf = pack(igmpv3_report_group._PACK_STR, self.type_,
914                         self.aux_len, self.num,
915                         addrconv.ipv4.text_to_bin(self.address))
916         for src in self.srcs:
917             self.buf += pack('4s', addrconv.ipv4.text_to_bin(src))
918         self.g = igmpv3_report_group(
919             self.type_, self.aux_len, self.num, self.address,
920             self.srcs, self.aux)
921         self.test_parser()
922
923     @raises
924     def test_num_smaller_than_srcs(self):
925         self.srcs = ['192.168.1.1', '192.168.1.2', '192.168.1.3']
926         self.num = len(self.srcs) - 1
927         self.buf = pack(igmpv3_report_group._PACK_STR, self.type_,
928                         self.aux_len, self.num,
929                         addrconv.ipv4.text_to_bin(self.address))
930         for src in self.srcs:
931             self.buf += pack('4s', addrconv.ipv4.text_to_bin(src))
932         self.g = igmpv3_report_group(
933             self.type_, self.aux_len, self.num, self.address,
934             self.srcs, self.aux)
935         self.test_parser()
936
937     @raises
938     def test_aux_len_larger_than_aux(self):
939         self.aux = b'\x01\x02\x03\x04\x05\x00\x00\x00'
940         self.aux_len = len(self.aux) // 4 + 1
941         self.buf = pack(igmpv3_report_group._PACK_STR, self.type_,
942                         self.aux_len, self.num,
943                         addrconv.ipv4.text_to_bin(self.address))
944         self.buf += self.aux
945         self.g = igmpv3_report_group(
946             self.type_, self.aux_len, self.num, self.address,
947             self.srcs, self.aux)
948         self.test_parser()
949
950     @raises
951     def test_aux_len_smaller_than_aux(self):
952         self.aux = b'\x01\x02\x03\x04\x05\x00\x00\x00'
953         self.aux_len = len(self.aux) // 4 - 1
954         self.buf = pack(igmpv3_report_group._PACK_STR, self.type_,
955                         self.aux_len, self.num,
956                         addrconv.ipv4.text_to_bin(self.address))
957         self.buf += self.aux
958         self.g = igmpv3_report_group(
959             self.type_, self.aux_len, self.num, self.address,
960             self.srcs, self.aux)
961         self.test_parser()
962
963     def test_default_args(self):
964         rep = igmpv3_report_group()
965         buf = rep.serialize()
966         res = unpack_from(igmpv3_report_group._PACK_STR, six.binary_type(buf))
967
968         eq_(res[0], 0)
969         eq_(res[1], 0)
970         eq_(res[2], 0)
971         eq_(res[3], addrconv.ipv4.text_to_bin('0.0.0.0'))
972
973         # srcs without num
974         srcs = ['192.168.1.1', '192.168.1.2', '192.168.1.3']
975         rep = igmpv3_report_group(srcs=srcs)
976         buf = rep.serialize()
977         res = unpack_from(igmpv3_report_group._PACK_STR, six.binary_type(buf))
978
979         eq_(res[0], 0)
980         eq_(res[1], 0)
981         eq_(res[2], len(srcs))
982         eq_(res[3], addrconv.ipv4.text_to_bin('0.0.0.0'))
983
984         res = unpack_from('4s4s4s', six.binary_type(buf),
985                           igmpv3_report_group._MIN_LEN)
986
987         eq_(res[0], addrconv.ipv4.text_to_bin(srcs[0]))
988         eq_(res[1], addrconv.ipv4.text_to_bin(srcs[1]))
989         eq_(res[2], addrconv.ipv4.text_to_bin(srcs[2]))
990
991         # aux without aux_len
992         aux = b'abcde'
993         rep = igmpv3_report_group(aux=aux)
994         buf = rep.serialize()
995         res = unpack_from(igmpv3_report_group._PACK_STR, six.binary_type(buf))
996
997         eq_(res[0], 0)
998         eq_(res[1], 2)
999         eq_(res[2], 0)
1000         eq_(res[3], addrconv.ipv4.text_to_bin('0.0.0.0'))
1001         eq_(buf[igmpv3_report_group._MIN_LEN:], b'abcde\x00\x00\x00')