backing up
[vsorcdistro/.git] / ryu / build / lib.linux-armv7l-2.7 / ryu / tests / unit / packet / test_icmpv6.py
1 # Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #    http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 # vim: tabstop=4 shiftwidth=4 softtabstop=4
17
18 import unittest
19 import logging
20 import six
21 import struct
22 import inspect
23
24 from nose.tools import ok_, eq_, nottest, raises
25 from ryu.ofproto import ether, inet
26 from ryu.lib.packet.ethernet import ethernet
27 from ryu.lib.packet.packet import Packet
28 from ryu.lib.packet import icmpv6
29 from ryu.lib.packet.ipv6 import ipv6
30 from ryu.lib.packet import packet_utils
31 from ryu.lib import addrconv
32
33
34 LOG = logging.getLogger(__name__)
35
36
def icmpv6_csum(prev, buf):
    """Compute the checksum expected for the ICMPv6 message *buf*.

    A pseudo header is packed from ``prev.src``, ``prev.dst``,
    ``prev.payload_length`` and ``prev.nxt`` and prepended to a copy of
    *buf* whose two bytes at offset 2 (the checksum field) are zeroed.
    The result of ``packet_utils.checksum`` over that data is returned.
    *buf* itself is not modified.
    """
    src_bin = addrconv.ipv6.text_to_bin(prev.src)
    dst_bin = addrconv.ipv6.text_to_bin(prev.dst)
    pseudo_header = struct.pack('!16s16sI3xB', src_bin, dst_bin,
                                prev.payload_length, prev.nxt)

    # Work on a copy so the caller's buffer keeps its checksum bytes.
    message = bytearray(buf)
    struct.pack_into('!H', message, 2, 0)

    return packet_utils.checksum(pseudo_header + message)
46
47
class Test_icmpv6_header(unittest.TestCase):
    """Tests for a bare ICMPv6 header (type, code, checksum) without a body."""

    type_ = 255
    code = 0
    csum = 207
    buf = b'\xff\x00\x00\xcf'

    def setUp(self):
        # Build the object under test for every test method instead of
        # sharing one class-level instance: test_init expects csum == 0,
        # and that must not depend on whether another test has already
        # called serialize() on the same object.
        self.icmp = icmpv6.icmpv6(self.type_, self.code, 0)

    def tearDown(self):
        pass

    def test_init(self):
        # Constructor arguments must be readable back as attributes.
        eq_(self.type_, self.icmp.type_)
        eq_(self.code, self.icmp.code)
        eq_(0, self.icmp.csum)

    def test_parser(self):
        # A 4-byte header parses into its three fields with empty data.
        msg, n, _ = self.icmp.parser(self.buf)

        eq_(msg.type_, self.type_)
        eq_(msg.code, self.code)
        eq_(msg.csum, self.csum)
        eq_(msg.data, b'')
        eq_(n, None)

    def test_serialize(self):
        src_ipv6 = 'fe80::200:ff:fe00:ef'
        dst_ipv6 = 'fe80::200:ff:fe00:1'
        prev = ipv6(6, 0, 0, 4, 58, 255, src_ipv6, dst_ipv6)

        buf = self.icmp.serialize(bytearray(), prev)
        (type_, code, csum) = struct.unpack(self.icmp._PACK_STR,
                                            six.binary_type(buf))

        eq_(type_, self.type_)
        eq_(code, self.code)
        # The object was built with csum=0; the serialized header must
        # carry the expected checksum value.
        eq_(csum, self.csum)

    @raises(struct.error)
    def test_malformed_icmpv6(self):
        # Drop the first byte so the buffer is shorter than _MIN_LEN.
        m_short_buf = self.buf[1:self.icmp._MIN_LEN]
        self.icmp.parser(m_short_buf)

    def test_default_args(self):
        # An icmpv6 built without arguments serializes type 0 / code 0
        # and a checksum matching the reference computation.
        prev = ipv6(nxt=inet.IPPROTO_ICMPV6)
        ic = icmpv6.icmpv6()
        prev.serialize(ic, None)
        buf = ic.serialize(bytearray(), prev)
        res = struct.unpack(icmpv6.icmpv6._PACK_STR, six.binary_type(buf))

        eq_(res[0], 0)
        eq_(res[1], 0)
        eq_(res[2], icmpv6_csum(prev, buf))

    def test_json(self):
        # to_jsondict() / from_jsondict() must round-trip the object.
        jsondict = self.icmp.to_jsondict()
        icmp = icmpv6.icmpv6.from_jsondict(jsondict['icmpv6'])
        eq_(str(self.icmp), str(icmp))
107
108
class Test_icmpv6_echo_request(unittest.TestCase):
    """Tests for an ICMPv6 echo request (type 128) carrying icmpv6.echo.

    Expected values are class attributes so the shared test bodies read
    them through ``self``.
    """

    type_ = 128
    code = 0
    csum = 0xa572
    id_ = 0x7620
    seq = 0
    data = b'\x01\xc9\xe7\x36\xd3\x39\x06\x00'
    buf = b'\x80\x00\xa5\x72\x76\x20\x00\x00'

    def setUp(self):
        pass

    def tearDown(self):
        pass

    def test_init(self):
        msg = icmpv6.echo(0, 0)
        eq_(msg.id, 0)
        eq_(msg.seq, 0)
        eq_(msg.data, None)

    def _test_parser(self, data=None):
        # Parse the fixture header, optionally followed by echo payload.
        wire = self.buf + (data or b'')
        parsed, rest, _ = icmpv6.icmpv6.parser(wire)

        eq_(parsed.type_, self.type_)
        eq_(parsed.code, self.code)
        eq_(parsed.csum, self.csum)
        eq_(parsed.data.id, self.id_)
        eq_(parsed.data.seq, self.seq)
        eq_(parsed.data.data, data)
        eq_(rest, None)

    def test_parser_without_data(self):
        self._test_parser()

    def test_parser_with_data(self):
        self._test_parser(self.data)

    def _test_serialize(self, echo_data=None):
        # Reference bytes are used only to size the IPv6 payload and to
        # compute the checksum the serializer is expected to produce.
        reference = self.buf + (echo_data or b'')
        prev = ipv6(6, 0, 0, len(reference), 64, 255,
                    '3ffe:507:0:1:200:86ff:fe05:80da',
                    '3ffe:501:0:1001::2')
        expected_csum = icmpv6_csum(prev, reference)

        echo = icmpv6.echo(self.id_, self.seq, echo_data)
        icmp = icmpv6.icmpv6(self.type_, self.code, 0, echo)
        wire = six.binary_type(icmp.serialize(bytearray(), prev))

        hdr_len = icmp._MIN_LEN
        type_, code, csum = struct.unpack_from(icmp._PACK_STR, wire, 0)
        id_, seq = struct.unpack_from(echo._PACK_STR, wire, hdr_len)
        payload = wire[(hdr_len + echo._MIN_LEN):]
        if len(payload) == 0:
            # An absent payload is compared against None, not b''.
            payload = None

        eq_(type_, self.type_)
        eq_(code, self.code)
        eq_(csum, expected_csum)
        eq_(id_, self.id_)
        eq_(seq, self.seq)
        eq_(payload, echo_data)

    def test_serialize_without_data(self):
        self._test_serialize()

    def test_serialize_with_data(self):
        self._test_serialize(self.data)

    def test_to_string(self):
        echo = icmpv6.echo(self.id_, self.seq, self.data)
        icmp = icmpv6.icmpv6(self.type_, self.code, self.csum, echo)

        # Field order in the expected text follows inspect.getmembers().
        echo_fields = {'id': self.id_,
                       'seq': self.seq,
                       'data': self.data}
        echo_body = ','.join(
            '%s=%s' % (name, repr(echo_fields[name]))
            for name, _ in inspect.getmembers(echo)
            if name in echo_fields)
        echo_text = '%s(%s)' % (icmpv6.echo.__name__, echo_body)

        icmp_fields = {'type_': repr(self.type_),
                       'code': repr(self.code),
                       'csum': repr(self.csum),
                       'data': echo_text}
        icmp_body = ','.join(
            '%s=%s' % (name, icmp_fields[name])
            for name, _ in inspect.getmembers(icmp)
            if name in icmp_fields)
        expected = '%s(%s)' % (icmpv6.icmpv6.__name__, icmp_body)

        eq_(str(icmp), expected)
        eq_(repr(icmp), expected)

    def test_default_args(self):
        prev = ipv6(nxt=inet.IPPROTO_ICMPV6)
        ic = icmpv6.icmpv6(
            type_=icmpv6.ICMPV6_ECHO_REQUEST, data=icmpv6.echo())
        prev.serialize(ic, None)
        buf = ic.serialize(bytearray(), prev)

        type_, code, csum = struct.unpack(icmpv6.icmpv6._PACK_STR,
                                          six.binary_type(buf[:4]))
        eq_(type_, icmpv6.ICMPV6_ECHO_REQUEST)
        eq_(code, 0)
        eq_(csum, icmpv6_csum(prev, buf))

        id_, seq = struct.unpack(icmpv6.echo._PACK_STR,
                                 six.binary_type(buf[4:]))
        eq_(id_, 0)
        eq_(seq, 0)

    def test_json(self):
        echo = icmpv6.echo(self.id_, self.seq, self.data)
        original = icmpv6.icmpv6(self.type_, self.code, self.csum, echo)
        restored = icmpv6.icmpv6.from_jsondict(
            original.to_jsondict()['icmpv6'])
        eq_(str(original), str(restored))
224
225
class Test_icmpv6_echo_reply(Test_icmpv6_echo_request):
    """Echo reply (type 129) variant of the echo request tests."""

    def setUp(self):
        # Override the inherited fixture values on the instance.
        self.type_ = 129
        self.csum = 0xa472
        self.buf = b'\x81\x00\xa4\x72\x76\x20\x00\x00'

    def test_default_args(self):
        prev = ipv6(nxt=inet.IPPROTO_ICMPV6)
        ic = icmpv6.icmpv6(
            type_=icmpv6.ICMPV6_ECHO_REPLY, data=icmpv6.echo())
        prev.serialize(ic, None)
        buf = ic.serialize(bytearray(), prev)

        type_, code, csum = struct.unpack(icmpv6.icmpv6._PACK_STR,
                                          six.binary_type(buf[:4]))
        eq_(type_, icmpv6.ICMPV6_ECHO_REPLY)
        eq_(code, 0)
        eq_(csum, icmpv6_csum(prev, buf))

        id_, seq = struct.unpack(icmpv6.echo._PACK_STR,
                                 six.binary_type(buf[4:]))
        eq_(id_, 0)
        eq_(seq, 0)
248
249
class Test_icmpv6_neighbor_solicit(unittest.TestCase):
    """Tests for a neighbor solicitation (type 135) built on nd_neighbor.

    ``data`` holds an 8-byte link-layer address option that some tests
    append to ``buf``.
    """

    type_ = 135
    code = 0
    csum = 0x952d
    res = 0
    dst = '3ffe:507:0:1:200:86ff:fe05:80da'
    nd_type = 1
    nd_length = 1
    nd_hw_src = '00:60:97:07:69:ea'
    data = b'\x01\x01\x00\x60\x97\x07\x69\xea'
    buf = (b'\x87\x00\x95\x2d\x00\x00\x00\x00'
           b'\x3f\xfe\x05\x07\x00\x00\x00\x01'
           b'\x02\x00\x86\xff\xfe\x05\x80\xda')
    src_ipv6 = '3ffe:507:0:1:200:86ff:fe05:80da'
    dst_ipv6 = '3ffe:501:0:1001::2'

    def setUp(self):
        pass

    def tearDown(self):
        pass

    def test_init(self):
        msg = icmpv6.nd_neighbor(self.res, self.dst)
        eq_(msg.res, self.res)
        eq_(msg.dst, self.dst)
        eq_(msg.option, None)

    def _test_parser(self, data=None):
        wire = self.buf + (data or b'')
        parsed, rest, _ = icmpv6.icmpv6.parser(wire)

        eq_(parsed.type_, self.type_)
        eq_(parsed.code, self.code)
        eq_(parsed.csum, self.csum)
        eq_(parsed.data.res, self.res)
        # Compare addresses in binary form rather than as text.
        eq_(addrconv.ipv6.text_to_bin(parsed.data.dst),
            addrconv.ipv6.text_to_bin(self.dst))
        eq_(rest, None)
        if not data:
            return

        opt = parsed.data.option
        eq_(opt.length, self.nd_length)
        eq_(opt.hw_src, self.nd_hw_src)
        eq_(opt.data, None)

    def test_parser_without_data(self):
        self._test_parser()

    def test_parser_with_data(self):
        self._test_parser(self.data)

    def test_serialize_without_data(self):
        nd = icmpv6.nd_neighbor(self.res, self.dst)
        prev = ipv6(6, 0, 0, 24, 64, 255, self.src_ipv6, self.dst_ipv6)
        expected_csum = icmpv6_csum(prev, self.buf)

        icmp = icmpv6.icmpv6(self.type_, self.code, 0, nd)
        wire = six.binary_type(icmp.serialize(bytearray(), prev))

        hdr_len = icmp._MIN_LEN
        type_, code, csum = struct.unpack_from(icmp._PACK_STR, wire, 0)
        res, dst = struct.unpack_from(nd._PACK_STR, wire, hdr_len)
        trailing = wire[(hdr_len + nd._MIN_LEN):]

        eq_(type_, self.type_)
        eq_(code, self.code)
        eq_(csum, expected_csum)
        # The flag value is read from the top 3 bits of the 32-bit word.
        eq_(res >> 29, self.res)
        eq_(dst, addrconv.ipv6.text_to_bin(self.dst))
        eq_(trailing, b'')

    def test_serialize_with_data(self):
        nd_opt = icmpv6.nd_option_sla(self.nd_length, self.nd_hw_src)
        nd = icmpv6.nd_neighbor(self.res, self.dst, nd_opt)
        prev = ipv6(6, 0, 0, 32, 64, 255, self.src_ipv6, self.dst_ipv6)
        expected_csum = icmpv6_csum(prev, self.buf + self.data)

        icmp = icmpv6.icmpv6(self.type_, self.code, 0, nd)
        wire = six.binary_type(icmp.serialize(bytearray(), prev))

        opt_offset = icmp._MIN_LEN + nd._MIN_LEN
        type_, code, csum = struct.unpack_from(icmp._PACK_STR, wire, 0)
        res, dst = struct.unpack_from(nd._PACK_STR, wire, icmp._MIN_LEN)
        nd_type, nd_length, nd_hw_src = struct.unpack_from(
            nd_opt._PACK_STR, wire, opt_offset)
        # Bytes after the 8-byte option; extracted but not asserted on.
        trailing = wire[(opt_offset + 8):]

        eq_(type_, self.type_)
        eq_(code, self.code)
        eq_(csum, expected_csum)
        eq_(res >> 29, self.res)
        eq_(dst, addrconv.ipv6.text_to_bin(self.dst))
        eq_(nd_type, self.nd_type)
        eq_(nd_length, self.nd_length)
        eq_(nd_hw_src, addrconv.mac.text_to_bin(self.nd_hw_src))

    def test_to_string(self):
        nd_opt = icmpv6.nd_option_sla(self.nd_length, self.nd_hw_src)
        nd = icmpv6.nd_neighbor(self.res, self.dst, nd_opt)
        icmp = icmpv6.icmpv6(self.type_, self.code, self.csum, nd)

        # Each expected text lists fields in inspect.getmembers() order.
        opt_fields = {'length': self.nd_length,
                      'hw_src': self.nd_hw_src,
                      'data': None}
        opt_body = ','.join(
            '%s=%s' % (name, repr(opt_fields[name]))
            for name, _ in inspect.getmembers(nd_opt)
            if name in opt_fields)
        opt_text = '%s(%s)' % (icmpv6.nd_option_sla.__name__, opt_body)

        nd_fields = {'res': repr(nd.res),
                     'dst': repr(self.dst),
                     'option': opt_text}
        nd_body = ','.join(
            '%s=%s' % (name, nd_fields[name])
            for name, _ in inspect.getmembers(nd)
            if name in nd_fields)
        nd_text = '%s(%s)' % (icmpv6.nd_neighbor.__name__, nd_body)

        icmp_fields = {'type_': repr(self.type_),
                       'code': repr(self.code),
                       'csum': repr(self.csum),
                       'data': nd_text}
        icmp_body = ','.join(
            '%s=%s' % (name, icmp_fields[name])
            for name, _ in inspect.getmembers(icmp)
            if name in icmp_fields)
        expected = '%s(%s)' % (icmpv6.icmpv6.__name__, icmp_body)

        eq_(str(icmp), expected)
        eq_(repr(icmp), expected)

    def test_default_args(self):
        # Without an option.
        prev = ipv6(nxt=inet.IPPROTO_ICMPV6)
        ic = icmpv6.icmpv6(
            type_=icmpv6.ND_NEIGHBOR_SOLICIT, data=icmpv6.nd_neighbor())
        prev.serialize(ic, None)
        buf = ic.serialize(bytearray(), prev)

        type_, code, csum = struct.unpack(icmpv6.icmpv6._PACK_STR,
                                          six.binary_type(buf[:4]))
        eq_(type_, icmpv6.ND_NEIGHBOR_SOLICIT)
        eq_(code, 0)
        eq_(csum, icmpv6_csum(prev, buf))

        res, dst = struct.unpack(icmpv6.nd_neighbor._PACK_STR,
                                 six.binary_type(buf[4:]))
        eq_(res, 0)
        eq_(dst, addrconv.ipv6.text_to_bin('::'))

        # with nd_option_sla
        prev = ipv6(nxt=inet.IPPROTO_ICMPV6)
        ic = icmpv6.icmpv6(
            type_=icmpv6.ND_NEIGHBOR_SOLICIT,
            data=icmpv6.nd_neighbor(option=icmpv6.nd_option_sla()))
        prev.serialize(ic, None)
        buf = ic.serialize(bytearray(), prev)

        type_, code, csum = struct.unpack(icmpv6.icmpv6._PACK_STR,
                                          six.binary_type(buf[:4]))
        eq_(type_, icmpv6.ND_NEIGHBOR_SOLICIT)
        eq_(code, 0)
        eq_(csum, icmpv6_csum(prev, buf))

        res, dst = struct.unpack(icmpv6.nd_neighbor._PACK_STR,
                                 six.binary_type(buf[4:24]))
        eq_(res, 0)
        eq_(dst, addrconv.ipv6.text_to_bin('::'))

        opt_type, opt_len, opt_hw = struct.unpack(
            icmpv6.nd_option_sla._PACK_STR, six.binary_type(buf[24:]))
        eq_(opt_type, icmpv6.ND_OPTION_SLA)
        eq_(opt_len, len(icmpv6.nd_option_sla()) // 8)
        eq_(opt_hw, addrconv.mac.text_to_bin('00:00:00:00:00:00'))

    def test_json(self):
        nd_opt = icmpv6.nd_option_sla(self.nd_length, self.nd_hw_src)
        nd = icmpv6.nd_neighbor(self.res, self.dst, nd_opt)
        original = icmpv6.icmpv6(self.type_, self.code, self.csum, nd)
        restored = icmpv6.icmpv6.from_jsondict(
            original.to_jsondict()['icmpv6'])
        eq_(str(original), str(restored))
428
429
class Test_icmpv6_neighbor_advert(Test_icmpv6_neighbor_solicit):
    """Neighbor advertisement (type 136) variant using nd_option_tla."""

    def setUp(self):
        # Instance attributes shadow the fixture values of the base class.
        self.type_ = 136
        self.csum = 0xb8ba
        self.res = 7
        self.dst = '3ffe:507:0:1:260:97ff:fe07:69ea'
        self.nd_type = 2
        self.nd_length = 1
        self.nd_data = None
        self.nd_hw_src = '00:60:97:07:69:ea'
        self.data = b'\x02\x01\x00\x60\x97\x07\x69\xea'
        self.buf = (b'\x88\x00\xb8\xba\xe0\x00\x00\x00'
                    b'\x3f\xfe\x05\x07\x00\x00\x00\x01'
                    b'\x02\x60\x97\xff\xfe\x07\x69\xea')

    def test_serialize_with_data(self):
        nd_opt = icmpv6.nd_option_tla(self.nd_length, self.nd_hw_src)
        nd = icmpv6.nd_neighbor(self.res, self.dst, nd_opt)
        prev = ipv6(6, 0, 0, 32, 64, 255, self.src_ipv6, self.dst_ipv6)
        expected_csum = icmpv6_csum(prev, self.buf + self.data)

        icmp = icmpv6.icmpv6(self.type_, self.code, 0, nd)
        wire = six.binary_type(icmp.serialize(bytearray(), prev))

        opt_offset = icmp._MIN_LEN + nd._MIN_LEN
        type_, code, csum = struct.unpack_from(icmp._PACK_STR, wire, 0)
        res, dst = struct.unpack_from(nd._PACK_STR, wire, icmp._MIN_LEN)
        nd_type, nd_length, nd_hw_src = struct.unpack_from(
            nd_opt._PACK_STR, wire, opt_offset)
        # Bytes after the 8-byte option; extracted but not asserted on.
        trailing = wire[(opt_offset + 8):]

        eq_(type_, self.type_)
        eq_(code, self.code)
        eq_(csum, expected_csum)
        # The flag value is read from the top 3 bits of the 32-bit word.
        eq_(res >> 29, self.res)
        eq_(dst, addrconv.ipv6.text_to_bin(self.dst))
        eq_(nd_type, self.nd_type)
        eq_(nd_length, self.nd_length)
        eq_(nd_hw_src, addrconv.mac.text_to_bin(self.nd_hw_src))

    def test_to_string(self):
        nd_opt = icmpv6.nd_option_tla(self.nd_length, self.nd_hw_src)
        nd = icmpv6.nd_neighbor(self.res, self.dst, nd_opt)
        icmp = icmpv6.icmpv6(self.type_, self.code, self.csum, nd)

        # Each expected text lists fields in inspect.getmembers() order.
        opt_fields = {'length': self.nd_length,
                      'hw_src': self.nd_hw_src,
                      'data': None}
        opt_body = ','.join(
            '%s=%s' % (name, repr(opt_fields[name]))
            for name, _ in inspect.getmembers(nd_opt)
            if name in opt_fields)
        opt_text = '%s(%s)' % (icmpv6.nd_option_tla.__name__, opt_body)

        nd_fields = {'res': repr(nd.res),
                     'dst': repr(self.dst),
                     'option': opt_text}
        nd_body = ','.join(
            '%s=%s' % (name, nd_fields[name])
            for name, _ in inspect.getmembers(nd)
            if name in nd_fields)
        nd_text = '%s(%s)' % (icmpv6.nd_neighbor.__name__, nd_body)

        icmp_fields = {'type_': repr(self.type_),
                       'code': repr(self.code),
                       'csum': repr(self.csum),
                       'data': nd_text}
        icmp_body = ','.join(
            '%s=%s' % (name, icmp_fields[name])
            for name, _ in inspect.getmembers(icmp)
            if name in icmp_fields)
        expected = '%s(%s)' % (icmpv6.icmpv6.__name__, icmp_body)

        eq_(str(icmp), expected)
        eq_(repr(icmp), expected)

    def test_default_args(self):
        # Without an option.
        prev = ipv6(nxt=inet.IPPROTO_ICMPV6)
        ic = icmpv6.icmpv6(
            type_=icmpv6.ND_NEIGHBOR_ADVERT, data=icmpv6.nd_neighbor())
        prev.serialize(ic, None)
        buf = ic.serialize(bytearray(), prev)

        type_, code, csum = struct.unpack(icmpv6.icmpv6._PACK_STR,
                                          six.binary_type(buf[:4]))
        eq_(type_, icmpv6.ND_NEIGHBOR_ADVERT)
        eq_(code, 0)
        eq_(csum, icmpv6_csum(prev, buf))

        res, dst = struct.unpack(icmpv6.nd_neighbor._PACK_STR,
                                 six.binary_type(buf[4:]))
        eq_(res, 0)
        eq_(dst, addrconv.ipv6.text_to_bin('::'))

        # with nd_option_tla
        prev = ipv6(nxt=inet.IPPROTO_ICMPV6)
        ic = icmpv6.icmpv6(
            type_=icmpv6.ND_NEIGHBOR_ADVERT,
            data=icmpv6.nd_neighbor(option=icmpv6.nd_option_tla()))
        prev.serialize(ic, None)
        buf = ic.serialize(bytearray(), prev)

        type_, code, csum = struct.unpack(icmpv6.icmpv6._PACK_STR,
                                          six.binary_type(buf[:4]))
        eq_(type_, icmpv6.ND_NEIGHBOR_ADVERT)
        eq_(code, 0)
        eq_(csum, icmpv6_csum(prev, buf))

        res, dst = struct.unpack(icmpv6.nd_neighbor._PACK_STR,
                                 six.binary_type(buf[4:24]))
        eq_(res, 0)
        eq_(dst, addrconv.ipv6.text_to_bin('::'))

        opt_type, opt_len, opt_hw = struct.unpack(
            icmpv6.nd_option_tla._PACK_STR, six.binary_type(buf[24:]))
        eq_(opt_type, icmpv6.ND_OPTION_TLA)
        eq_(opt_len, len(icmpv6.nd_option_tla()) // 8)
        eq_(opt_hw, addrconv.mac.text_to_bin('00:00:00:00:00:00'))
545
546
class Test_icmpv6_router_solicit(unittest.TestCase):
    """Tests for a router solicitation (type 133) built on nd_router_solicit.

    ``buf`` holds only the 4-byte ICMPv6 header; ``data`` holds the
    4-byte message body followed by an 8-byte link-layer address option.
    """

    type_ = 133
    code = 0
    csum = 0x97d9
    res = 0
    nd_type = 1
    nd_length = 1
    nd_hw_src = '12:2d:a5:6d:bc:0f'
    data = b'\x00\x00\x00\x00\x01\x01\x12\x2d\xa5\x6d\xbc\x0f'
    buf = b'\x85\x00\x97\xd9'
    src_ipv6 = '3ffe:507:0:1:200:86ff:fe05:80da'
    dst_ipv6 = '3ffe:501:0:1001::2'

    def setUp(self):
        pass

    def tearDown(self):
        pass

    def test_init(self):
        msg = icmpv6.nd_router_solicit(self.res)
        eq_(msg.res, self.res)
        eq_(msg.option, None)

    def _test_parser(self, data=None):
        wire = self.buf + (data or b'')
        parsed, rest, _ = icmpv6.icmpv6.parser(wire)

        eq_(parsed.type_, self.type_)
        eq_(parsed.code, self.code)
        eq_(parsed.csum, self.csum)
        # The message body is only inspected when body bytes were given.
        if data is not None:
            eq_(parsed.data.res, self.res)
        eq_(rest, None)
        if not data:
            return

        opt = parsed.data.option
        eq_(opt.length, self.nd_length)
        eq_(opt.hw_src, self.nd_hw_src)
        eq_(opt.data, None)

    def test_parser_without_data(self):
        self._test_parser()

    def test_parser_with_data(self):
        self._test_parser(self.data)

    def test_serialize_without_data(self):
        rs = icmpv6.nd_router_solicit(self.res)
        prev = ipv6(6, 0, 0, 8, 64, 255, self.src_ipv6, self.dst_ipv6)
        expected_csum = icmpv6_csum(prev, self.buf)

        icmp = icmpv6.icmpv6(self.type_, self.code, 0, rs)
        wire = six.binary_type(icmp.serialize(bytearray(), prev))

        hdr_len = icmp._MIN_LEN
        type_, code, csum = struct.unpack_from(icmp._PACK_STR, wire, 0)
        body = struct.unpack_from(rs._PACK_STR, wire, hdr_len)
        trailing = wire[(hdr_len + rs._MIN_LEN):]

        eq_(type_, self.type_)
        eq_(code, self.code)
        eq_(csum, expected_csum)
        eq_(body[0], self.res)
        eq_(trailing, b'')

    def test_serialize_with_data(self):
        nd_opt = icmpv6.nd_option_sla(self.nd_length, self.nd_hw_src)
        rs = icmpv6.nd_router_solicit(self.res, nd_opt)
        prev = ipv6(6, 0, 0, 16, 64, 255, self.src_ipv6, self.dst_ipv6)
        expected_csum = icmpv6_csum(prev, self.buf + self.data)

        icmp = icmpv6.icmpv6(self.type_, self.code, 0, rs)
        wire = six.binary_type(icmp.serialize(bytearray(), prev))

        opt_offset = icmp._MIN_LEN + rs._MIN_LEN
        type_, code, csum = struct.unpack_from(icmp._PACK_STR, wire, 0)
        body = struct.unpack_from(rs._PACK_STR, wire, icmp._MIN_LEN)
        nd_type, nd_length, nd_hw_src = struct.unpack_from(
            nd_opt._PACK_STR, wire, opt_offset)
        # Bytes after the 8-byte option; extracted but not asserted on.
        trailing = wire[(opt_offset + 8):]

        eq_(type_, self.type_)
        eq_(code, self.code)
        eq_(csum, expected_csum)
        eq_(body[0], self.res)
        eq_(nd_type, self.nd_type)
        eq_(nd_length, self.nd_length)
        eq_(nd_hw_src, addrconv.mac.text_to_bin(self.nd_hw_src))

    def test_to_string(self):
        nd_opt = icmpv6.nd_option_sla(self.nd_length, self.nd_hw_src)
        rs = icmpv6.nd_router_solicit(self.res, nd_opt)
        icmp = icmpv6.icmpv6(self.type_, self.code, self.csum, rs)

        # Each expected text lists fields in inspect.getmembers() order.
        opt_fields = {'length': self.nd_length,
                      'hw_src': self.nd_hw_src,
                      'data': None}
        opt_body = ','.join(
            '%s=%s' % (name, repr(opt_fields[name]))
            for name, _ in inspect.getmembers(nd_opt)
            if name in opt_fields)
        opt_text = '%s(%s)' % (icmpv6.nd_option_sla.__name__, opt_body)

        rs_fields = {'res': repr(rs.res),
                     'option': opt_text}
        rs_body = ','.join(
            '%s=%s' % (name, rs_fields[name])
            for name, _ in inspect.getmembers(rs)
            if name in rs_fields)
        rs_text = '%s(%s)' % (icmpv6.nd_router_solicit.__name__, rs_body)

        icmp_fields = {'type_': repr(self.type_),
                       'code': repr(self.code),
                       'csum': repr(self.csum),
                       'data': rs_text}
        icmp_body = ','.join(
            '%s=%s' % (name, icmp_fields[name])
            for name, _ in inspect.getmembers(icmp)
            if name in icmp_fields)
        expected = '%s(%s)' % (icmpv6.icmpv6.__name__, icmp_body)

        eq_(str(icmp), expected)
        eq_(repr(icmp), expected)

    def test_default_args(self):
        # Without an option.
        prev = ipv6(nxt=inet.IPPROTO_ICMPV6)
        ic = icmpv6.icmpv6(
            type_=icmpv6.ND_ROUTER_SOLICIT, data=icmpv6.nd_router_solicit())
        prev.serialize(ic, None)
        buf = ic.serialize(bytearray(), prev)

        type_, code, csum = struct.unpack(icmpv6.icmpv6._PACK_STR,
                                          six.binary_type(buf[:4]))
        eq_(type_, icmpv6.ND_ROUTER_SOLICIT)
        eq_(code, 0)
        eq_(csum, icmpv6_csum(prev, buf))

        body = struct.unpack(icmpv6.nd_router_solicit._PACK_STR,
                             six.binary_type(buf[4:]))
        eq_(body[0], 0)

        # with nd_option_sla
        prev = ipv6(nxt=inet.IPPROTO_ICMPV6)
        ic = icmpv6.icmpv6(
            type_=icmpv6.ND_ROUTER_SOLICIT,
            data=icmpv6.nd_router_solicit(option=icmpv6.nd_option_sla()))
        prev.serialize(ic, None)
        buf = ic.serialize(bytearray(), prev)

        type_, code, csum = struct.unpack(icmpv6.icmpv6._PACK_STR,
                                          six.binary_type(buf[:4]))
        eq_(type_, icmpv6.ND_ROUTER_SOLICIT)
        eq_(code, 0)
        eq_(csum, icmpv6_csum(prev, buf))

        body = struct.unpack(icmpv6.nd_router_solicit._PACK_STR,
                             six.binary_type(buf[4:8]))
        eq_(body[0], 0)

        opt_type, opt_len, opt_hw = struct.unpack(
            icmpv6.nd_option_sla._PACK_STR, six.binary_type(buf[8:]))
        eq_(opt_type, icmpv6.ND_OPTION_SLA)
        eq_(opt_len, len(icmpv6.nd_option_sla()) // 8)
        eq_(opt_hw, addrconv.mac.text_to_bin('00:00:00:00:00:00'))

    def test_json(self):
        nd_opt = icmpv6.nd_option_sla(self.nd_length, self.nd_hw_src)
        rs = icmpv6.nd_router_solicit(self.res, nd_opt)
        original = icmpv6.icmpv6(self.type_, self.code, self.csum, rs)
        restored = icmpv6.icmpv6.from_jsondict(
            original.to_jsondict()['icmpv6'])
        eq_(str(original), str(restored))
716
717
class Test_icmpv6_router_advert(unittest.TestCase):
    """Default-argument and JSON round-trip tests for nd_router_advert."""

    def setUp(self):
        pass

    def tearDown(self):
        pass

    def _serialize_ra(self, *options):
        # Build a router advertisement with default arguments (plus the
        # given options, if any) and return (prev, serialized bytes).
        prev = ipv6(nxt=inet.IPPROTO_ICMPV6)
        if options:
            body = icmpv6.nd_router_advert(options=list(options))
        else:
            body = icmpv6.nd_router_advert()
        ic = icmpv6.icmpv6(type_=icmpv6.ND_ROUTER_ADVERT, data=body)
        prev.serialize(ic, None)
        return prev, ic.serialize(bytearray(), prev)

    def _check_icmp_header(self, prev, buf):
        # First 4 bytes: type, code and checksum of the ICMPv6 header.
        res = struct.unpack(icmpv6.icmpv6._PACK_STR, six.binary_type(buf[:4]))

        eq_(res[0], icmpv6.ND_ROUTER_ADVERT)
        eq_(res[1], 0)
        eq_(res[2], icmpv6_csum(prev, buf))

    def _check_ra_defaults(self, raw):
        # The first five unpacked router advertisement fields must be 0.
        res = struct.unpack(icmpv6.nd_router_advert._PACK_STR,
                            six.binary_type(raw))

        for index in range(5):
            eq_(res[index], 0)

    def _check_sla_defaults(self, raw):
        res = struct.unpack(icmpv6.nd_option_sla._PACK_STR,
                            six.binary_type(raw))

        eq_(res[0], icmpv6.ND_OPTION_SLA)
        eq_(res[1], len(icmpv6.nd_option_sla()) // 8)
        eq_(res[2], addrconv.mac.text_to_bin('00:00:00:00:00:00'))

    def _check_pi_defaults(self, raw, length=None):
        # length=None means "derive the expected length from a default
        # nd_option_pi instance" at the point of comparison.
        res = struct.unpack(icmpv6.nd_option_pi._PACK_STR,
                            six.binary_type(raw))

        eq_(res[0], icmpv6.ND_OPTION_PI)
        if length is None:
            length = len(icmpv6.nd_option_pi()) // 8
        eq_(res[1], length)
        for index in range(2, 7):
            eq_(res[index], 0)
        eq_(res[7], addrconv.ipv6.text_to_bin('::'))

    def test_default_args(self):
        # without options
        prev, buf = self._serialize_ra()
        self._check_icmp_header(prev, buf)
        self._check_ra_defaults(buf[4:])

        # with nd_option_sla
        prev, buf = self._serialize_ra(icmpv6.nd_option_sla())
        self._check_icmp_header(prev, buf)
        self._check_ra_defaults(buf[4:16])
        self._check_sla_defaults(buf[16:])

        # with nd_option_pi
        prev, buf = self._serialize_ra(icmpv6.nd_option_pi())
        self._check_icmp_header(prev, buf)
        self._check_ra_defaults(buf[4:16])
        self._check_pi_defaults(buf[16:], 4)

        # with nd_option_sla and nd_option_pi
        prev, buf = self._serialize_ra(icmpv6.nd_option_sla(),
                                       icmpv6.nd_option_pi())
        self._check_icmp_header(prev, buf)
        self._check_ra_defaults(buf[4:16])
        self._check_sla_defaults(buf[16:24])
        self._check_pi_defaults(buf[24:])

    def test_json(self):
        original = icmpv6.icmpv6(
            type_=icmpv6.ND_ROUTER_ADVERT,
            data=icmpv6.nd_router_advert(
                options=[icmpv6.nd_option_sla(), icmpv6.nd_option_pi()]))
        restored = icmpv6.icmpv6.from_jsondict(
            original.to_jsondict()['icmpv6'])
        eq_(str(original), str(restored))
862
863
class Test_icmpv6_nd_option_la(unittest.TestCase):
    """Default-argument checks for the link-layer address ND options.

    Exercises nd_option_sla serialized on its own and carried by a router
    solicitation, and nd_option_tla carried by a neighbor advertisement.
    """

    def setUp(self):
        pass

    def tearDown(self):
        pass

    def _serialize_over_default_ipv6(self, type_, data):
        """Serialize an ICMPv6 message of ``type_`` carrying ``data``.

        Returns the default IPv6 header that served as previous layer
        together with the serialized ICMPv6 bytes.
        """
        ip_hdr = ipv6(nxt=inet.IPPROTO_ICMPV6)
        message = icmpv6.icmpv6(type_=type_, data=data)
        ip_hdr.serialize(message, None)
        return ip_hdr, message.serialize(bytearray(), ip_hdr)

    def _check_icmpv6_header(self, ip_hdr, wire, type_):
        """Check type, zero code and checksum in the first four bytes."""
        header = struct.unpack(icmpv6.icmpv6._PACK_STR,
                               six.binary_type(wire[:4]))

        eq_(header[0], type_)
        eq_(header[1], 0)
        eq_(header[2], icmpv6_csum(ip_hdr, wire))

    def _check_default_la_option(self, raw, option_cls, option_type):
        """Check that ``raw`` holds a default-constructed ``option_cls``."""
        fields = struct.unpack(option_cls._PACK_STR, six.binary_type(raw))

        eq_(fields[0], option_type)
        # Expected length field: the option's byte length divided by 8.
        eq_(fields[1], len(option_cls()) // 8)
        eq_(fields[2], addrconv.mac.text_to_bin('00:00:00:00:00:00'))

    def test_default_args(self):
        """Default options serialize to all-zero addresses in every carrier."""
        # option on its own
        self._check_default_la_option(
            icmpv6.nd_option_sla().serialize(),
            icmpv6.nd_option_sla, icmpv6.ND_OPTION_SLA)

        # with nd_neighbor
        ip_hdr, wire = self._serialize_over_default_ipv6(
            icmpv6.ND_NEIGHBOR_ADVERT,
            icmpv6.nd_neighbor(option=icmpv6.nd_option_tla()))
        self._check_icmpv6_header(ip_hdr, wire, icmpv6.ND_NEIGHBOR_ADVERT)

        neighbor = struct.unpack(icmpv6.nd_neighbor._PACK_STR,
                                 six.binary_type(wire[4:24]))

        eq_(neighbor[0], 0)
        eq_(neighbor[1], addrconv.ipv6.text_to_bin('::'))

        self._check_default_la_option(
            wire[24:], icmpv6.nd_option_tla, icmpv6.ND_OPTION_TLA)

        # with nd_router_solicit
        ip_hdr, wire = self._serialize_over_default_ipv6(
            icmpv6.ND_ROUTER_SOLICIT,
            icmpv6.nd_router_solicit(option=icmpv6.nd_option_sla()))
        self._check_icmpv6_header(ip_hdr, wire, icmpv6.ND_ROUTER_SOLICIT)

        solicit = struct.unpack(icmpv6.nd_router_solicit._PACK_STR,
                                six.binary_type(wire[4:8]))

        eq_(solicit[0], 0)

        self._check_default_la_option(
            wire[8:], icmpv6.nd_option_sla, icmpv6.ND_OPTION_SLA)
933
934
class Test_icmpv6_nd_option_pi(unittest.TestCase):
    """Default-argument checks for the prefix information ND option.

    The option is verified both serialized on its own and as the single
    option of a default router advertisement.
    """

    def setUp(self):
        pass

    def tearDown(self):
        pass

    def _check_default_pi(self, raw):
        """Assert that ``raw`` unpacks to a default-constructed nd_option_pi.

        ``raw`` is any bytes-like object holding exactly one serialized
        option.
        """
        res = struct.unpack(icmpv6.nd_option_pi._PACK_STR,
                            six.binary_type(raw))

        eq_(res[0], icmpv6.ND_OPTION_PI)
        # Derive the expected length field from the option itself rather
        # than hard-coding it, so the standalone and the embedded check
        # apply the same rule.
        eq_(res[1], len(icmpv6.nd_option_pi()) // 8)
        eq_(res[2], 0)
        eq_(res[3], 0)
        eq_(res[4], 0)
        eq_(res[5], 0)
        eq_(res[6], 0)
        eq_(res[7], addrconv.ipv6.text_to_bin('::'))

    def test_default_args(self):
        """A default nd_option_pi serializes to zeroed fields and '::'."""
        pi = icmpv6.nd_option_pi()
        self._check_default_pi(pi.serialize())

        # with nd_router_advert
        prev = ipv6(nxt=inet.IPPROTO_ICMPV6)
        ic = icmpv6.icmpv6(
            type_=icmpv6.ND_ROUTER_ADVERT,
            data=icmpv6.nd_router_advert(
                options=[icmpv6.nd_option_pi()]))
        prev.serialize(ic, None)
        buf = ic.serialize(bytearray(), prev)
        res = struct.unpack(icmpv6.icmpv6._PACK_STR, six.binary_type(buf[:4]))

        eq_(res[0], icmpv6.ND_ROUTER_ADVERT)
        eq_(res[1], 0)
        eq_(res[2], icmpv6_csum(prev, buf))

        res = struct.unpack(icmpv6.nd_router_advert._PACK_STR,
                            six.binary_type(buf[4:16]))

        eq_(res[0], 0)
        eq_(res[1], 0)
        eq_(res[2], 0)
        eq_(res[3], 0)
        eq_(res[4], 0)

        # Everything after the router advertisement body is the option.
        self._check_default_pi(buf[16:])
991
992
class Test_icmpv6_membership_query(unittest.TestCase):
    """Tests for an ICMPv6 message of type 130 carrying an icmpv6.mld body.

    The class attributes describe one message and its wire image.  They are
    plain class attributes so that a subclass can override them and reuse
    the test bodies for another message type.
    """
    type_ = 130
    code = 0
    csum = 0xb5a4
    maxresp = 10000
    address = 'ff08::1'
    # Wire image of the message described by the attributes above.
    buf = (b'\x82\x00\xb5\xa4\x27\x10\x00\x00'
           b'\xff\x08\x00\x00\x00\x00\x00\x00'
           b'\x00\x00\x00\x00\x00\x00\x00\x01')

    def setUp(self):
        pass

    def tearDown(self):
        pass

    def test_init(self):
        """The mld body keeps the constructor arguments as attributes."""
        body = icmpv6.mld(self.maxresp, self.address)

        eq_(body.maxresp, self.maxresp)
        eq_(body.address, self.address)

    def test_parser(self):
        """Parsing the wire image yields the header and body fields."""
        message, next_proto, _rest = icmpv6.icmpv6.parser(self.buf)

        eq_(message.type_, self.type_)
        eq_(message.code, self.code)
        eq_(message.csum, self.csum)
        eq_(message.data.maxresp, self.maxresp)
        eq_(message.data.address, self.address)
        eq_(next_proto, None)

    def test_serialize(self):
        """Serializing fills in the checksum and packs the body fields."""
        ip_hdr = ipv6(6, 0, 0, len(self.buf), 64, 255,
                      '3ffe:507:0:1:200:86ff:fe05:80da',
                      '3ffe:501:0:1001::2')
        # Reference checksum, computed over the expected wire image.
        expected_csum = icmpv6_csum(ip_hdr, self.buf)

        body = icmpv6.mld(self.maxresp, self.address)
        message = icmpv6.icmpv6(self.type_, self.code, 0, body)
        wire = six.binary_type(message.serialize(bytearray(), ip_hdr))

        header = struct.unpack_from(message._PACK_STR, wire, 0)
        (type_, code, csum) = header
        (maxresp, address) = struct.unpack_from(
            body._PACK_STR, wire, message._MIN_LEN)

        eq_(type_, self.type_)
        eq_(code, self.code)
        eq_(csum, expected_csum)
        eq_(maxresp, self.maxresp)
        eq_(address, addrconv.ipv6.text_to_bin(self.address))

    def test_to_string(self):
        """str() and repr() list the attributes in inspect.getmembers order."""
        body = icmpv6.mld(self.maxresp, self.address)
        message = icmpv6.icmpv6(self.type_, self.code, self.csum, body)

        # Expected text of the nested mld object.
        body_values = {'maxresp': self.maxresp,
                       'address': self.address}
        body_parts = []
        for name, _value in inspect.getmembers(body):
            if name in body_values:
                body_parts.append('%s=%r' % (name, body_values[name]))
        body_text = '%s(%s)' % (icmpv6.mld.__name__, ','.join(body_parts))

        # Expected text of the enclosing icmpv6 object.
        rendered = {'type_': repr(self.type_),
                    'code': repr(self.code),
                    'csum': repr(self.csum),
                    'data': body_text}
        message_parts = []
        for name, _value in inspect.getmembers(message):
            if name in rendered:
                message_parts.append('%s=%s' % (name, rendered[name]))
        expected = '%s(%s)' % (icmpv6.icmpv6.__name__,
                               ','.join(message_parts))

        eq_(str(message), expected)
        eq_(repr(message), expected)

    def test_default_args(self):
        """A default mld body serializes to maxresp 0 and address '::'."""
        ip_hdr = ipv6(nxt=inet.IPPROTO_ICMPV6)
        message = icmpv6.icmpv6(
            type_=icmpv6.MLD_LISTENER_QUERY, data=icmpv6.mld())
        ip_hdr.serialize(message, None)
        wire = message.serialize(bytearray(), ip_hdr)

        header = struct.unpack(icmpv6.icmpv6._PACK_STR,
                               six.binary_type(wire[:4]))

        eq_(header[0], icmpv6.MLD_LISTENER_QUERY)
        eq_(header[1], 0)
        eq_(header[2], icmpv6_csum(ip_hdr, wire))

        body = struct.unpack(icmpv6.mld._PACK_STR,
                             six.binary_type(wire[4:]))

        eq_(body[0], 0)
        eq_(body[1], addrconv.ipv6.text_to_bin('::'))

    def test_json(self):
        """A default query survives a round trip through its JSON dict."""
        original = icmpv6.icmpv6(type_=icmpv6.MLD_LISTENER_QUERY,
                                 data=icmpv6.mld())
        payload = original.to_jsondict()['icmpv6']
        restored = icmpv6.icmpv6.from_jsondict(payload)

        eq_(str(original), str(restored))
1091
1092
class Test_icmpv6_membership_report(Test_icmpv6_membership_query):
    """Re-run the membership query tests for message type 131.

    Only the fixture attributes and test_json differ from the base class.
    """
    type_ = 131
    code = 0
    csum = 0xb4a4
    maxresp = 10000
    address = 'ff08::1'
    # Same layout as the base class image, with type and checksum changed.
    buf = (b'\x83\x00\xb4\xa4\x27\x10\x00\x00'
           b'\xff\x08\x00\x00\x00\x00\x00\x00'
           b'\x00\x00\x00\x00\x00\x00\x00\x01')

    def test_json(self):
        """A default report survives a round trip through its JSON dict."""
        # NOTE(review): the constant is spelled MLD_LISTENER_REPOR here;
        # presumably that is the name the icmpv6 module exports -- verify
        # against the library before renaming it.
        original = icmpv6.icmpv6(type_=icmpv6.MLD_LISTENER_REPOR,
                                 data=icmpv6.mld())
        payload = original.to_jsondict()['icmpv6']
        restored = icmpv6.icmpv6.from_jsondict(payload)

        eq_(str(original), str(restored))
1110
1111
class Test_icmpv6_membership_done(Test_icmpv6_membership_query):
    """Re-run the membership query tests for message type 132.

    Only the fixture attributes and test_json differ from the base class.
    """
    type_ = 132
    code = 0
    csum = 0xb3a4
    maxresp = 10000
    address = 'ff08::1'
    # Same layout as the base class image, with type and checksum changed.
    buf = (b'\x84\x00\xb3\xa4\x27\x10\x00\x00'
           b'\xff\x08\x00\x00\x00\x00\x00\x00'
           b'\x00\x00\x00\x00\x00\x00\x00\x01')

    def test_json(self):
        """A default done message survives a round trip through JSON."""
        original = icmpv6.icmpv6(type_=icmpv6.MLD_LISTENER_DONE,
                                 data=icmpv6.mld())
        payload = original.to_jsondict()['icmpv6']
        restored = icmpv6.icmpv6.from_jsondict(payload)

        eq_(str(original), str(restored))
1129
1130
class Test_mldv2_query(unittest.TestCase):
    """Tests for icmpv6.mldv2_query, with and without source addresses.

    The class attributes describe a query that lists no sources.
    setUp_with_srcs() shadows them on the instance with a two-source
    variant, which lets the ``*_with_srcs`` tests re-run the plain test
    bodies against the richer fixture.
    """
    type_ = 130
    code = 0
    csum = 0xb5a4
    maxresp = 10000
    address = 'ff08::1'
    s_flg = 0
    qrv = 2
    # s_flg and qrv combined; test_serialize takes the value apart again
    # with the inverse shift and masks.
    s_qrv = s_flg << 3 | qrv
    qqic = 10
    num = 0
    srcs = []

    mld = icmpv6.mldv2_query(
        maxresp, address, s_flg, qrv, qqic, num, srcs)

    # Wire image of the ICMPv6 message carrying the query above.
    buf = b'\x82\x00\xb5\xa4\x27\x10\x00\x00' \
        + b'\xff\x08\x00\x00\x00\x00\x00\x00' \
        + b'\x00\x00\x00\x00\x00\x00\x00\x01' \
        + b'\x02\x0a\x00\x00'

    def setUp(self):
        pass

    def setUp_with_srcs(self):
        """Switch this instance to the fixture that lists two sources."""
        self.num = 2
        self.srcs = ['ff80::1', 'ff80::2']
        self.mld = icmpv6.mldv2_query(
            self.maxresp, self.address, self.s_flg, self.qrv, self.qqic,
            self.num, self.srcs)
        self.buf = b'\x82\x00\xb5\xa4\x27\x10\x00\x00' \
            + b'\xff\x08\x00\x00\x00\x00\x00\x00' \
            + b'\x00\x00\x00\x00\x00\x00\x00\x01' \
            + b'\x02\x0a\x00\x02' \
            + b'\xff\x80\x00\x00\x00\x00\x00\x00' \
            + b'\x00\x00\x00\x00\x00\x00\x00\x01' \
            + b'\xff\x80\x00\x00\x00\x00\x00\x00' \
            + b'\x00\x00\x00\x00\x00\x00\x00\x02'

    def tearDown(self):
        pass

    def find_protocol(self, pkt, name):
        """Return the first entry of pkt.protocols named ``name``.

        Falls off the end, returning None, when nothing matches.
        """
        for p in pkt.protocols:
            if p.protocol_name == name:
                return p

    def test_init(self):
        """The query keeps every constructor argument as an attribute."""
        eq_(self.mld.maxresp, self.maxresp)
        eq_(self.mld.address, self.address)
        eq_(self.mld.s_flg, self.s_flg)
        eq_(self.mld.qrv, self.qrv)
        eq_(self.mld.qqic, self.qqic)
        eq_(self.mld.num, self.num)
        eq_(self.mld.srcs, self.srcs)

    def test_init_with_srcs(self):
        """Repeat test_init against the two-source fixture."""
        self.setUp_with_srcs()
        self.test_init()

    def test_parser(self):
        """Parsing self.buf yields the header and every query field."""
        msg, n, _ = icmpv6.icmpv6.parser(self.buf)

        eq_(msg.type_, self.type_)
        eq_(msg.code, self.code)
        eq_(msg.csum, self.csum)
        eq_(msg.data.maxresp, self.maxresp)
        eq_(msg.data.address, self.address)
        eq_(msg.data.s_flg, self.s_flg)
        eq_(msg.data.qrv, self.qrv)
        eq_(msg.data.qqic, self.qqic)
        eq_(msg.data.num, self.num)
        eq_(msg.data.srcs, self.srcs)
        eq_(n, None)

    def test_parser_with_srcs(self):
        """Repeat test_parser against the two-source fixture."""
        self.setUp_with_srcs()
        self.test_parser()

    def _serialize_and_check(self):
        """Serialize self.mld in an ICMPv6 message and check fixed fields.

        Returns the serialized message as a binary string so that a caller
        can inspect whatever follows the fixed-size part of the query.
        """
        src_ipv6 = '3ffe:507:0:1:200:86ff:fe05:80da'
        dst_ipv6 = '3ffe:501:0:1001::2'
        prev = ipv6(6, 0, 0, len(self.buf), 64, 255, src_ipv6, dst_ipv6)
        # Reference checksum, computed over the expected wire image.
        mld_csum = icmpv6_csum(prev, self.buf)

        icmp = icmpv6.icmpv6(self.type_, self.code, 0, self.mld)
        buf = six.binary_type(icmp.serialize(bytearray(), prev))

        (type_, code, csum) = struct.unpack_from(icmp._PACK_STR, buf)
        (maxresp, address, s_qrv, qqic, num) = struct.unpack_from(
            self.mld._PACK_STR, buf, icmp._MIN_LEN)

        eq_(type_, self.type_)
        eq_(code, self.code)
        eq_(csum, mld_csum)
        eq_(maxresp, self.maxresp)
        eq_(address, addrconv.ipv6.text_to_bin(self.address))
        # Split the combined field back into its two components.
        s_flg = (s_qrv >> 3) & 0b1
        qrv = s_qrv & 0b111
        eq_(s_flg, self.s_flg)
        eq_(qrv, self.qrv)
        eq_(qqic, self.qqic)
        eq_(num, self.num)

        return buf

    def test_serialize(self):
        """The source-less query serializes to the expected fixed fields."""
        self._serialize_and_check()

    def test_serialize_with_srcs(self):
        """The two sources follow the fixed part of the serialized query."""
        self.setUp_with_srcs()
        buf = self._serialize_and_check()

        (addr1, addr2) = struct.unpack_from(
            '!16s16s', buf, icmpv6.icmpv6._MIN_LEN + self.mld._MIN_LEN)

        eq_(addr1, addrconv.ipv6.text_to_bin(self.srcs[0]))
        eq_(addr2, addrconv.ipv6.text_to_bin(self.srcs[1]))

    def _build_mldv2_query(self):
        """Stack ethernet / ipv6 / icmpv6 layers around self.mld."""
        e = ethernet(ethertype=ether.ETH_TYPE_IPV6)
        i = ipv6(nxt=inet.IPPROTO_ICMPV6)
        ic = icmpv6.icmpv6(type_=icmpv6.MLD_LISTENER_QUERY,
                           data=self.mld)
        p = e / i / ic
        return p

    def test_build_mldv2_query(self):
        """Every layer of the stacked packet can be found with its values."""
        p = self._build_mldv2_query()

        e = self.find_protocol(p, "ethernet")
        ok_(e)
        eq_(e.ethertype, ether.ETH_TYPE_IPV6)

        i = self.find_protocol(p, "ipv6")
        ok_(i)
        eq_(i.nxt, inet.IPPROTO_ICMPV6)

        ic = self.find_protocol(p, "icmpv6")
        ok_(ic)
        eq_(ic.type_, icmpv6.MLD_LISTENER_QUERY)

        eq_(ic.data.maxresp, self.maxresp)
        eq_(ic.data.address, self.address)
        eq_(ic.data.s_flg, self.s_flg)
        eq_(ic.data.qrv, self.qrv)
        # qqic was the one constructor argument left unchecked here.
        eq_(ic.data.qqic, self.qqic)
        eq_(ic.data.num, self.num)
        eq_(ic.data.srcs, self.srcs)

    def test_build_mldv2_query_with_srcs(self):
        """Repeat test_build_mldv2_query against the two-source fixture."""
        self.setUp_with_srcs()
        self.test_build_mldv2_query()

    def test_to_string(self):
        """str() and repr() list the attributes in inspect.getmembers order."""
        ic = icmpv6.icmpv6(self.type_, self.code, self.csum, self.mld)

        mld_values = {'maxresp': self.maxresp,
                      'address': self.address,
                      's_flg': self.s_flg,
                      'qrv': self.qrv,
                      'qqic': self.qqic,
                      'num': self.num,
                      'srcs': self.srcs}
        _mld_str = ','.join(['%s=%s' % (k, repr(mld_values[k]))
                             for k, v in inspect.getmembers(self.mld)
                             if k in mld_values])
        mld_str = '%s(%s)' % (icmpv6.mldv2_query.__name__, _mld_str)

        icmp_values = {'type_': repr(self.type_),
                       'code': repr(self.code),
                       'csum': repr(self.csum),
                       'data': mld_str}
        _ic_str = ','.join(['%s=%s' % (k, icmp_values[k])
                            for k, v in inspect.getmembers(ic)
                            if k in icmp_values])
        ic_str = '%s(%s)' % (icmpv6.icmpv6.__name__, _ic_str)

        eq_(str(ic), ic_str)
        eq_(repr(ic), ic_str)

    def test_to_string_with_srcs(self):
        """Repeat test_to_string against the two-source fixture."""
        self.setUp_with_srcs()
        self.test_to_string()

    def _load_num_mismatch(self, delta):
        """Install a fixture whose ``num`` is off by ``delta``.

        The wire image always carries three sources while its count field
        announces ``3 + delta`` of them.
        """
        self.srcs = ['ff80::1', 'ff80::2', 'ff80::3']
        self.num = len(self.srcs) + delta
        # test_parser() hands self.buf to icmpv6.icmpv6.parser, so the image
        # has to start with the ICMPv6 header.  Without it the first bytes
        # of the query would be read as type/code/checksum and the test
        # would trip over the type comparison instead of the num/srcs
        # mismatch it is meant to exercise.
        header = struct.pack(
            icmpv6.icmpv6._PACK_STR, self.type_, self.code, self.csum)
        fixed = struct.pack(
            icmpv6.mldv2_query._PACK_STR,
            self.maxresp, addrconv.ipv6.text_to_bin(self.address),
            self.s_qrv, self.qqic, self.num)
        sources = [struct.pack('16s', addrconv.ipv6.text_to_bin(src))
                   for src in self.srcs]
        self.buf = header + fixed + b''.join(sources)
        self.mld = icmpv6.mldv2_query(
            self.maxresp, self.address, self.s_flg, self.qrv, self.qqic,
            self.num, self.srcs)

    @raises(AssertionError)
    def test_num_larger_than_srcs(self):
        """Expect an AssertionError when num announces one source too many."""
        self._load_num_mismatch(1)
        self.test_parser()

    @raises(AssertionError)
    def test_num_smaller_than_srcs(self):
        """Expect an AssertionError when num announces one source too few."""
        self._load_num_mismatch(-1)
        self.test_parser()

    def test_default_args(self):
        """Check the serialized defaults, then srcs given without num."""
        prev = ipv6(nxt=inet.IPPROTO_ICMPV6)
        ic = icmpv6.icmpv6(
            type_=icmpv6.MLD_LISTENER_QUERY, data=icmpv6.mldv2_query())
        prev.serialize(ic, None)
        buf = ic.serialize(bytearray(), prev)
        res = struct.unpack(icmpv6.icmpv6._PACK_STR, six.binary_type(buf[:4]))

        eq_(res[0], icmpv6.MLD_LISTENER_QUERY)
        eq_(res[1], 0)
        eq_(res[2], icmpv6_csum(prev, buf))

        res = struct.unpack(icmpv6.mldv2_query._PACK_STR,
                            six.binary_type(buf[4:]))

        eq_(res[0], 0)
        eq_(res[1], addrconv.ipv6.text_to_bin('::'))
        eq_(res[2], 2)
        eq_(res[3], 0)
        eq_(res[4], 0)

        # srcs without num
        srcs = ['ff80::1', 'ff80::2', 'ff80::3']
        que = icmpv6.mldv2_query(srcs=srcs)
        buf = que.serialize()
        res = struct.unpack_from(
            icmpv6.mldv2_query._PACK_STR, six.binary_type(buf))

        eq_(res[0], 0)
        eq_(res[1], addrconv.ipv6.text_to_bin('::'))
        eq_(res[2], 2)
        eq_(res[3], 0)
        # The count written to the wire is expected to follow the list.
        eq_(res[4], len(srcs))

        (src1, src2, src3) = struct.unpack_from(
            '16s16s16s', six.binary_type(buf), icmpv6.mldv2_query._MIN_LEN)

        eq_(src1, addrconv.ipv6.text_to_bin(srcs[0]))
        eq_(src2, addrconv.ipv6.text_to_bin(srcs[1]))
        eq_(src3, addrconv.ipv6.text_to_bin(srcs[2]))

    def test_json(self):
        """The query survives a round trip through its JSON dict."""
        jsondict = self.mld.to_jsondict()
        mld = icmpv6.mldv2_query.from_jsondict(jsondict['mldv2_query'])
        eq_(str(self.mld), str(mld))

    def test_json_with_srcs(self):
        """Repeat test_json against the two-source fixture."""
        self.setUp_with_srcs()
        self.test_json()
1410
1411
1412 class Test_mldv2_report(unittest.TestCase):
1413     type_ = 143
1414     code = 0
1415     csum = 0xb5a4
1416     record_num = 0
1417     records = []
1418
1419     mld = icmpv6.mldv2_report(record_num, records)
1420
1421     buf = b'\x8f\x00\xb5\xa4\x00\x00\x00\x00'
1422
1423     def setUp(self):
1424         pass
1425
1426     def setUp_with_records(self):
1427         self.record1 = icmpv6.mldv2_report_group(
1428             icmpv6.MODE_IS_INCLUDE, 0, 0, 'ff00::1')
1429         self.record2 = icmpv6.mldv2_report_group(
1430             icmpv6.MODE_IS_INCLUDE, 0, 2, 'ff00::2',
1431             ['fe80::1', 'fe80::2'])
1432         self.record3 = icmpv6.mldv2_report_group(
1433             icmpv6.MODE_IS_INCLUDE, 1, 0, 'ff00::3', [], b'abc\x00')
1434         self.record4 = icmpv6.mldv2_report_group(
1435             icmpv6.MODE_IS_INCLUDE, 2, 2, 'ff00::4',
1436             ['fe80::1', 'fe80::2'], b'abcde\x00\x00\x00')
1437         self.records = [self.record1, self.record2, self.record3,
1438                         self.record4]
1439         self.record_num = len(self.records)
1440         self.mld = icmpv6.mldv2_report(self.record_num, self.records)
1441         self.buf = b'\x8f\x00\xb5\xa4\x00\x00\x00\x04' \
1442             + b'\x01\x00\x00\x00' \
1443             + b'\xff\x00\x00\x00\x00\x00\x00\x00' \
1444             + b'\x00\x00\x00\x00\x00\x00\x00\x01' \
1445             + b'\x01\x00\x00\x02' \
1446             + b'\xff\x00\x00\x00\x00\x00\x00\x00' \
1447             + b'\x00\x00\x00\x00\x00\x00\x00\x02' \
1448             + b'\xfe\x80\x00\x00\x00\x00\x00\x00' \
1449             + b'\x00\x00\x00\x00\x00\x00\x00\x01' \
1450             + b'\xfe\x80\x00\x00\x00\x00\x00\x00' \
1451             + b'\x00\x00\x00\x00\x00\x00\x00\x02' \
1452             + b'\x01\x01\x00\x00' \
1453             + b'\xff\x00\x00\x00\x00\x00\x00\x00' \
1454             + b'\x00\x00\x00\x00\x00\x00\x00\x03' \
1455             + b'\x61\x62\x63\x00' \
1456             + b'\x01\x02\x00\x02' \
1457             + b'\xff\x00\x00\x00\x00\x00\x00\x00' \
1458             + b'\x00\x00\x00\x00\x00\x00\x00\x04' \
1459             + b'\xfe\x80\x00\x00\x00\x00\x00\x00' \
1460             + b'\x00\x00\x00\x00\x00\x00\x00\x01' \
1461             + b'\xfe\x80\x00\x00\x00\x00\x00\x00' \
1462             + b'\x00\x00\x00\x00\x00\x00\x00\x02' \
1463             + b'\x61\x62\x63\x64\x65\x00\x00\x00'
1464
1465     def tearDown(self):
1466         pass
1467
1468     def find_protocol(self, pkt, name):
1469         for p in pkt.protocols:
1470             if p.protocol_name == name:
1471                 return p
1472
1473     def test_init(self):
1474         eq_(self.mld.record_num, self.record_num)
1475         eq_(self.mld.records, self.records)
1476
    def test_init_with_records(self):
        """Repeat test_init against the fixture from setUp_with_records."""
        self.setUp_with_records()
        self.test_init()
1480
1481     def test_parser(self):
1482         msg, n, _ = icmpv6.icmpv6.parser(self.buf)
1483
1484         eq_(msg.type_, self.type_)
1485         eq_(msg.code, self.code)
1486         eq_(msg.csum, self.csum)
1487         eq_(msg.data.record_num, self.record_num)
1488         eq_(repr(msg.data.records), repr(self.records))
1489
    def test_parser_with_records(self):
        """Repeat test_parser against the fixture from setUp_with_records."""
        self.setUp_with_records()
        self.test_parser()
1493
1494     def test_serialize(self):
1495         src_ipv6 = '3ffe:507:0:1:200:86ff:fe05:80da'
1496         dst_ipv6 = '3ffe:501:0:1001::2'
1497         prev = ipv6(6, 0, 0, len(self.buf), 64, 255, src_ipv6, dst_ipv6)
1498         mld_csum = icmpv6_csum(prev, self.buf)
1499
1500         icmp = icmpv6.icmpv6(self.type_, self.code, 0, self.mld)
1501         buf = icmp.serialize(bytearray(), prev)
1502
1503         (type_, code, csum) = struct.unpack_from(icmp._PACK_STR,
1504                                                  six.binary_type(buf))
1505         (record_num, ) = struct.unpack_from(
1506             self.mld._PACK_STR, six.binary_type(buf), icmp._MIN_LEN)
1507
1508         eq_(type_, self.type_)
1509         eq_(code, self.code)
1510         eq_(csum, mld_csum)
1511         eq_(record_num, self.record_num)
1512
1513     def test_serialize_with_records(self):
1514         self.setUp_with_records()
1515         src_ipv6 = '3ffe:507:0:1:200:86ff:fe05:80da'
1516         dst_ipv6 = '3ffe:501:0:1001::2'
1517         prev = ipv6(6, 0, 0, len(self.buf), 64, 255, src_ipv6, dst_ipv6)
1518         mld_csum = icmpv6_csum(prev, self.buf)
1519
1520         icmp = icmpv6.icmpv6(self.type_, self.code, 0, self.mld)
1521         buf = six.binary_type(icmp.serialize(bytearray(), prev))
1522
1523         (type_, code, csum) = struct.unpack_from(icmp._PACK_STR,
1524                                                  six.binary_type(buf))
1525         (record_num, ) = struct.unpack_from(
1526             self.mld._PACK_STR, six.binary_type(buf), icmp._MIN_LEN)
1527         offset = icmp._MIN_LEN + self.mld._MIN_LEN
1528         rec1 = icmpv6.mldv2_report_group.parser(buf[offset:])
1529         offset += len(rec1)
1530         rec2 = icmpv6.mldv2_report_group.parser(buf[offset:])
1531         offset += len(rec2)
1532         rec3 = icmpv6.mldv2_report_group.parser(buf[offset:])
1533         offset += len(rec3)
1534         rec4 = icmpv6.mldv2_report_group.parser(buf[offset:])
1535
1536         eq_(type_, self.type_)
1537         eq_(code, self.code)
1538         eq_(csum, mld_csum)
1539         eq_(record_num, self.record_num)
1540         eq_(repr(rec1), repr(self.record1))
1541         eq_(repr(rec2), repr(self.record2))
1542         eq_(repr(rec3), repr(self.record3))
1543         eq_(repr(rec4), repr(self.record4))
1544
1545     def _build_mldv2_report(self):
1546         e = ethernet(ethertype=ether.ETH_TYPE_IPV6)
1547         i = ipv6(nxt=inet.IPPROTO_ICMPV6)
1548         ic = icmpv6.icmpv6(type_=icmpv6.MLDV2_LISTENER_REPORT,
1549                            data=self.mld)
1550         p = e / i / ic
1551         return p
1552
1553     def test_build_mldv2_report(self):
1554         p = self._build_mldv2_report()
1555
1556         e = self.find_protocol(p, "ethernet")
1557         ok_(e)
1558         eq_(e.ethertype, ether.ETH_TYPE_IPV6)
1559
1560         i = self.find_protocol(p, "ipv6")
1561         ok_(i)
1562         eq_(i.nxt, inet.IPPROTO_ICMPV6)
1563
1564         ic = self.find_protocol(p, "icmpv6")
1565         ok_(ic)
1566         eq_(ic.type_, icmpv6.MLDV2_LISTENER_REPORT)
1567
1568         eq_(ic.data.record_num, self.record_num)
1569         eq_(ic.data.records, self.records)
1570
    def test_build_mldv2_report_with_records(self):
        """Repeat test_build_mldv2_report with the setUp_with_records fixture."""
        self.setUp_with_records()
        self.test_build_mldv2_report()
1574
1575     def test_to_string(self):
1576         ic = icmpv6.icmpv6(self.type_, self.code, self.csum, self.mld)
1577
1578         mld_values = {'record_num': self.record_num,
1579                       'records': self.records}
1580         _mld_str = ','.join(['%s=%s' % (k, repr(mld_values[k]))
1581                              for k, v in inspect.getmembers(self.mld)
1582                              if k in mld_values])
1583         mld_str = '%s(%s)' % (icmpv6.mldv2_report.__name__, _mld_str)
1584
1585         icmp_values = {'type_': repr(self.type_),
1586                        'code': repr(self.code),
1587                        'csum': repr(self.csum),
1588                        'data': mld_str}
1589         _ic_str = ','.join(['%s=%s' % (k, icmp_values[k])
1590                             for k, v in inspect.getmembers(ic)
1591                             if k in icmp_values])
1592         ic_str = '%s(%s)' % (icmpv6.icmpv6.__name__, _ic_str)
1593
1594         eq_(str(ic), ic_str)
1595         eq_(repr(ic), ic_str)
1596
    def test_to_string_with_records(self):
        """Repeat test_to_string against the fixture from setUp_with_records."""
        self.setUp_with_records()
        self.test_to_string()
1600
1601     @raises(AssertionError)
1602     def test_record_num_larger_than_records(self):
1603         self.record1 = icmpv6.mldv2_report_group(
1604             icmpv6.MODE_IS_INCLUDE, 0, 0, 'ff00::1')
1605         self.record2 = icmpv6.mldv2_report_group(
1606             icmpv6.MODE_IS_INCLUDE, 0, 2, 'ff00::2',
1607             ['fe80::1', 'fe80::2'])
1608         self.record3 = icmpv6.mldv2_report_group(
1609             icmpv6.MODE_IS_INCLUDE, 1, 0, 'ff00::3', [], b'abc\x00')
1610         self.record4 = icmpv6.mldv2_report_group(
1611             icmpv6.MODE_IS_INCLUDE, 2, 2, 'ff00::4',
1612             ['fe80::1', 'fe80::2'], b'abcde\x00\x00\x00')
1613         self.records = [self.record1, self.record2, self.record3,
1614                         self.record4]
1615         self.record_num = len(self.records) + 1
1616         self.buf = struct.pack(
1617             icmpv6.mldv2_report._PACK_STR, self.record_num)
1618         self.buf += self.record1.serialize()
1619         self.buf += self.record2.serialize()
1620         self.buf += self.record3.serialize()
1621         self.buf += self.record4.serialize()
1622         self.mld = icmpv6.mldv2_report(self.record_num, self.records)
1623         self.test_parser()
1624
1625     @raises(AssertionError)
1626     def test_record_num_smaller_than_records(self):
1627         self.record1 = icmpv6.mldv2_report_group(
1628             icmpv6.MODE_IS_INCLUDE, 0, 0, 'ff00::1')
1629         self.record2 = icmpv6.mldv2_report_group(
1630             icmpv6.MODE_IS_INCLUDE, 0, 2, 'ff00::2',
1631             ['fe80::1', 'fe80::2'])
1632         self.record3 = icmpv6.mldv2_report_group(
1633             icmpv6.MODE_IS_INCLUDE, 1, 0, 'ff00::3', [], b'abc\x00')
1634         self.record4 = icmpv6.mldv2_report_group(
1635             icmpv6.MODE_IS_INCLUDE, 2, 2, 'ff00::4',
1636             ['fe80::1', 'fe80::2'], b'abcde\x00\x00\x00')
1637         self.records = [self.record1, self.record2, self.record3,
1638                         self.record4]
1639         self.record_num = len(self.records) - 1
1640         self.buf = struct.pack(
1641             icmpv6.mldv2_report._PACK_STR, self.record_num)
1642         self.buf += self.record1.serialize()
1643         self.buf += self.record2.serialize()
1644         self.buf += self.record3.serialize()
1645         self.buf += self.record4.serialize()
1646         self.mld = icmpv6.mldv2_report(self.record_num, self.records)
1647         self.test_parser()
1648
1649     def test_default_args(self):
1650         prev = ipv6(nxt=inet.IPPROTO_ICMPV6)
1651         ic = icmpv6.icmpv6(
1652             type_=icmpv6.MLDV2_LISTENER_REPORT, data=icmpv6.mldv2_report())
1653         prev.serialize(ic, None)
1654         buf = ic.serialize(bytearray(), prev)
1655         res = struct.unpack(icmpv6.icmpv6._PACK_STR, six.binary_type(buf[:4]))
1656
1657         eq_(res[0], icmpv6.MLDV2_LISTENER_REPORT)
1658         eq_(res[1], 0)
1659         eq_(res[2], icmpv6_csum(prev, buf))
1660
1661         res = struct.unpack(icmpv6.mldv2_report._PACK_STR, six.binary_type(buf[4:]))
1662
1663         eq_(res[0], 0)
1664
1665         # records without record_num
1666         record1 = icmpv6.mldv2_report_group(
1667             icmpv6.MODE_IS_INCLUDE, 0, 0, 'ff00::1')
1668         record2 = icmpv6.mldv2_report_group(
1669             icmpv6.MODE_IS_INCLUDE, 0, 2, 'ff00::2',
1670             ['fe80::1', 'fe80::2'])
1671         records = [record1, record2]
1672         rep = icmpv6.mldv2_report(records=records)
1673         buf = rep.serialize()
1674         res = struct.unpack_from(
1675             icmpv6.mldv2_report._PACK_STR, six.binary_type(buf))
1676
1677         eq_(res[0], len(records))
1678
1679         res = struct.unpack_from(
1680             icmpv6.mldv2_report_group._PACK_STR, six.binary_type(buf),
1681             icmpv6.mldv2_report._MIN_LEN)
1682
1683         eq_(res[0], icmpv6.MODE_IS_INCLUDE)
1684         eq_(res[1], 0)
1685         eq_(res[2], 0)
1686         eq_(res[3], addrconv.ipv6.text_to_bin('ff00::1'))
1687
1688         res = struct.unpack_from(
1689             icmpv6.mldv2_report_group._PACK_STR, six.binary_type(buf),
1690             icmpv6.mldv2_report._MIN_LEN +
1691             icmpv6.mldv2_report_group._MIN_LEN)
1692
1693         eq_(res[0], icmpv6.MODE_IS_INCLUDE)
1694         eq_(res[1], 0)
1695         eq_(res[2], 2)
1696         eq_(res[3], addrconv.ipv6.text_to_bin('ff00::2'))
1697
1698         res = struct.unpack_from(
1699             '16s16s', six.binary_type(buf),
1700             icmpv6.mldv2_report._MIN_LEN +
1701             icmpv6.mldv2_report_group._MIN_LEN +
1702             icmpv6.mldv2_report_group._MIN_LEN)
1703
1704         eq_(res[0], addrconv.ipv6.text_to_bin('fe80::1'))
1705         eq_(res[1], addrconv.ipv6.text_to_bin('fe80::2'))
1706
1707     def test_json(self):
1708         jsondict = self.mld.to_jsondict()
1709         mld = icmpv6.mldv2_report.from_jsondict(jsondict['mldv2_report'])
1710         eq_(str(self.mld), str(mld))
1711
1712     def test_json_with_records(self):
1713         self.setUp_with_records()
1714         self.test_json()
1715
1716
class Test_mldv2_report_group(unittest.TestCase):
    """Tests for ``icmpv6.mldv2_report_group``.

    The class attributes describe a bare record with no sources and no
    auxiliary data, together with its expected wire encoding in ``buf``.
    The ``setUp_with_*`` methods replace those attributes on the instance
    with richer variants; the ``test_*_with_*`` methods call one of them
    and then re-run the corresponding base check against the new fixture.
    """

    type_ = icmpv6.MODE_IS_INCLUDE
    aux_len = 0
    num = 0
    address = 'ff00::1'
    srcs = []
    aux = None
    mld = icmpv6.mldv2_report_group(
        type_, aux_len, num, address, srcs, aux)
    buf = b'\x01\x00\x00\x00' \
        + b'\xff\x00\x00\x00\x00\x00\x00\x00' \
        + b'\x00\x00\x00\x00\x00\x00\x00\x01'

    # ------------------------------------------------------------------
    # fixture helpers (names deliberately do not contain "test")
    # ------------------------------------------------------------------

    def _rebuild_mld(self):
        """Recreate ``self.mld`` from the current fixture attributes."""
        self.mld = icmpv6.mldv2_report_group(
            self.type_, self.aux_len, self.num, self.address, self.srcs,
            self.aux)

    def _pack_header(self):
        """Return the fixed-size header packed from the fixture values."""
        return struct.pack(
            icmpv6.mldv2_report_group._PACK_STR, self.type_, self.aux_len,
            self.num, addrconv.ipv6.text_to_bin(self.address))

    def _pack_srcs(self):
        """Return ``self.srcs`` packed as consecutive 16-byte addresses."""
        return b''.join(
            struct.pack('16s', addrconv.ipv6.text_to_bin(src))
            for src in self.srcs)

    def _check_header(self, buf):
        """Assert that the header at the start of *buf* matches the fixture."""
        res = struct.unpack_from(
            icmpv6.mldv2_report_group._PACK_STR, six.binary_type(buf))
        eq_(res[0], self.type_)
        eq_(res[1], self.aux_len)
        eq_(res[2], self.num)
        eq_(res[3], addrconv.ipv6.text_to_bin(self.address))

    def _check_srcs(self, buf):
        """Assert that the addresses after the header equal ``self.srcs``."""
        packed = struct.unpack_from(
            '16s' * len(self.srcs), six.binary_type(buf),
            icmpv6.mldv2_report_group._MIN_LEN)
        for actual, expected in zip(packed, self.srcs):
            eq_(actual, addrconv.ipv6.text_to_bin(expected))

    def _check_aux(self, buf, offset):
        """Assert that the aux data found at *offset* equals ``self.aux``."""
        (aux, ) = struct.unpack_from(
            '%ds' % (self.aux_len * 4), six.binary_type(buf), offset)
        eq_(aux, self.aux)

    # ------------------------------------------------------------------
    # fixtures
    # ------------------------------------------------------------------

    def setUp(self):
        pass

    def setUp_with_srcs(self):
        # Record carrying three source addresses and no auxiliary data.
        self.srcs = ['fe80::1', 'fe80::2', 'fe80::3']
        self.num = len(self.srcs)
        self._rebuild_mld()
        self.buf = b'\x01\x00\x00\x03' \
            + b'\xff\x00\x00\x00\x00\x00\x00\x00' \
            + b'\x00\x00\x00\x00\x00\x00\x00\x01' \
            + b'\xfe\x80\x00\x00\x00\x00\x00\x00' \
            + b'\x00\x00\x00\x00\x00\x00\x00\x01' \
            + b'\xfe\x80\x00\x00\x00\x00\x00\x00' \
            + b'\x00\x00\x00\x00\x00\x00\x00\x02' \
            + b'\xfe\x80\x00\x00\x00\x00\x00\x00' \
            + b'\x00\x00\x00\x00\x00\x00\x00\x03'

    def setUp_with_aux(self):
        # Record carrying 8 bytes of auxiliary data and no sources;
        # aux_len is expressed in 4-byte units.
        self.aux = b'\x01\x02\x03\x04\x05\x06\x07\x08'
        self.aux_len = len(self.aux) // 4
        self._rebuild_mld()
        self.buf = b'\x01\x02\x00\x00' \
            + b'\xff\x00\x00\x00\x00\x00\x00\x00' \
            + b'\x00\x00\x00\x00\x00\x00\x00\x01' \
            + b'\x01\x02\x03\x04\x05\x06\x07\x08'

    def setUp_with_srcs_and_aux(self):
        # Record carrying both three sources and 8 bytes of aux data.
        self.srcs = ['fe80::1', 'fe80::2', 'fe80::3']
        self.num = len(self.srcs)
        self.aux = b'\x01\x02\x03\x04\x05\x06\x07\x08'
        self.aux_len = len(self.aux) // 4
        self._rebuild_mld()
        self.buf = b'\x01\x02\x00\x03' \
            + b'\xff\x00\x00\x00\x00\x00\x00\x00' \
            + b'\x00\x00\x00\x00\x00\x00\x00\x01' \
            + b'\xfe\x80\x00\x00\x00\x00\x00\x00' \
            + b'\x00\x00\x00\x00\x00\x00\x00\x01' \
            + b'\xfe\x80\x00\x00\x00\x00\x00\x00' \
            + b'\x00\x00\x00\x00\x00\x00\x00\x02' \
            + b'\xfe\x80\x00\x00\x00\x00\x00\x00' \
            + b'\x00\x00\x00\x00\x00\x00\x00\x03' \
            + b'\x01\x02\x03\x04\x05\x06\x07\x08'

    def tearDown(self):
        pass

    # ------------------------------------------------------------------
    # constructor
    # ------------------------------------------------------------------

    def test_init(self):
        eq_(self.mld.type_, self.type_)
        eq_(self.mld.aux_len, self.aux_len)
        eq_(self.mld.num, self.num)
        eq_(self.mld.address, self.address)
        eq_(self.mld.srcs, self.srcs)
        eq_(self.mld.aux, self.aux)

    def test_init_with_srcs(self):
        self.setUp_with_srcs()
        self.test_init()

    def test_init_with_aux(self):
        self.setUp_with_aux()
        self.test_init()

    def test_init_with_srcs_and_aux(self):
        self.setUp_with_srcs_and_aux()
        self.test_init()

    # ------------------------------------------------------------------
    # parser
    # ------------------------------------------------------------------

    def test_parser(self):
        _res = icmpv6.mldv2_report_group.parser(self.buf)
        # Accept both a bare object and a tuple whose first item is the
        # parsed object.
        res = _res[0] if isinstance(_res, tuple) else _res

        eq_(res.type_, self.type_)
        eq_(res.aux_len, self.aux_len)
        eq_(res.num, self.num)
        eq_(res.address, self.address)
        eq_(res.srcs, self.srcs)
        eq_(res.aux, self.aux)

    def test_parser_with_srcs(self):
        self.setUp_with_srcs()
        self.test_parser()

    def test_parser_with_aux(self):
        self.setUp_with_aux()
        self.test_parser()

    def test_parser_with_srcs_and_aux(self):
        self.setUp_with_srcs_and_aux()
        self.test_parser()

    # ------------------------------------------------------------------
    # serialize
    # ------------------------------------------------------------------

    def test_serialize(self):
        buf = self.mld.serialize()
        self._check_header(buf)

    def test_serialize_with_srcs(self):
        self.setUp_with_srcs()
        buf = self.mld.serialize()
        self._check_header(buf)
        self._check_srcs(buf)

    def test_serialize_with_aux(self):
        self.setUp_with_aux()
        buf = self.mld.serialize()
        self._check_header(buf)
        # No sources here, so the aux data directly follows the header.
        self._check_aux(buf, icmpv6.mldv2_report_group._MIN_LEN)

    def test_serialize_with_srcs_and_aux(self):
        self.setUp_with_srcs_and_aux()
        buf = self.mld.serialize()
        self._check_header(buf)
        self._check_srcs(buf)
        # Aux data comes after the header and the 16-byte source entries.
        self._check_aux(
            buf,
            icmpv6.mldv2_report_group._MIN_LEN + 16 * len(self.srcs))

    # ------------------------------------------------------------------
    # string representation
    # ------------------------------------------------------------------

    def test_to_string(self):
        mld_values = {'type_': repr(self.type_),
                      'aux_len': repr(self.aux_len),
                      'num': repr(self.num),
                      'address': repr(self.address),
                      'srcs': repr(self.srcs),
                      'aux': repr(self.aux)}
        # inspect.getmembers() yields members sorted by name, which fixes
        # the order of the fields in the expected string.
        _g_str = ','.join(
            '%s=%s' % (k, mld_values[k])
            for k, _ in inspect.getmembers(self.mld)
            if k in mld_values)
        g_str = '%s(%s)' % (icmpv6.mldv2_report_group.__name__, _g_str)

        eq_(str(self.mld), g_str)
        eq_(repr(self.mld), g_str)

    def test_to_string_with_srcs(self):
        self.setUp_with_srcs()
        self.test_to_string()

    def test_to_string_with_aux(self):
        self.setUp_with_aux()
        self.test_to_string()

    def test_to_string_with_srcs_and_aux(self):
        self.setUp_with_srcs_and_aux()
        self.test_to_string()

    # ------------------------------------------------------------------
    # length
    # ------------------------------------------------------------------

    def test_len(self):
        eq_(len(self.mld), 20)

    def test_len_with_srcs(self):
        self.setUp_with_srcs()
        eq_(len(self.mld), 68)

    def test_len_with_aux(self):
        self.setUp_with_aux()
        eq_(len(self.mld), 28)

    def test_len_with_srcs_and_aux(self):
        self.setUp_with_srcs_and_aux()
        eq_(len(self.mld), 76)

    # ------------------------------------------------------------------
    # inconsistent counts
    # ------------------------------------------------------------------

    @raises(AssertionError)
    def test_num_larger_than_srcs(self):
        # Header claims one source more than the buffer contains.
        self.srcs = ['fe80::1', 'fe80::2', 'fe80::3']
        self.num = len(self.srcs) + 1
        self.buf = self._pack_header() + self._pack_srcs()
        self._rebuild_mld()
        self.test_parser()

    @raises(AssertionError)
    def test_num_smaller_than_srcs(self):
        # Header claims one source fewer than the buffer contains.
        self.srcs = ['fe80::1', 'fe80::2', 'fe80::3']
        self.num = len(self.srcs) - 1
        self.buf = self._pack_header() + self._pack_srcs()
        self._rebuild_mld()
        self.test_parser()

    @raises(struct.error)
    def test_aux_len_larger_than_aux(self):
        # Header claims 4 more bytes of aux data than the buffer contains.
        self.aux = b'\x01\x02\x03\x04\x05\x06\x07\x08'
        self.aux_len = len(self.aux) // 4 + 1
        self.buf = self._pack_header() + self.aux
        self._rebuild_mld()
        self.test_parser()

    @raises(AssertionError)
    def test_aux_len_smaller_than_aux(self):
        # Header claims 4 fewer bytes of aux data than the buffer contains.
        self.aux = b'\x01\x02\x03\x04\x05\x06\x07\x08'
        self.aux_len = len(self.aux) // 4 - 1
        self.buf = self._pack_header() + self.aux
        self._rebuild_mld()
        self.test_parser()

    # ------------------------------------------------------------------
    # defaults
    # ------------------------------------------------------------------

    def test_default_args(self):
        rep = icmpv6.mldv2_report_group()
        buf = rep.serialize()
        res = struct.unpack_from(
            icmpv6.mldv2_report_group._PACK_STR, six.binary_type(buf))

        eq_(res[0], 0)
        eq_(res[1], 0)
        eq_(res[2], 0)
        eq_(res[3], addrconv.ipv6.text_to_bin('::'))

        # srcs without num
        srcs = ['fe80::1', 'fe80::2', 'fe80::3']
        rep = icmpv6.mldv2_report_group(srcs=srcs)
        buf = rep.serialize()
        LOG.info(repr(buf))
        res = struct.unpack_from(
            icmpv6.mldv2_report_group._PACK_STR, six.binary_type(buf))

        eq_(res[0], 0)
        eq_(res[1], 0)
        eq_(res[2], len(srcs))
        eq_(res[3], addrconv.ipv6.text_to_bin('::'))

        (src1, src2, src3) = struct.unpack_from(
            '16s16s16s', six.binary_type(buf),
            icmpv6.mldv2_report_group._MIN_LEN)

        eq_(src1, addrconv.ipv6.text_to_bin(srcs[0]))
        eq_(src2, addrconv.ipv6.text_to_bin(srcs[1]))
        eq_(src3, addrconv.ipv6.text_to_bin(srcs[2]))

        # aux without aux_len: the expected output below shows aux_len set
        # to 1 and the 3-byte aux zero-padded to 4 bytes
        rep = icmpv6.mldv2_report_group(aux=b'\x01\x02\x03')
        buf = rep.serialize()
        res = struct.unpack_from(
            icmpv6.mldv2_report_group._PACK_STR, six.binary_type(buf))

        eq_(res[0], 0)
        eq_(res[1], 1)
        eq_(res[2], 0)
        eq_(res[3], addrconv.ipv6.text_to_bin('::'))
        eq_(buf[icmpv6.mldv2_report_group._MIN_LEN:], b'\x01\x02\x03\x00')

    # ------------------------------------------------------------------
    # JSON round-trip
    # ------------------------------------------------------------------

    def test_json(self):
        jsondict = self.mld.to_jsondict()
        mld = icmpv6.mldv2_report_group.from_jsondict(
            jsondict['mldv2_report_group'])
        eq_(str(self.mld), str(mld))

    def test_json_with_srcs(self):
        self.setUp_with_srcs()
        self.test_json()

    def test_json_with_aux(self):
        self.setUp_with_aux()
        self.test_json()

    def test_json_with_srcs_and_aux(self):
        self.setUp_with_srcs_and_aux()
        self.test_json()