1 # Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
16 # vim: tabstop=4 shiftwidth=4 softtabstop=4
24 from nose.tools import ok_, eq_, nottest, raises
25 from ryu.ofproto import ether, inet
26 from ryu.lib.packet.ethernet import ethernet
27 from ryu.lib.packet.packet import Packet
28 from ryu.lib.packet import icmpv6
29 from ryu.lib.packet.ipv6 import ipv6
30 from ryu.lib.packet import packet_utils
31 from ryu.lib import addrconv
34 LOG = logging.getLogger(__name__)
37 def icmpv6_csum(prev, buf):
38 ph = struct.pack('!16s16sI3xB',
39 addrconv.ipv6.text_to_bin(prev.src),
40 addrconv.ipv6.text_to_bin(prev.dst),
41 prev.payload_length, prev.nxt)
43 struct.pack_into('!H', h, 2, 0)
45 return packet_utils.checksum(ph + h)
48 class Test_icmpv6_header(unittest.TestCase):
52 buf = b'\xff\x00\x00\xcf'
53 icmp = icmpv6.icmpv6(type_, code, 0)
62 eq_(self.type_, self.icmp.type_)
63 eq_(self.code, self.icmp.code)
64 eq_(0, self.icmp.csum)
66 def test_parser(self):
67 msg, n, _ = self.icmp.parser(self.buf)
69 eq_(msg.type_, self.type_)
70 eq_(msg.code, self.code)
71 eq_(msg.csum, self.csum)
75 def test_serialize(self):
76 src_ipv6 = 'fe80::200:ff:fe00:ef'
77 dst_ipv6 = 'fe80::200:ff:fe00:1'
78 prev = ipv6(6, 0, 0, 4, 58, 255, src_ipv6, dst_ipv6)
80 buf = self.icmp.serialize(bytearray(), prev)
81 (type_, code, csum) = struct.unpack(self.icmp._PACK_STR, six.binary_type(buf))
83 eq_(type_, self.type_)
88 def test_malformed_icmpv6(self):
89 m_short_buf = self.buf[1:self.icmp._MIN_LEN]
90 self.icmp.parser(m_short_buf)
92 def test_default_args(self):
93 prev = ipv6(nxt=inet.IPPROTO_ICMPV6)
95 prev.serialize(ic, None)
96 buf = ic.serialize(bytearray(), prev)
97 res = struct.unpack(icmpv6.icmpv6._PACK_STR, six.binary_type(buf))
101 eq_(res[2], icmpv6_csum(prev, buf))
104 jsondict = self.icmp.to_jsondict()
105 icmp = icmpv6.icmpv6.from_jsondict(jsondict['icmpv6'])
106 eq_(str(self.icmp), str(icmp))
109 class Test_icmpv6_echo_request(unittest.TestCase):
115 data = b'\x01\xc9\xe7\x36\xd3\x39\x06\x00'
116 buf = b'\x80\x00\xa5\x72\x76\x20\x00\x00'
125 echo = icmpv6.echo(0, 0)
130 def _test_parser(self, data=None):
131 buf = self.buf + (data or b'')
132 msg, n, _ = icmpv6.icmpv6.parser(buf)
134 eq_(msg.type_, self.type_)
135 eq_(msg.code, self.code)
136 eq_(msg.csum, self.csum)
137 eq_(msg.data.id, self.id_)
138 eq_(msg.data.seq, self.seq)
139 eq_(msg.data.data, data)
142 def test_parser_without_data(self):
145 def test_parser_with_data(self):
146 self._test_parser(self.data)
148 def _test_serialize(self, echo_data=None):
149 buf = self.buf + (echo_data or b'')
150 src_ipv6 = '3ffe:507:0:1:200:86ff:fe05:80da'
151 dst_ipv6 = '3ffe:501:0:1001::2'
152 prev = ipv6(6, 0, 0, len(buf), 64, 255, src_ipv6, dst_ipv6)
153 echo_csum = icmpv6_csum(prev, buf)
155 echo = icmpv6.echo(self.id_, self.seq, echo_data)
156 icmp = icmpv6.icmpv6(self.type_, self.code, 0, echo)
157 buf = six.binary_type(icmp.serialize(bytearray(), prev))
159 (type_, code, csum) = struct.unpack_from(icmp._PACK_STR, buf, 0)
160 (id_, seq) = struct.unpack_from(echo._PACK_STR, buf, icmp._MIN_LEN)
161 data = buf[(icmp._MIN_LEN + echo._MIN_LEN):]
162 data = data if len(data) != 0 else None
164 eq_(type_, self.type_)
171 def test_serialize_without_data(self):
172 self._test_serialize()
174 def test_serialize_with_data(self):
175 self._test_serialize(self.data)
177 def test_to_string(self):
178 ec = icmpv6.echo(self.id_, self.seq, self.data)
179 ic = icmpv6.icmpv6(self.type_, self.code, self.csum, ec)
181 echo_values = {'id': self.id_,
184 _echo_str = ','.join(['%s=%s' % (k, repr(echo_values[k]))
185 for k, v in inspect.getmembers(ec)
186 if k in echo_values])
187 echo_str = '%s(%s)' % (icmpv6.echo.__name__, _echo_str)
189 icmp_values = {'type_': repr(self.type_),
190 'code': repr(self.code),
191 'csum': repr(self.csum),
193 _ic_str = ','.join(['%s=%s' % (k, icmp_values[k])
194 for k, v in inspect.getmembers(ic)
195 if k in icmp_values])
196 ic_str = '%s(%s)' % (icmpv6.icmpv6.__name__, _ic_str)
199 eq_(repr(ic), ic_str)
201 def test_default_args(self):
202 prev = ipv6(nxt=inet.IPPROTO_ICMPV6)
204 type_=icmpv6.ICMPV6_ECHO_REQUEST, data=icmpv6.echo())
205 prev.serialize(ic, None)
206 buf = ic.serialize(bytearray(), prev)
207 res = struct.unpack(icmpv6.icmpv6._PACK_STR, six.binary_type(buf[:4]))
209 eq_(res[0], icmpv6.ICMPV6_ECHO_REQUEST)
211 eq_(res[2], icmpv6_csum(prev, buf))
213 res = struct.unpack(icmpv6.echo._PACK_STR, six.binary_type(buf[4:]))
219 ec = icmpv6.echo(self.id_, self.seq, self.data)
220 ic1 = icmpv6.icmpv6(self.type_, self.code, self.csum, ec)
221 jsondict = ic1.to_jsondict()
222 ic2 = icmpv6.icmpv6.from_jsondict(jsondict['icmpv6'])
223 eq_(str(ic1), str(ic2))
226 class Test_icmpv6_echo_reply(Test_icmpv6_echo_request):
230 self.buf = b'\x81\x00\xa4\x72\x76\x20\x00\x00'
232 def test_default_args(self):
233 prev = ipv6(nxt=inet.IPPROTO_ICMPV6)
235 type_=icmpv6.ICMPV6_ECHO_REPLY, data=icmpv6.echo())
236 prev.serialize(ic, None)
237 buf = ic.serialize(bytearray(), prev)
238 res = struct.unpack(icmpv6.icmpv6._PACK_STR, six.binary_type(buf[:4]))
240 eq_(res[0], icmpv6.ICMPV6_ECHO_REPLY)
242 eq_(res[2], icmpv6_csum(prev, buf))
244 res = struct.unpack(icmpv6.echo._PACK_STR, six.binary_type(buf[4:]))
250 class Test_icmpv6_neighbor_solicit(unittest.TestCase):
255 dst = '3ffe:507:0:1:200:86ff:fe05:80da'
258 nd_hw_src = '00:60:97:07:69:ea'
259 data = b'\x01\x01\x00\x60\x97\x07\x69\xea'
260 buf = b'\x87\x00\x95\x2d\x00\x00\x00\x00' \
261 + b'\x3f\xfe\x05\x07\x00\x00\x00\x01' \
262 + b'\x02\x00\x86\xff\xfe\x05\x80\xda'
263 src_ipv6 = '3ffe:507:0:1:200:86ff:fe05:80da'
264 dst_ipv6 = '3ffe:501:0:1001::2'
273 nd = icmpv6.nd_neighbor(self.res, self.dst)
274 eq_(nd.res, self.res)
275 eq_(nd.dst, self.dst)
278 def _test_parser(self, data=None):
279 buf = self.buf + (data or b'')
280 msg, n, _ = icmpv6.icmpv6.parser(buf)
282 eq_(msg.type_, self.type_)
283 eq_(msg.code, self.code)
284 eq_(msg.csum, self.csum)
285 eq_(msg.data.res, self.res)
286 eq_(addrconv.ipv6.text_to_bin(msg.data.dst),
287 addrconv.ipv6.text_to_bin(self.dst))
291 eq_(nd.length, self.nd_length)
292 eq_(nd.hw_src, self.nd_hw_src)
295 def test_parser_without_data(self):
298 def test_parser_with_data(self):
299 self._test_parser(self.data)
301 def test_serialize_without_data(self):
302 nd = icmpv6.nd_neighbor(self.res, self.dst)
303 prev = ipv6(6, 0, 0, 24, 64, 255, self.src_ipv6, self.dst_ipv6)
304 nd_csum = icmpv6_csum(prev, self.buf)
306 icmp = icmpv6.icmpv6(self.type_, self.code, 0, nd)
307 buf = six.binary_type(icmp.serialize(bytearray(), prev))
309 (type_, code, csum) = struct.unpack_from(icmp._PACK_STR, buf, 0)
310 (res, dst) = struct.unpack_from(nd._PACK_STR, buf, icmp._MIN_LEN)
311 data = buf[(icmp._MIN_LEN + nd._MIN_LEN):]
313 eq_(type_, self.type_)
316 eq_(res >> 29, self.res)
317 eq_(dst, addrconv.ipv6.text_to_bin(self.dst))
320 def test_serialize_with_data(self):
321 nd_opt = icmpv6.nd_option_sla(self.nd_length, self.nd_hw_src)
322 nd = icmpv6.nd_neighbor(self.res, self.dst, nd_opt)
323 prev = ipv6(6, 0, 0, 32, 64, 255, self.src_ipv6, self.dst_ipv6)
324 nd_csum = icmpv6_csum(prev, self.buf + self.data)
326 icmp = icmpv6.icmpv6(self.type_, self.code, 0, nd)
327 buf = six.binary_type(icmp.serialize(bytearray(), prev))
329 (type_, code, csum) = struct.unpack_from(icmp._PACK_STR, buf, 0)
330 (res, dst) = struct.unpack_from(nd._PACK_STR, buf, icmp._MIN_LEN)
331 (nd_type, nd_length, nd_hw_src) = struct.unpack_from(
332 nd_opt._PACK_STR, buf, icmp._MIN_LEN + nd._MIN_LEN)
333 data = buf[(icmp._MIN_LEN + nd._MIN_LEN + 8):]
335 eq_(type_, self.type_)
338 eq_(res >> 29, self.res)
339 eq_(dst, addrconv.ipv6.text_to_bin(self.dst))
340 eq_(nd_type, self.nd_type)
341 eq_(nd_length, self.nd_length)
342 eq_(nd_hw_src, addrconv.mac.text_to_bin(self.nd_hw_src))
344 def test_to_string(self):
345 nd_opt = icmpv6.nd_option_sla(self.nd_length, self.nd_hw_src)
346 nd = icmpv6.nd_neighbor(self.res, self.dst, nd_opt)
347 ic = icmpv6.icmpv6(self.type_, self.code, self.csum, nd)
349 nd_opt_values = {'length': self.nd_length,
350 'hw_src': self.nd_hw_src,
352 _nd_opt_str = ','.join(['%s=%s' % (k, repr(nd_opt_values[k]))
353 for k, v in inspect.getmembers(nd_opt)
354 if k in nd_opt_values])
355 nd_opt_str = '%s(%s)' % (icmpv6.nd_option_sla.__name__, _nd_opt_str)
357 nd_values = {'res': repr(nd.res),
358 'dst': repr(self.dst),
359 'option': nd_opt_str}
360 _nd_str = ','.join(['%s=%s' % (k, nd_values[k])
361 for k, v in inspect.getmembers(nd)
363 nd_str = '%s(%s)' % (icmpv6.nd_neighbor.__name__, _nd_str)
365 icmp_values = {'type_': repr(self.type_),
366 'code': repr(self.code),
367 'csum': repr(self.csum),
369 _ic_str = ','.join(['%s=%s' % (k, icmp_values[k])
370 for k, v in inspect.getmembers(ic)
371 if k in icmp_values])
372 ic_str = '%s(%s)' % (icmpv6.icmpv6.__name__, _ic_str)
375 eq_(repr(ic), ic_str)
377 def test_default_args(self):
378 prev = ipv6(nxt=inet.IPPROTO_ICMPV6)
380 type_=icmpv6.ND_NEIGHBOR_SOLICIT, data=icmpv6.nd_neighbor())
381 prev.serialize(ic, None)
382 buf = ic.serialize(bytearray(), prev)
383 res = struct.unpack(icmpv6.icmpv6._PACK_STR, six.binary_type(buf[:4]))
385 eq_(res[0], icmpv6.ND_NEIGHBOR_SOLICIT)
387 eq_(res[2], icmpv6_csum(prev, buf))
389 res = struct.unpack(icmpv6.nd_neighbor._PACK_STR, six.binary_type(buf[4:]))
392 eq_(res[1], addrconv.ipv6.text_to_bin('::'))
395 prev = ipv6(nxt=inet.IPPROTO_ICMPV6)
397 type_=icmpv6.ND_NEIGHBOR_SOLICIT,
398 data=icmpv6.nd_neighbor(
399 option=icmpv6.nd_option_sla()))
400 prev.serialize(ic, None)
401 buf = ic.serialize(bytearray(), prev)
402 res = struct.unpack(icmpv6.icmpv6._PACK_STR, six.binary_type(buf[:4]))
404 eq_(res[0], icmpv6.ND_NEIGHBOR_SOLICIT)
406 eq_(res[2], icmpv6_csum(prev, buf))
408 res = struct.unpack(icmpv6.nd_neighbor._PACK_STR,
409 six.binary_type(buf[4:24]))
412 eq_(res[1], addrconv.ipv6.text_to_bin('::'))
414 res = struct.unpack(icmpv6.nd_option_sla._PACK_STR,
415 six.binary_type(buf[24:]))
417 eq_(res[0], icmpv6.ND_OPTION_SLA)
418 eq_(res[1], len(icmpv6.nd_option_sla()) // 8)
419 eq_(res[2], addrconv.mac.text_to_bin('00:00:00:00:00:00'))
422 nd_opt = icmpv6.nd_option_sla(self.nd_length, self.nd_hw_src)
423 nd = icmpv6.nd_neighbor(self.res, self.dst, nd_opt)
424 ic1 = icmpv6.icmpv6(self.type_, self.code, self.csum, nd)
425 jsondict = ic1.to_jsondict()
426 ic2 = icmpv6.icmpv6.from_jsondict(jsondict['icmpv6'])
427 eq_(str(ic1), str(ic2))
430 class Test_icmpv6_neighbor_advert(Test_icmpv6_neighbor_solicit):
435 self.dst = '3ffe:507:0:1:260:97ff:fe07:69ea'
439 self.nd_hw_src = '00:60:97:07:69:ea'
440 self.data = b'\x02\x01\x00\x60\x97\x07\x69\xea'
441 self.buf = b'\x88\x00\xb8\xba\xe0\x00\x00\x00' \
442 + b'\x3f\xfe\x05\x07\x00\x00\x00\x01' \
443 + b'\x02\x60\x97\xff\xfe\x07\x69\xea'
445 def test_serialize_with_data(self):
446 nd_opt = icmpv6.nd_option_tla(self.nd_length, self.nd_hw_src)
447 nd = icmpv6.nd_neighbor(self.res, self.dst, nd_opt)
448 prev = ipv6(6, 0, 0, 32, 64, 255, self.src_ipv6, self.dst_ipv6)
449 nd_csum = icmpv6_csum(prev, self.buf + self.data)
451 icmp = icmpv6.icmpv6(self.type_, self.code, 0, nd)
452 buf = six.binary_type(icmp.serialize(bytearray(), prev))
454 (type_, code, csum) = struct.unpack_from(icmp._PACK_STR, buf, 0)
455 (res, dst) = struct.unpack_from(nd._PACK_STR, buf, icmp._MIN_LEN)
456 (nd_type, nd_length, nd_hw_src) = struct.unpack_from(
457 nd_opt._PACK_STR, buf, icmp._MIN_LEN + nd._MIN_LEN)
458 data = buf[(icmp._MIN_LEN + nd._MIN_LEN + 8):]
460 eq_(type_, self.type_)
463 eq_(res >> 29, self.res)
464 eq_(dst, addrconv.ipv6.text_to_bin(self.dst))
465 eq_(nd_type, self.nd_type)
466 eq_(nd_length, self.nd_length)
467 eq_(nd_hw_src, addrconv.mac.text_to_bin(self.nd_hw_src))
469 def test_to_string(self):
470 nd_opt = icmpv6.nd_option_tla(self.nd_length, self.nd_hw_src)
471 nd = icmpv6.nd_neighbor(self.res, self.dst, nd_opt)
472 ic = icmpv6.icmpv6(self.type_, self.code, self.csum, nd)
474 nd_opt_values = {'length': self.nd_length,
475 'hw_src': self.nd_hw_src,
477 _nd_opt_str = ','.join(['%s=%s' % (k, repr(nd_opt_values[k]))
478 for k, v in inspect.getmembers(nd_opt)
479 if k in nd_opt_values])
480 nd_opt_str = '%s(%s)' % (icmpv6.nd_option_tla.__name__, _nd_opt_str)
482 nd_values = {'res': repr(nd.res),
483 'dst': repr(self.dst),
484 'option': nd_opt_str}
485 _nd_str = ','.join(['%s=%s' % (k, nd_values[k])
486 for k, v in inspect.getmembers(nd)
488 nd_str = '%s(%s)' % (icmpv6.nd_neighbor.__name__, _nd_str)
490 icmp_values = {'type_': repr(self.type_),
491 'code': repr(self.code),
492 'csum': repr(self.csum),
494 _ic_str = ','.join(['%s=%s' % (k, icmp_values[k])
495 for k, v in inspect.getmembers(ic)
496 if k in icmp_values])
497 ic_str = '%s(%s)' % (icmpv6.icmpv6.__name__, _ic_str)
500 eq_(repr(ic), ic_str)
502 def test_default_args(self):
503 prev = ipv6(nxt=inet.IPPROTO_ICMPV6)
505 type_=icmpv6.ND_NEIGHBOR_ADVERT, data=icmpv6.nd_neighbor())
506 prev.serialize(ic, None)
507 buf = ic.serialize(bytearray(), prev)
508 res = struct.unpack(icmpv6.icmpv6._PACK_STR, six.binary_type(buf[:4]))
510 eq_(res[0], icmpv6.ND_NEIGHBOR_ADVERT)
512 eq_(res[2], icmpv6_csum(prev, buf))
514 res = struct.unpack(icmpv6.nd_neighbor._PACK_STR, six.binary_type(buf[4:]))
517 eq_(res[1], addrconv.ipv6.text_to_bin('::'))
520 prev = ipv6(nxt=inet.IPPROTO_ICMPV6)
522 type_=icmpv6.ND_NEIGHBOR_ADVERT,
523 data=icmpv6.nd_neighbor(
524 option=icmpv6.nd_option_tla()))
525 prev.serialize(ic, None)
526 buf = ic.serialize(bytearray(), prev)
527 res = struct.unpack(icmpv6.icmpv6._PACK_STR, six.binary_type(buf[:4]))
529 eq_(res[0], icmpv6.ND_NEIGHBOR_ADVERT)
531 eq_(res[2], icmpv6_csum(prev, buf))
533 res = struct.unpack(icmpv6.nd_neighbor._PACK_STR,
534 six.binary_type(buf[4:24]))
537 eq_(res[1], addrconv.ipv6.text_to_bin('::'))
539 res = struct.unpack(icmpv6.nd_option_tla._PACK_STR,
540 six.binary_type(buf[24:]))
542 eq_(res[0], icmpv6.ND_OPTION_TLA)
543 eq_(res[1], len(icmpv6.nd_option_tla()) // 8)
544 eq_(res[2], addrconv.mac.text_to_bin('00:00:00:00:00:00'))
547 class Test_icmpv6_router_solicit(unittest.TestCase):
554 nd_hw_src = '12:2d:a5:6d:bc:0f'
555 data = b'\x00\x00\x00\x00\x01\x01\x12\x2d\xa5\x6d\xbc\x0f'
556 buf = b'\x85\x00\x97\xd9'
557 src_ipv6 = '3ffe:507:0:1:200:86ff:fe05:80da'
558 dst_ipv6 = '3ffe:501:0:1001::2'
567 rs = icmpv6.nd_router_solicit(self.res)
568 eq_(rs.res, self.res)
571 def _test_parser(self, data=None):
572 buf = self.buf + (data or b'')
573 msg, n, _ = icmpv6.icmpv6.parser(buf)
575 eq_(msg.type_, self.type_)
576 eq_(msg.code, self.code)
577 eq_(msg.csum, self.csum)
579 eq_(msg.data.res, self.res)
583 eq_(rs.length, self.nd_length)
584 eq_(rs.hw_src, self.nd_hw_src)
587 def test_parser_without_data(self):
590 def test_parser_with_data(self):
591 self._test_parser(self.data)
593 def test_serialize_without_data(self):
594 rs = icmpv6.nd_router_solicit(self.res)
595 prev = ipv6(6, 0, 0, 8, 64, 255, self.src_ipv6, self.dst_ipv6)
596 rs_csum = icmpv6_csum(prev, self.buf)
598 icmp = icmpv6.icmpv6(self.type_, self.code, 0, rs)
599 buf = six.binary_type(icmp.serialize(bytearray(), prev))
601 (type_, code, csum) = struct.unpack_from(icmp._PACK_STR, buf, 0)
602 res = struct.unpack_from(rs._PACK_STR, buf, icmp._MIN_LEN)
603 data = buf[(icmp._MIN_LEN + rs._MIN_LEN):]
605 eq_(type_, self.type_)
608 eq_(res[0], self.res)
611 def test_serialize_with_data(self):
612 nd_opt = icmpv6.nd_option_sla(self.nd_length, self.nd_hw_src)
613 rs = icmpv6.nd_router_solicit(self.res, nd_opt)
614 prev = ipv6(6, 0, 0, 16, 64, 255, self.src_ipv6, self.dst_ipv6)
615 rs_csum = icmpv6_csum(prev, self.buf + self.data)
617 icmp = icmpv6.icmpv6(self.type_, self.code, 0, rs)
618 buf = six.binary_type(icmp.serialize(bytearray(), prev))
620 (type_, code, csum) = struct.unpack_from(icmp._PACK_STR, buf, 0)
621 res = struct.unpack_from(rs._PACK_STR, buf, icmp._MIN_LEN)
622 (nd_type, nd_length, nd_hw_src) = struct.unpack_from(
623 nd_opt._PACK_STR, buf, icmp._MIN_LEN + rs._MIN_LEN)
624 data = buf[(icmp._MIN_LEN + rs._MIN_LEN + 8):]
626 eq_(type_, self.type_)
629 eq_(res[0], self.res)
630 eq_(nd_type, self.nd_type)
631 eq_(nd_length, self.nd_length)
632 eq_(nd_hw_src, addrconv.mac.text_to_bin(self.nd_hw_src))
634 def test_to_string(self):
635 nd_opt = icmpv6.nd_option_sla(self.nd_length, self.nd_hw_src)
636 rs = icmpv6.nd_router_solicit(self.res, nd_opt)
637 ic = icmpv6.icmpv6(self.type_, self.code, self.csum, rs)
639 nd_opt_values = {'length': self.nd_length,
640 'hw_src': self.nd_hw_src,
642 _nd_opt_str = ','.join(['%s=%s' % (k, repr(nd_opt_values[k]))
643 for k, v in inspect.getmembers(nd_opt)
644 if k in nd_opt_values])
645 nd_opt_str = '%s(%s)' % (icmpv6.nd_option_sla.__name__, _nd_opt_str)
647 rs_values = {'res': repr(rs.res),
648 'option': nd_opt_str}
649 _rs_str = ','.join(['%s=%s' % (k, rs_values[k])
650 for k, v in inspect.getmembers(rs)
652 rs_str = '%s(%s)' % (icmpv6.nd_router_solicit.__name__, _rs_str)
654 icmp_values = {'type_': repr(self.type_),
655 'code': repr(self.code),
656 'csum': repr(self.csum),
658 _ic_str = ','.join(['%s=%s' % (k, icmp_values[k])
659 for k, v in inspect.getmembers(ic)
660 if k in icmp_values])
661 ic_str = '%s(%s)' % (icmpv6.icmpv6.__name__, _ic_str)
664 eq_(repr(ic), ic_str)
666 def test_default_args(self):
667 prev = ipv6(nxt=inet.IPPROTO_ICMPV6)
669 type_=icmpv6.ND_ROUTER_SOLICIT, data=icmpv6.nd_router_solicit())
670 prev.serialize(ic, None)
671 buf = ic.serialize(bytearray(), prev)
672 res = struct.unpack(icmpv6.icmpv6._PACK_STR, six.binary_type(buf[:4]))
674 eq_(res[0], icmpv6.ND_ROUTER_SOLICIT)
676 eq_(res[2], icmpv6_csum(prev, buf))
678 res = struct.unpack(icmpv6.nd_router_solicit._PACK_STR,
679 six.binary_type(buf[4:]))
684 prev = ipv6(nxt=inet.IPPROTO_ICMPV6)
686 type_=icmpv6.ND_ROUTER_SOLICIT,
687 data=icmpv6.nd_router_solicit(
688 option=icmpv6.nd_option_sla()))
689 prev.serialize(ic, None)
690 buf = ic.serialize(bytearray(), prev)
691 res = struct.unpack(icmpv6.icmpv6._PACK_STR, six.binary_type(buf[:4]))
693 eq_(res[0], icmpv6.ND_ROUTER_SOLICIT)
695 eq_(res[2], icmpv6_csum(prev, buf))
697 res = struct.unpack(icmpv6.nd_router_solicit._PACK_STR,
698 six.binary_type(buf[4:8]))
702 res = struct.unpack(icmpv6.nd_option_sla._PACK_STR,
703 six.binary_type(buf[8:]))
705 eq_(res[0], icmpv6.ND_OPTION_SLA)
706 eq_(res[1], len(icmpv6.nd_option_sla()) // 8)
707 eq_(res[2], addrconv.mac.text_to_bin('00:00:00:00:00:00'))
710 nd_opt = icmpv6.nd_option_sla(self.nd_length, self.nd_hw_src)
711 rs = icmpv6.nd_router_solicit(self.res, nd_opt)
712 ic1 = icmpv6.icmpv6(self.type_, self.code, self.csum, rs)
713 jsondict = ic1.to_jsondict()
714 ic2 = icmpv6.icmpv6.from_jsondict(jsondict['icmpv6'])
715 eq_(str(ic1), str(ic2))
718 class Test_icmpv6_router_advert(unittest.TestCase):
726 def test_default_args(self):
727 prev = ipv6(nxt=inet.IPPROTO_ICMPV6)
729 type_=icmpv6.ND_ROUTER_ADVERT, data=icmpv6.nd_router_advert())
730 prev.serialize(ic, None)
731 buf = ic.serialize(bytearray(), prev)
732 res = struct.unpack(icmpv6.icmpv6._PACK_STR, six.binary_type(buf[:4]))
734 eq_(res[0], icmpv6.ND_ROUTER_ADVERT)
736 eq_(res[2], icmpv6_csum(prev, buf))
738 res = struct.unpack(icmpv6.nd_router_advert._PACK_STR,
739 six.binary_type(buf[4:]))
748 prev = ipv6(nxt=inet.IPPROTO_ICMPV6)
750 type_=icmpv6.ND_ROUTER_ADVERT,
751 data=icmpv6.nd_router_advert(
752 options=[icmpv6.nd_option_sla()]))
753 prev.serialize(ic, None)
754 buf = ic.serialize(bytearray(), prev)
755 res = struct.unpack(icmpv6.icmpv6._PACK_STR, six.binary_type(buf[:4]))
757 eq_(res[0], icmpv6.ND_ROUTER_ADVERT)
759 eq_(res[2], icmpv6_csum(prev, buf))
761 res = struct.unpack(icmpv6.nd_router_advert._PACK_STR,
762 six.binary_type(buf[4:16]))
770 res = struct.unpack(icmpv6.nd_option_sla._PACK_STR,
771 six.binary_type(buf[16:]))
773 eq_(res[0], icmpv6.ND_OPTION_SLA)
774 eq_(res[1], len(icmpv6.nd_option_sla()) // 8)
775 eq_(res[2], addrconv.mac.text_to_bin('00:00:00:00:00:00'))
778 prev = ipv6(nxt=inet.IPPROTO_ICMPV6)
780 type_=icmpv6.ND_ROUTER_ADVERT,
781 data=icmpv6.nd_router_advert(
782 options=[icmpv6.nd_option_pi()]))
783 prev.serialize(ic, None)
784 buf = ic.serialize(bytearray(), prev)
785 res = struct.unpack(icmpv6.icmpv6._PACK_STR, six.binary_type(buf[:4]))
787 eq_(res[0], icmpv6.ND_ROUTER_ADVERT)
789 eq_(res[2], icmpv6_csum(prev, buf))
791 res = struct.unpack(icmpv6.nd_router_advert._PACK_STR,
792 six.binary_type(buf[4:16]))
800 res = struct.unpack(icmpv6.nd_option_pi._PACK_STR,
801 six.binary_type(buf[16:]))
803 eq_(res[0], icmpv6.ND_OPTION_PI)
810 eq_(res[7], addrconv.ipv6.text_to_bin('::'))
812 # with nd_option_sla and nd_option_pi
813 prev = ipv6(nxt=inet.IPPROTO_ICMPV6)
815 type_=icmpv6.ND_ROUTER_ADVERT,
816 data=icmpv6.nd_router_advert(
817 options=[icmpv6.nd_option_sla(), icmpv6.nd_option_pi()]))
818 prev.serialize(ic, None)
819 buf = ic.serialize(bytearray(), prev)
820 res = struct.unpack(icmpv6.icmpv6._PACK_STR, six.binary_type(buf[:4]))
822 eq_(res[0], icmpv6.ND_ROUTER_ADVERT)
824 eq_(res[2], icmpv6_csum(prev, buf))
826 res = struct.unpack(icmpv6.nd_router_advert._PACK_STR,
827 six.binary_type(buf[4:16]))
835 res = struct.unpack(icmpv6.nd_option_sla._PACK_STR,
836 six.binary_type(buf[16:24]))
838 eq_(res[0], icmpv6.ND_OPTION_SLA)
839 eq_(res[1], len(icmpv6.nd_option_sla()) // 8)
840 eq_(res[2], addrconv.mac.text_to_bin('00:00:00:00:00:00'))
842 res = struct.unpack(icmpv6.nd_option_pi._PACK_STR,
843 six.binary_type(buf[24:]))
845 eq_(res[0], icmpv6.ND_OPTION_PI)
846 eq_(res[1], len(icmpv6.nd_option_pi()) // 8)
852 eq_(res[7], addrconv.ipv6.text_to_bin('::'))
856 type_=icmpv6.ND_ROUTER_ADVERT,
857 data=icmpv6.nd_router_advert(
858 options=[icmpv6.nd_option_sla(), icmpv6.nd_option_pi()]))
859 jsondict = ic1.to_jsondict()
860 ic2 = icmpv6.icmpv6.from_jsondict(jsondict['icmpv6'])
861 eq_(str(ic1), str(ic2))
864 class Test_icmpv6_nd_option_la(unittest.TestCase):
872 def test_default_args(self):
873 la = icmpv6.nd_option_sla()
875 res = struct.unpack(icmpv6.nd_option_sla._PACK_STR, six.binary_type(buf))
877 eq_(res[0], icmpv6.ND_OPTION_SLA)
878 eq_(res[1], len(icmpv6.nd_option_sla()) // 8)
879 eq_(res[2], addrconv.mac.text_to_bin('00:00:00:00:00:00'))
882 prev = ipv6(nxt=inet.IPPROTO_ICMPV6)
884 type_=icmpv6.ND_NEIGHBOR_ADVERT,
885 data=icmpv6.nd_neighbor(
886 option=icmpv6.nd_option_tla()))
887 prev.serialize(ic, None)
888 buf = ic.serialize(bytearray(), prev)
889 res = struct.unpack(icmpv6.icmpv6._PACK_STR, six.binary_type(buf[:4]))
891 eq_(res[0], icmpv6.ND_NEIGHBOR_ADVERT)
893 eq_(res[2], icmpv6_csum(prev, buf))
895 res = struct.unpack(icmpv6.nd_neighbor._PACK_STR,
896 six.binary_type(buf[4:24]))
899 eq_(res[1], addrconv.ipv6.text_to_bin('::'))
901 res = struct.unpack(icmpv6.nd_option_tla._PACK_STR,
902 six.binary_type(buf[24:]))
904 eq_(res[0], icmpv6.ND_OPTION_TLA)
905 eq_(res[1], len(icmpv6.nd_option_tla()) // 8)
906 eq_(res[2], addrconv.mac.text_to_bin('00:00:00:00:00:00'))
908 # with nd_router_solicit
909 prev = ipv6(nxt=inet.IPPROTO_ICMPV6)
911 type_=icmpv6.ND_ROUTER_SOLICIT,
912 data=icmpv6.nd_router_solicit(
913 option=icmpv6.nd_option_sla()))
914 prev.serialize(ic, None)
915 buf = ic.serialize(bytearray(), prev)
916 res = struct.unpack(icmpv6.icmpv6._PACK_STR, six.binary_type(buf[:4]))
918 eq_(res[0], icmpv6.ND_ROUTER_SOLICIT)
920 eq_(res[2], icmpv6_csum(prev, buf))
922 res = struct.unpack(icmpv6.nd_router_solicit._PACK_STR,
923 six.binary_type(buf[4:8]))
927 res = struct.unpack(icmpv6.nd_option_sla._PACK_STR,
928 six.binary_type(buf[8:]))
930 eq_(res[0], icmpv6.ND_OPTION_SLA)
931 eq_(res[1], len(icmpv6.nd_option_sla()) // 8)
932 eq_(res[2], addrconv.mac.text_to_bin('00:00:00:00:00:00'))
935 class Test_icmpv6_nd_option_pi(unittest.TestCase):
943 def test_default_args(self):
944 pi = icmpv6.nd_option_pi()
946 res = struct.unpack(icmpv6.nd_option_pi._PACK_STR, six.binary_type(buf))
948 eq_(res[0], icmpv6.ND_OPTION_PI)
949 eq_(res[1], len(icmpv6.nd_option_pi()) // 8)
955 eq_(res[7], addrconv.ipv6.text_to_bin('::'))
957 # with nd_router_advert
958 prev = ipv6(nxt=inet.IPPROTO_ICMPV6)
960 type_=icmpv6.ND_ROUTER_ADVERT,
961 data=icmpv6.nd_router_advert(
962 options=[icmpv6.nd_option_pi()]))
963 prev.serialize(ic, None)
964 buf = ic.serialize(bytearray(), prev)
965 res = struct.unpack(icmpv6.icmpv6._PACK_STR, six.binary_type(buf[:4]))
967 eq_(res[0], icmpv6.ND_ROUTER_ADVERT)
969 eq_(res[2], icmpv6_csum(prev, buf))
971 res = struct.unpack(icmpv6.nd_router_advert._PACK_STR,
972 six.binary_type(buf[4:16]))
980 res = struct.unpack(icmpv6.nd_option_pi._PACK_STR,
981 six.binary_type(buf[16:]))
983 eq_(res[0], icmpv6.ND_OPTION_PI)
990 eq_(res[7], addrconv.ipv6.text_to_bin('::'))
993 class Test_icmpv6_membership_query(unittest.TestCase):
999 buf = b'\x82\x00\xb5\xa4\x27\x10\x00\x00' \
1000 + b'\xff\x08\x00\x00\x00\x00\x00\x00' \
1001 + b'\x00\x00\x00\x00\x00\x00\x00\x01'
1009 def test_init(self):
1010 mld = icmpv6.mld(self.maxresp, self.address)
1011 eq_(mld.maxresp, self.maxresp)
1012 eq_(mld.address, self.address)
1014 def test_parser(self):
1015 msg, n, _ = icmpv6.icmpv6.parser(self.buf)
1017 eq_(msg.type_, self.type_)
1018 eq_(msg.code, self.code)
1019 eq_(msg.csum, self.csum)
1020 eq_(msg.data.maxresp, self.maxresp)
1021 eq_(msg.data.address, self.address)
1024 def test_serialize(self):
1025 src_ipv6 = '3ffe:507:0:1:200:86ff:fe05:80da'
1026 dst_ipv6 = '3ffe:501:0:1001::2'
1027 prev = ipv6(6, 0, 0, len(self.buf), 64, 255, src_ipv6, dst_ipv6)
1028 mld_csum = icmpv6_csum(prev, self.buf)
1030 mld = icmpv6.mld(self.maxresp, self.address)
1031 icmp = icmpv6.icmpv6(self.type_, self.code, 0, mld)
1032 buf = six.binary_type(icmp.serialize(bytearray(), prev))
1034 (type_, code, csum) = struct.unpack_from(icmp._PACK_STR, buf, 0)
1035 (maxresp, address) = struct.unpack_from(
1036 mld._PACK_STR, buf, icmp._MIN_LEN)
1038 eq_(type_, self.type_)
1039 eq_(code, self.code)
1041 eq_(maxresp, self.maxresp)
1042 eq_(address, addrconv.ipv6.text_to_bin(self.address))
1044 def test_to_string(self):
1045 ml = icmpv6.mld(self.maxresp, self.address)
1046 ic = icmpv6.icmpv6(self.type_, self.code, self.csum, ml)
1048 mld_values = {'maxresp': self.maxresp,
1049 'address': self.address}
1050 _mld_str = ','.join(['%s=%s' % (k, repr(mld_values[k]))
1051 for k, v in inspect.getmembers(ml)
1052 if k in mld_values])
1053 mld_str = '%s(%s)' % (icmpv6.mld.__name__, _mld_str)
1055 icmp_values = {'type_': repr(self.type_),
1056 'code': repr(self.code),
1057 'csum': repr(self.csum),
1059 _ic_str = ','.join(['%s=%s' % (k, icmp_values[k])
1060 for k, v in inspect.getmembers(ic)
1061 if k in icmp_values])
1062 ic_str = '%s(%s)' % (icmpv6.icmpv6.__name__, _ic_str)
1064 eq_(str(ic), ic_str)
1065 eq_(repr(ic), ic_str)
1067 def test_default_args(self):
1068 prev = ipv6(nxt=inet.IPPROTO_ICMPV6)
1070 type_=icmpv6.MLD_LISTENER_QUERY, data=icmpv6.mld())
1071 prev.serialize(ic, None)
1072 buf = ic.serialize(bytearray(), prev)
1073 res = struct.unpack(icmpv6.icmpv6._PACK_STR, six.binary_type(buf[:4]))
1075 eq_(res[0], icmpv6.MLD_LISTENER_QUERY)
1077 eq_(res[2], icmpv6_csum(prev, buf))
1079 res = struct.unpack(icmpv6.mld._PACK_STR, six.binary_type(buf[4:]))
1082 eq_(res[1], addrconv.ipv6.text_to_bin('::'))
1084 def test_json(self):
1085 ic1 = icmpv6.icmpv6(
1086 type_=icmpv6.MLD_LISTENER_QUERY,
1088 jsondict = ic1.to_jsondict()
1089 ic2 = icmpv6.icmpv6.from_jsondict(jsondict['icmpv6'])
1090 eq_(str(ic1), str(ic2))
1093 class Test_icmpv6_membership_report(Test_icmpv6_membership_query):
1099 buf = b'\x83\x00\xb4\xa4\x27\x10\x00\x00' \
1100 + b'\xff\x08\x00\x00\x00\x00\x00\x00' \
1101 + b'\x00\x00\x00\x00\x00\x00\x00\x01'
1103 def test_json(self):
1104 ic1 = icmpv6.icmpv6(
1105 type_=icmpv6.MLD_LISTENER_REPOR,
1107 jsondict = ic1.to_jsondict()
1108 ic2 = icmpv6.icmpv6.from_jsondict(jsondict['icmpv6'])
1109 eq_(str(ic1), str(ic2))
1112 class Test_icmpv6_membership_done(Test_icmpv6_membership_query):
1118 buf = b'\x84\x00\xb3\xa4\x27\x10\x00\x00' \
1119 + b'\xff\x08\x00\x00\x00\x00\x00\x00' \
1120 + b'\x00\x00\x00\x00\x00\x00\x00\x01'
1122 def test_json(self):
1123 ic1 = icmpv6.icmpv6(
1124 type_=icmpv6.MLD_LISTENER_DONE,
1126 jsondict = ic1.to_jsondict()
1127 ic2 = icmpv6.icmpv6.from_jsondict(jsondict['icmpv6'])
1128 eq_(str(ic1), str(ic2))
1131 class Test_mldv2_query(unittest.TestCase):
1139 s_qrv = s_flg << 3 | qrv
1144 mld = icmpv6.mldv2_query(
1145 maxresp, address, s_flg, qrv, qqic, num, srcs)
1147 buf = b'\x82\x00\xb5\xa4\x27\x10\x00\x00' \
1148 + b'\xff\x08\x00\x00\x00\x00\x00\x00' \
1149 + b'\x00\x00\x00\x00\x00\x00\x00\x01' \
1150 + b'\x02\x0a\x00\x00'
1155 def setUp_with_srcs(self):
1157 self.srcs = ['ff80::1', 'ff80::2']
1158 self.mld = icmpv6.mldv2_query(
1159 self.maxresp, self.address, self.s_flg, self.qrv, self.qqic,
1160 self.num, self.srcs)
1161 self.buf = b'\x82\x00\xb5\xa4\x27\x10\x00\x00' \
1162 + b'\xff\x08\x00\x00\x00\x00\x00\x00' \
1163 + b'\x00\x00\x00\x00\x00\x00\x00\x01' \
1164 + b'\x02\x0a\x00\x02' \
1165 + b'\xff\x80\x00\x00\x00\x00\x00\x00' \
1166 + b'\x00\x00\x00\x00\x00\x00\x00\x01' \
1167 + b'\xff\x80\x00\x00\x00\x00\x00\x00' \
1168 + b'\x00\x00\x00\x00\x00\x00\x00\x02'
1173 def find_protocol(self, pkt, name):
1174 for p in pkt.protocols:
1175 if p.protocol_name == name:
1178 def test_init(self):
1179 eq_(self.mld.maxresp, self.maxresp)
1180 eq_(self.mld.address, self.address)
1181 eq_(self.mld.s_flg, self.s_flg)
1182 eq_(self.mld.qrv, self.qrv)
1183 eq_(self.mld.qqic, self.qqic)
1184 eq_(self.mld.num, self.num)
1185 eq_(self.mld.srcs, self.srcs)
1187 def test_init_with_srcs(self):
1188 self.setUp_with_srcs()
1191 def test_parser(self):
1192 msg, n, _ = icmpv6.icmpv6.parser(self.buf)
1194 eq_(msg.type_, self.type_)
1195 eq_(msg.code, self.code)
1196 eq_(msg.csum, self.csum)
1197 eq_(msg.data.maxresp, self.maxresp)
1198 eq_(msg.data.address, self.address)
1199 eq_(msg.data.s_flg, self.s_flg)
1200 eq_(msg.data.qrv, self.qrv)
1201 eq_(msg.data.qqic, self.qqic)
1202 eq_(msg.data.num, self.num)
1203 eq_(msg.data.srcs, self.srcs)
1206 def test_parser_with_srcs(self):
1207 self.setUp_with_srcs()
1210 def test_serialize(self):
1211 src_ipv6 = '3ffe:507:0:1:200:86ff:fe05:80da'
1212 dst_ipv6 = '3ffe:501:0:1001::2'
1213 prev = ipv6(6, 0, 0, len(self.buf), 64, 255, src_ipv6, dst_ipv6)
1214 mld_csum = icmpv6_csum(prev, self.buf)
1216 icmp = icmpv6.icmpv6(self.type_, self.code, 0, self.mld)
1217 buf = icmp.serialize(bytearray(), prev)
1219 (type_, code, csum) = struct.unpack_from(icmp._PACK_STR,
1220 six.binary_type(buf))
1221 (maxresp, address, s_qrv, qqic, num) = struct.unpack_from(
1222 self.mld._PACK_STR, six.binary_type(buf), icmp._MIN_LEN)
1224 eq_(type_, self.type_)
1225 eq_(code, self.code)
1227 eq_(maxresp, self.maxresp)
1228 eq_(address, addrconv.ipv6.text_to_bin(self.address))
1229 s_flg = (s_qrv >> 3) & 0b1
1231 eq_(s_flg, self.s_flg)
1233 eq_(qqic, self.qqic)
1236 def test_serialize_with_srcs(self):
1237 self.setUp_with_srcs()
1238 src_ipv6 = '3ffe:507:0:1:200:86ff:fe05:80da'
1239 dst_ipv6 = '3ffe:501:0:1001::2'
1240 prev = ipv6(6, 0, 0, len(self.buf), 64, 255, src_ipv6, dst_ipv6)
1241 mld_csum = icmpv6_csum(prev, self.buf)
1243 icmp = icmpv6.icmpv6(self.type_, self.code, 0, self.mld)
1244 buf = icmp.serialize(bytearray(), prev)
1246 (type_, code, csum) = struct.unpack_from(icmp._PACK_STR,
1247 six.binary_type(buf))
1248 (maxresp, address, s_qrv, qqic, num) = struct.unpack_from(
1249 self.mld._PACK_STR, six.binary_type(buf), icmp._MIN_LEN)
1250 (addr1, addr2) = struct.unpack_from(
1251 '!16s16s', six.binary_type(buf), icmp._MIN_LEN + self.mld._MIN_LEN)
1253 eq_(type_, self.type_)
1254 eq_(code, self.code)
1256 eq_(maxresp, self.maxresp)
1257 eq_(address, addrconv.ipv6.text_to_bin(self.address))
1258 s_flg = (s_qrv >> 3) & 0b1
1260 eq_(s_flg, self.s_flg)
1262 eq_(qqic, self.qqic)
1264 eq_(addr1, addrconv.ipv6.text_to_bin(self.srcs[0]))
1265 eq_(addr2, addrconv.ipv6.text_to_bin(self.srcs[1]))
1267 def _build_mldv2_query(self):
1268 e = ethernet(ethertype=ether.ETH_TYPE_IPV6)
1269 i = ipv6(nxt=inet.IPPROTO_ICMPV6)
1270 ic = icmpv6.icmpv6(type_=icmpv6.MLD_LISTENER_QUERY,
1275 def test_build_mldv2_query(self):
1276 p = self._build_mldv2_query()
1278 e = self.find_protocol(p, "ethernet")
1280 eq_(e.ethertype, ether.ETH_TYPE_IPV6)
1282 i = self.find_protocol(p, "ipv6")
1284 eq_(i.nxt, inet.IPPROTO_ICMPV6)
1286 ic = self.find_protocol(p, "icmpv6")
1288 eq_(ic.type_, icmpv6.MLD_LISTENER_QUERY)
1290 eq_(ic.data.maxresp, self.maxresp)
1291 eq_(ic.data.address, self.address)
1292 eq_(ic.data.s_flg, self.s_flg)
1293 eq_(ic.data.qrv, self.qrv)
1294 eq_(ic.data.num, self.num)
1295 eq_(ic.data.srcs, self.srcs)
1297 def test_build_mldv2_query_with_srcs(self):
1298 self.setUp_with_srcs()
1299 self.test_build_mldv2_query()
1301 def test_to_string(self):
1302 ic = icmpv6.icmpv6(self.type_, self.code, self.csum, self.mld)
1304 mld_values = {'maxresp': self.maxresp,
1305 'address': self.address,
1306 's_flg': self.s_flg,
1311 _mld_str = ','.join(['%s=%s' % (k, repr(mld_values[k]))
1312 for k, v in inspect.getmembers(self.mld)
1313 if k in mld_values])
1314 mld_str = '%s(%s)' % (icmpv6.mldv2_query.__name__, _mld_str)
1316 icmp_values = {'type_': repr(self.type_),
1317 'code': repr(self.code),
1318 'csum': repr(self.csum),
1320 _ic_str = ','.join(['%s=%s' % (k, icmp_values[k])
1321 for k, v in inspect.getmembers(ic)
1322 if k in icmp_values])
1323 ic_str = '%s(%s)' % (icmpv6.icmpv6.__name__, _ic_str)
1325 eq_(str(ic), ic_str)
1326 eq_(repr(ic), ic_str)
1328 def test_to_string_with_srcs(self):
1329 self.setUp_with_srcs()
1330 self.test_to_string()
1332 @raises(AssertionError)
1333 def test_num_larger_than_srcs(self):
1334 self.srcs = ['ff80::1', 'ff80::2', 'ff80::3']
1335 self.num = len(self.srcs) + 1
1336 self.buf = struct.pack(
1337 icmpv6.mldv2_query._PACK_STR,
1338 self.maxresp, addrconv.ipv6.text_to_bin(self.address),
1339 self.s_qrv, self.qqic, self.num)
1340 for src in self.srcs:
1341 self.buf += struct.pack('16s', addrconv.ipv6.text_to_bin(src))
1342 self.mld = icmpv6.mldv2_query(
1343 self.maxresp, self.address, self.s_flg, self.qrv, self.qqic,
1344 self.num, self.srcs)
1347 @raises(AssertionError)
1348 def test_num_smaller_than_srcs(self):
1349 self.srcs = ['ff80::1', 'ff80::2', 'ff80::3']
1350 self.num = len(self.srcs) - 1
1351 self.buf = struct.pack(
1352 icmpv6.mldv2_query._PACK_STR,
1353 self.maxresp, addrconv.ipv6.text_to_bin(self.address),
1354 self.s_qrv, self.qqic, self.num)
1355 for src in self.srcs:
1356 self.buf += struct.pack('16s', addrconv.ipv6.text_to_bin(src))
1357 self.mld = icmpv6.mldv2_query(
1358 self.maxresp, self.address, self.s_flg, self.qrv, self.qqic,
1359 self.num, self.srcs)
1362 def test_default_args(self):
1363 prev = ipv6(nxt=inet.IPPROTO_ICMPV6)
1365 type_=icmpv6.MLD_LISTENER_QUERY, data=icmpv6.mldv2_query())
1366 prev.serialize(ic, None)
1367 buf = ic.serialize(bytearray(), prev)
1368 res = struct.unpack(icmpv6.icmpv6._PACK_STR, six.binary_type(buf[:4]))
1370 eq_(res[0], icmpv6.MLD_LISTENER_QUERY)
1372 eq_(res[2], icmpv6_csum(prev, buf))
1374 res = struct.unpack(icmpv6.mldv2_query._PACK_STR, six.binary_type(buf[4:]))
1377 eq_(res[1], addrconv.ipv6.text_to_bin('::'))
1383 srcs = ['ff80::1', 'ff80::2', 'ff80::3']
1384 que = icmpv6.mldv2_query(srcs=srcs)
1385 buf = que.serialize()
1386 res = struct.unpack_from(
1387 icmpv6.mldv2_query._PACK_STR, six.binary_type(buf))
1390 eq_(res[1], addrconv.ipv6.text_to_bin('::'))
1393 eq_(res[4], len(srcs))
1395 (src1, src2, src3) = struct.unpack_from(
1396 '16s16s16s', six.binary_type(buf), icmpv6.mldv2_query._MIN_LEN)
1398 eq_(src1, addrconv.ipv6.text_to_bin(srcs[0]))
1399 eq_(src2, addrconv.ipv6.text_to_bin(srcs[1]))
1400 eq_(src3, addrconv.ipv6.text_to_bin(srcs[2]))
1402 def test_json(self):
1403 jsondict = self.mld.to_jsondict()
1404 mld = icmpv6.mldv2_query.from_jsondict(jsondict['mldv2_query'])
1405 eq_(str(self.mld), str(mld))
1407 def test_json_with_srcs(self):
1408 self.setUp_with_srcs()
1412 class Test_mldv2_report(unittest.TestCase):
1419 mld = icmpv6.mldv2_report(record_num, records)
1421 buf = b'\x8f\x00\xb5\xa4\x00\x00\x00\x00'
1426 def setUp_with_records(self):
1427 self.record1 = icmpv6.mldv2_report_group(
1428 icmpv6.MODE_IS_INCLUDE, 0, 0, 'ff00::1')
1429 self.record2 = icmpv6.mldv2_report_group(
1430 icmpv6.MODE_IS_INCLUDE, 0, 2, 'ff00::2',
1431 ['fe80::1', 'fe80::2'])
1432 self.record3 = icmpv6.mldv2_report_group(
1433 icmpv6.MODE_IS_INCLUDE, 1, 0, 'ff00::3', [], b'abc\x00')
1434 self.record4 = icmpv6.mldv2_report_group(
1435 icmpv6.MODE_IS_INCLUDE, 2, 2, 'ff00::4',
1436 ['fe80::1', 'fe80::2'], b'abcde\x00\x00\x00')
1437 self.records = [self.record1, self.record2, self.record3,
1439 self.record_num = len(self.records)
1440 self.mld = icmpv6.mldv2_report(self.record_num, self.records)
1441 self.buf = b'\x8f\x00\xb5\xa4\x00\x00\x00\x04' \
1442 + b'\x01\x00\x00\x00' \
1443 + b'\xff\x00\x00\x00\x00\x00\x00\x00' \
1444 + b'\x00\x00\x00\x00\x00\x00\x00\x01' \
1445 + b'\x01\x00\x00\x02' \
1446 + b'\xff\x00\x00\x00\x00\x00\x00\x00' \
1447 + b'\x00\x00\x00\x00\x00\x00\x00\x02' \
1448 + b'\xfe\x80\x00\x00\x00\x00\x00\x00' \
1449 + b'\x00\x00\x00\x00\x00\x00\x00\x01' \
1450 + b'\xfe\x80\x00\x00\x00\x00\x00\x00' \
1451 + b'\x00\x00\x00\x00\x00\x00\x00\x02' \
1452 + b'\x01\x01\x00\x00' \
1453 + b'\xff\x00\x00\x00\x00\x00\x00\x00' \
1454 + b'\x00\x00\x00\x00\x00\x00\x00\x03' \
1455 + b'\x61\x62\x63\x00' \
1456 + b'\x01\x02\x00\x02' \
1457 + b'\xff\x00\x00\x00\x00\x00\x00\x00' \
1458 + b'\x00\x00\x00\x00\x00\x00\x00\x04' \
1459 + b'\xfe\x80\x00\x00\x00\x00\x00\x00' \
1460 + b'\x00\x00\x00\x00\x00\x00\x00\x01' \
1461 + b'\xfe\x80\x00\x00\x00\x00\x00\x00' \
1462 + b'\x00\x00\x00\x00\x00\x00\x00\x02' \
1463 + b'\x61\x62\x63\x64\x65\x00\x00\x00'
1468 def find_protocol(self, pkt, name):
1469 for p in pkt.protocols:
1470 if p.protocol_name == name:
1473 def test_init(self):
1474 eq_(self.mld.record_num, self.record_num)
1475 eq_(self.mld.records, self.records)
1477 def test_init_with_records(self):
1478 self.setUp_with_records()
1481 def test_parser(self):
1482 msg, n, _ = icmpv6.icmpv6.parser(self.buf)
1484 eq_(msg.type_, self.type_)
1485 eq_(msg.code, self.code)
1486 eq_(msg.csum, self.csum)
1487 eq_(msg.data.record_num, self.record_num)
1488 eq_(repr(msg.data.records), repr(self.records))
1490 def test_parser_with_records(self):
1491 self.setUp_with_records()
1494 def test_serialize(self):
1495 src_ipv6 = '3ffe:507:0:1:200:86ff:fe05:80da'
1496 dst_ipv6 = '3ffe:501:0:1001::2'
1497 prev = ipv6(6, 0, 0, len(self.buf), 64, 255, src_ipv6, dst_ipv6)
1498 mld_csum = icmpv6_csum(prev, self.buf)
1500 icmp = icmpv6.icmpv6(self.type_, self.code, 0, self.mld)
1501 buf = icmp.serialize(bytearray(), prev)
1503 (type_, code, csum) = struct.unpack_from(icmp._PACK_STR,
1504 six.binary_type(buf))
1505 (record_num, ) = struct.unpack_from(
1506 self.mld._PACK_STR, six.binary_type(buf), icmp._MIN_LEN)
1508 eq_(type_, self.type_)
1509 eq_(code, self.code)
1511 eq_(record_num, self.record_num)
1513 def test_serialize_with_records(self):
1514 self.setUp_with_records()
1515 src_ipv6 = '3ffe:507:0:1:200:86ff:fe05:80da'
1516 dst_ipv6 = '3ffe:501:0:1001::2'
1517 prev = ipv6(6, 0, 0, len(self.buf), 64, 255, src_ipv6, dst_ipv6)
1518 mld_csum = icmpv6_csum(prev, self.buf)
1520 icmp = icmpv6.icmpv6(self.type_, self.code, 0, self.mld)
1521 buf = six.binary_type(icmp.serialize(bytearray(), prev))
1523 (type_, code, csum) = struct.unpack_from(icmp._PACK_STR,
1524 six.binary_type(buf))
1525 (record_num, ) = struct.unpack_from(
1526 self.mld._PACK_STR, six.binary_type(buf), icmp._MIN_LEN)
1527 offset = icmp._MIN_LEN + self.mld._MIN_LEN
1528 rec1 = icmpv6.mldv2_report_group.parser(buf[offset:])
1530 rec2 = icmpv6.mldv2_report_group.parser(buf[offset:])
1532 rec3 = icmpv6.mldv2_report_group.parser(buf[offset:])
1534 rec4 = icmpv6.mldv2_report_group.parser(buf[offset:])
1536 eq_(type_, self.type_)
1537 eq_(code, self.code)
1539 eq_(record_num, self.record_num)
1540 eq_(repr(rec1), repr(self.record1))
1541 eq_(repr(rec2), repr(self.record2))
1542 eq_(repr(rec3), repr(self.record3))
1543 eq_(repr(rec4), repr(self.record4))
1545 def _build_mldv2_report(self):
1546 e = ethernet(ethertype=ether.ETH_TYPE_IPV6)
1547 i = ipv6(nxt=inet.IPPROTO_ICMPV6)
1548 ic = icmpv6.icmpv6(type_=icmpv6.MLDV2_LISTENER_REPORT,
1553 def test_build_mldv2_report(self):
1554 p = self._build_mldv2_report()
1556 e = self.find_protocol(p, "ethernet")
1558 eq_(e.ethertype, ether.ETH_TYPE_IPV6)
1560 i = self.find_protocol(p, "ipv6")
1562 eq_(i.nxt, inet.IPPROTO_ICMPV6)
1564 ic = self.find_protocol(p, "icmpv6")
1566 eq_(ic.type_, icmpv6.MLDV2_LISTENER_REPORT)
1568 eq_(ic.data.record_num, self.record_num)
1569 eq_(ic.data.records, self.records)
1571 def test_build_mldv2_report_with_records(self):
1572 self.setUp_with_records()
1573 self.test_build_mldv2_report()
1575 def test_to_string(self):
1576 ic = icmpv6.icmpv6(self.type_, self.code, self.csum, self.mld)
1578 mld_values = {'record_num': self.record_num,
1579 'records': self.records}
1580 _mld_str = ','.join(['%s=%s' % (k, repr(mld_values[k]))
1581 for k, v in inspect.getmembers(self.mld)
1582 if k in mld_values])
1583 mld_str = '%s(%s)' % (icmpv6.mldv2_report.__name__, _mld_str)
1585 icmp_values = {'type_': repr(self.type_),
1586 'code': repr(self.code),
1587 'csum': repr(self.csum),
1589 _ic_str = ','.join(['%s=%s' % (k, icmp_values[k])
1590 for k, v in inspect.getmembers(ic)
1591 if k in icmp_values])
1592 ic_str = '%s(%s)' % (icmpv6.icmpv6.__name__, _ic_str)
1594 eq_(str(ic), ic_str)
1595 eq_(repr(ic), ic_str)
1597 def test_to_string_with_records(self):
1598 self.setUp_with_records()
1599 self.test_to_string()
1601 @raises(AssertionError)
1602 def test_record_num_larger_than_records(self):
1603 self.record1 = icmpv6.mldv2_report_group(
1604 icmpv6.MODE_IS_INCLUDE, 0, 0, 'ff00::1')
1605 self.record2 = icmpv6.mldv2_report_group(
1606 icmpv6.MODE_IS_INCLUDE, 0, 2, 'ff00::2',
1607 ['fe80::1', 'fe80::2'])
1608 self.record3 = icmpv6.mldv2_report_group(
1609 icmpv6.MODE_IS_INCLUDE, 1, 0, 'ff00::3', [], b'abc\x00')
1610 self.record4 = icmpv6.mldv2_report_group(
1611 icmpv6.MODE_IS_INCLUDE, 2, 2, 'ff00::4',
1612 ['fe80::1', 'fe80::2'], b'abcde\x00\x00\x00')
1613 self.records = [self.record1, self.record2, self.record3,
1615 self.record_num = len(self.records) + 1
1616 self.buf = struct.pack(
1617 icmpv6.mldv2_report._PACK_STR, self.record_num)
1618 self.buf += self.record1.serialize()
1619 self.buf += self.record2.serialize()
1620 self.buf += self.record3.serialize()
1621 self.buf += self.record4.serialize()
1622 self.mld = icmpv6.mldv2_report(self.record_num, self.records)
1625 @raises(AssertionError)
1626 def test_record_num_smaller_than_records(self):
1627 self.record1 = icmpv6.mldv2_report_group(
1628 icmpv6.MODE_IS_INCLUDE, 0, 0, 'ff00::1')
1629 self.record2 = icmpv6.mldv2_report_group(
1630 icmpv6.MODE_IS_INCLUDE, 0, 2, 'ff00::2',
1631 ['fe80::1', 'fe80::2'])
1632 self.record3 = icmpv6.mldv2_report_group(
1633 icmpv6.MODE_IS_INCLUDE, 1, 0, 'ff00::3', [], b'abc\x00')
1634 self.record4 = icmpv6.mldv2_report_group(
1635 icmpv6.MODE_IS_INCLUDE, 2, 2, 'ff00::4',
1636 ['fe80::1', 'fe80::2'], b'abcde\x00\x00\x00')
1637 self.records = [self.record1, self.record2, self.record3,
1639 self.record_num = len(self.records) - 1
1640 self.buf = struct.pack(
1641 icmpv6.mldv2_report._PACK_STR, self.record_num)
1642 self.buf += self.record1.serialize()
1643 self.buf += self.record2.serialize()
1644 self.buf += self.record3.serialize()
1645 self.buf += self.record4.serialize()
1646 self.mld = icmpv6.mldv2_report(self.record_num, self.records)
1649 def test_default_args(self):
1650 prev = ipv6(nxt=inet.IPPROTO_ICMPV6)
1652 type_=icmpv6.MLDV2_LISTENER_REPORT, data=icmpv6.mldv2_report())
1653 prev.serialize(ic, None)
1654 buf = ic.serialize(bytearray(), prev)
1655 res = struct.unpack(icmpv6.icmpv6._PACK_STR, six.binary_type(buf[:4]))
1657 eq_(res[0], icmpv6.MLDV2_LISTENER_REPORT)
1659 eq_(res[2], icmpv6_csum(prev, buf))
1661 res = struct.unpack(icmpv6.mldv2_report._PACK_STR, six.binary_type(buf[4:]))
1665 # records without record_num
1666 record1 = icmpv6.mldv2_report_group(
1667 icmpv6.MODE_IS_INCLUDE, 0, 0, 'ff00::1')
1668 record2 = icmpv6.mldv2_report_group(
1669 icmpv6.MODE_IS_INCLUDE, 0, 2, 'ff00::2',
1670 ['fe80::1', 'fe80::2'])
1671 records = [record1, record2]
1672 rep = icmpv6.mldv2_report(records=records)
1673 buf = rep.serialize()
1674 res = struct.unpack_from(
1675 icmpv6.mldv2_report._PACK_STR, six.binary_type(buf))
1677 eq_(res[0], len(records))
1679 res = struct.unpack_from(
1680 icmpv6.mldv2_report_group._PACK_STR, six.binary_type(buf),
1681 icmpv6.mldv2_report._MIN_LEN)
1683 eq_(res[0], icmpv6.MODE_IS_INCLUDE)
1686 eq_(res[3], addrconv.ipv6.text_to_bin('ff00::1'))
1688 res = struct.unpack_from(
1689 icmpv6.mldv2_report_group._PACK_STR, six.binary_type(buf),
1690 icmpv6.mldv2_report._MIN_LEN +
1691 icmpv6.mldv2_report_group._MIN_LEN)
1693 eq_(res[0], icmpv6.MODE_IS_INCLUDE)
1696 eq_(res[3], addrconv.ipv6.text_to_bin('ff00::2'))
1698 res = struct.unpack_from(
1699 '16s16s', six.binary_type(buf),
1700 icmpv6.mldv2_report._MIN_LEN +
1701 icmpv6.mldv2_report_group._MIN_LEN +
1702 icmpv6.mldv2_report_group._MIN_LEN)
1704 eq_(res[0], addrconv.ipv6.text_to_bin('fe80::1'))
1705 eq_(res[1], addrconv.ipv6.text_to_bin('fe80::2'))
1707 def test_json(self):
1708 jsondict = self.mld.to_jsondict()
1709 mld = icmpv6.mldv2_report.from_jsondict(jsondict['mldv2_report'])
1710 eq_(str(self.mld), str(mld))
1712 def test_json_with_records(self):
1713 self.setUp_with_records()
1717 class Test_mldv2_report_group(unittest.TestCase):
1718 type_ = icmpv6.MODE_IS_INCLUDE
1724 mld = icmpv6.mldv2_report_group(
1725 type_, aux_len, num, address, srcs, aux)
1726 buf = b'\x01\x00\x00\x00' \
1727 + b'\xff\x00\x00\x00\x00\x00\x00\x00' \
1728 + b'\x00\x00\x00\x00\x00\x00\x00\x01'
1733 def setUp_with_srcs(self):
1734 self.srcs = ['fe80::1', 'fe80::2', 'fe80::3']
1735 self.num = len(self.srcs)
1736 self.mld = icmpv6.mldv2_report_group(
1737 self.type_, self.aux_len, self.num, self.address, self.srcs,
1739 self.buf = b'\x01\x00\x00\x03' \
1740 + b'\xff\x00\x00\x00\x00\x00\x00\x00' \
1741 + b'\x00\x00\x00\x00\x00\x00\x00\x01' \
1742 + b'\xfe\x80\x00\x00\x00\x00\x00\x00' \
1743 + b'\x00\x00\x00\x00\x00\x00\x00\x01' \
1744 + b'\xfe\x80\x00\x00\x00\x00\x00\x00' \
1745 + b'\x00\x00\x00\x00\x00\x00\x00\x02' \
1746 + b'\xfe\x80\x00\x00\x00\x00\x00\x00' \
1747 + b'\x00\x00\x00\x00\x00\x00\x00\x03'
1749 def setUp_with_aux(self):
1750 self.aux = b'\x01\x02\x03\x04\x05\x06\x07\x08'
1751 self.aux_len = len(self.aux) // 4
1752 self.mld = icmpv6.mldv2_report_group(
1753 self.type_, self.aux_len, self.num, self.address, self.srcs,
1755 self.buf = b'\x01\x02\x00\x00' \
1756 + b'\xff\x00\x00\x00\x00\x00\x00\x00' \
1757 + b'\x00\x00\x00\x00\x00\x00\x00\x01' \
1758 + b'\x01\x02\x03\x04\x05\x06\x07\x08'
1760 def setUp_with_srcs_and_aux(self):
1761 self.srcs = ['fe80::1', 'fe80::2', 'fe80::3']
1762 self.num = len(self.srcs)
1763 self.aux = b'\x01\x02\x03\x04\x05\x06\x07\x08'
1764 self.aux_len = len(self.aux) // 4
1765 self.mld = icmpv6.mldv2_report_group(
1766 self.type_, self.aux_len, self.num, self.address, self.srcs,
1768 self.buf = b'\x01\x02\x00\x03' \
1769 + b'\xff\x00\x00\x00\x00\x00\x00\x00' \
1770 + b'\x00\x00\x00\x00\x00\x00\x00\x01' \
1771 + b'\xfe\x80\x00\x00\x00\x00\x00\x00' \
1772 + b'\x00\x00\x00\x00\x00\x00\x00\x01' \
1773 + b'\xfe\x80\x00\x00\x00\x00\x00\x00' \
1774 + b'\x00\x00\x00\x00\x00\x00\x00\x02' \
1775 + b'\xfe\x80\x00\x00\x00\x00\x00\x00' \
1776 + b'\x00\x00\x00\x00\x00\x00\x00\x03' \
1777 + b'\x01\x02\x03\x04\x05\x06\x07\x08'
1782 def test_init(self):
1783 eq_(self.mld.type_, self.type_)
1784 eq_(self.mld.aux_len, self.aux_len)
1785 eq_(self.mld.num, self.num)
1786 eq_(self.mld.address, self.address)
1787 eq_(self.mld.srcs, self.srcs)
1788 eq_(self.mld.aux, self.aux)
1790 def test_init_with_srcs(self):
1791 self.setUp_with_srcs()
1794 def test_init_with_aux(self):
1795 self.setUp_with_aux()
1798 def test_init_with_srcs_and_aux(self):
1799 self.setUp_with_srcs_and_aux()
1802 def test_parser(self):
1803 _res = icmpv6.mldv2_report_group.parser(self.buf)
1804 if type(_res) is tuple:
1809 eq_(res.type_, self.type_)
1810 eq_(res.aux_len, self.aux_len)
1811 eq_(res.num, self.num)
1812 eq_(res.address, self.address)
1813 eq_(res.srcs, self.srcs)
1814 eq_(res.aux, self.aux)
1816 def test_parser_with_srcs(self):
1817 self.setUp_with_srcs()
1820 def test_parser_with_aux(self):
1821 self.setUp_with_aux()
1824 def test_parser_with_srcs_and_aux(self):
1825 self.setUp_with_srcs_and_aux()
1828 def test_serialize(self):
1829 buf = self.mld.serialize()
1830 res = struct.unpack_from(
1831 icmpv6.mldv2_report_group._PACK_STR, six.binary_type(buf))
1833 eq_(res[0], self.type_)
1834 eq_(res[1], self.aux_len)
1835 eq_(res[2], self.num)
1836 eq_(res[3], addrconv.ipv6.text_to_bin(self.address))
1838 def test_serialize_with_srcs(self):
1839 self.setUp_with_srcs()
1840 buf = self.mld.serialize()
1841 res = struct.unpack_from(
1842 icmpv6.mldv2_report_group._PACK_STR, six.binary_type(buf))
1843 (src1, src2, src3) = struct.unpack_from(
1844 '16s16s16s', six.binary_type(buf), icmpv6.mldv2_report_group._MIN_LEN)
1845 eq_(res[0], self.type_)
1846 eq_(res[1], self.aux_len)
1847 eq_(res[2], self.num)
1848 eq_(res[3], addrconv.ipv6.text_to_bin(self.address))
1849 eq_(src1, addrconv.ipv6.text_to_bin(self.srcs[0]))
1850 eq_(src2, addrconv.ipv6.text_to_bin(self.srcs[1]))
1851 eq_(src3, addrconv.ipv6.text_to_bin(self.srcs[2]))
1853 def test_serialize_with_aux(self):
1854 self.setUp_with_aux()
1855 buf = self.mld.serialize()
1856 res = struct.unpack_from(
1857 icmpv6.mldv2_report_group._PACK_STR, six.binary_type(buf))
1858 (aux, ) = struct.unpack_from(
1859 '%ds' % (self.aux_len * 4), six.binary_type(buf),
1860 icmpv6.mldv2_report_group._MIN_LEN)
1861 eq_(res[0], self.type_)
1862 eq_(res[1], self.aux_len)
1863 eq_(res[2], self.num)
1864 eq_(res[3], addrconv.ipv6.text_to_bin(self.address))
1867 def test_serialize_with_srcs_and_aux(self):
1868 self.setUp_with_srcs_and_aux()
1869 buf = self.mld.serialize()
1870 res = struct.unpack_from(
1871 icmpv6.mldv2_report_group._PACK_STR, six.binary_type(buf))
1872 (src1, src2, src3) = struct.unpack_from(
1873 '16s16s16s', six.binary_type(buf), icmpv6.mldv2_report_group._MIN_LEN)
1874 (aux, ) = struct.unpack_from(
1875 '%ds' % (self.aux_len * 4), six.binary_type(buf),
1876 icmpv6.mldv2_report_group._MIN_LEN + 16 * 3)
1877 eq_(res[0], self.type_)
1878 eq_(res[1], self.aux_len)
1879 eq_(res[2], self.num)
1880 eq_(res[3], addrconv.ipv6.text_to_bin(self.address))
1881 eq_(src1, addrconv.ipv6.text_to_bin(self.srcs[0]))
1882 eq_(src2, addrconv.ipv6.text_to_bin(self.srcs[1]))
1883 eq_(src3, addrconv.ipv6.text_to_bin(self.srcs[2]))
1886 def test_to_string(self):
1887 igmp_values = {'type_': repr(self.type_),
1888 'aux_len': repr(self.aux_len),
1889 'num': repr(self.num),
1890 'address': repr(self.address),
1891 'srcs': repr(self.srcs),
1892 'aux': repr(self.aux)}
1893 _g_str = ','.join(['%s=%s' % (k, igmp_values[k])
1894 for k, v in inspect.getmembers(self.mld)
1895 if k in igmp_values])
1896 g_str = '%s(%s)' % (icmpv6.mldv2_report_group.__name__, _g_str)
1898 eq_(str(self.mld), g_str)
1899 eq_(repr(self.mld), g_str)
1901 def test_to_string_with_srcs(self):
1902 self.setUp_with_srcs()
1903 self.test_to_string()
1905 def test_to_string_with_aux(self):
1906 self.setUp_with_aux()
1907 self.test_to_string()
1909 def test_to_string_with_srcs_and_aux(self):
1910 self.setUp_with_srcs_and_aux()
1911 self.test_to_string()
1914 eq_(len(self.mld), 20)
1916 def test_len_with_srcs(self):
1917 self.setUp_with_srcs()
1918 eq_(len(self.mld), 68)
1920 def test_len_with_aux(self):
1921 self.setUp_with_aux()
1922 eq_(len(self.mld), 28)
1924 def test_len_with_srcs_and_aux(self):
1925 self.setUp_with_srcs_and_aux()
1926 eq_(len(self.mld), 76)
1928 @raises(AssertionError)
1929 def test_num_larger_than_srcs(self):
1930 self.srcs = ['fe80::1', 'fe80::2', 'fe80::3']
1931 self.num = len(self.srcs) + 1
1932 self.buf = struct.pack(
1933 icmpv6.mldv2_report_group._PACK_STR, self.type_, self.aux_len,
1934 self.num, addrconv.ipv6.text_to_bin(self.address))
1935 for src in self.srcs:
1936 self.buf += struct.pack('16s', addrconv.ipv6.text_to_bin(src))
1937 self.mld = icmpv6.mldv2_report_group(
1938 self.type_, self.aux_len, self.num, self.address,
1939 self.srcs, self.aux)
1942 @raises(AssertionError)
1943 def test_num_smaller_than_srcs(self):
1944 self.srcs = ['fe80::1', 'fe80::2', 'fe80::3']
1945 self.num = len(self.srcs) - 1
1946 self.buf = struct.pack(
1947 icmpv6.mldv2_report_group._PACK_STR, self.type_, self.aux_len,
1948 self.num, addrconv.ipv6.text_to_bin(self.address))
1949 for src in self.srcs:
1950 self.buf += struct.pack('16s', addrconv.ipv6.text_to_bin(src))
1951 self.mld = icmpv6.mldv2_report_group(
1952 self.type_, self.aux_len, self.num, self.address,
1953 self.srcs, self.aux)
1956 @raises(struct.error)
1957 def test_aux_len_larger_than_aux(self):
1958 self.aux = b'\x01\x02\x03\x04\x05\x06\x07\x08'
1959 self.aux_len = len(self.aux) // 4 + 1
1960 self.buf = struct.pack(
1961 icmpv6.mldv2_report_group._PACK_STR, self.type_, self.aux_len,
1962 self.num, addrconv.ipv6.text_to_bin(self.address))
1963 self.buf += self.aux
1964 self.mld = icmpv6.mldv2_report_group(
1965 self.type_, self.aux_len, self.num, self.address,
1966 self.srcs, self.aux)
1969 @raises(AssertionError)
1970 def test_aux_len_smaller_than_aux(self):
1971 self.aux = b'\x01\x02\x03\x04\x05\x06\x07\x08'
1972 self.aux_len = len(self.aux) // 4 - 1
1973 self.buf = struct.pack(
1974 icmpv6.mldv2_report_group._PACK_STR, self.type_, self.aux_len,
1975 self.num, addrconv.ipv6.text_to_bin(self.address))
1976 self.buf += self.aux
1977 self.mld = icmpv6.mldv2_report_group(
1978 self.type_, self.aux_len, self.num, self.address,
1979 self.srcs, self.aux)
1982 def test_default_args(self):
1983 rep = icmpv6.mldv2_report_group()
1984 buf = rep.serialize()
1985 res = struct.unpack_from(
1986 icmpv6.mldv2_report_group._PACK_STR, six.binary_type(buf))
1991 eq_(res[3], addrconv.ipv6.text_to_bin('::'))
1994 srcs = ['fe80::1', 'fe80::2', 'fe80::3']
1995 rep = icmpv6.mldv2_report_group(srcs=srcs)
1996 buf = rep.serialize()
1998 res = struct.unpack_from(
1999 icmpv6.mldv2_report_group._PACK_STR, six.binary_type(buf))
2003 eq_(res[2], len(srcs))
2004 eq_(res[3], addrconv.ipv6.text_to_bin('::'))
2006 (src1, src2, src3) = struct.unpack_from(
2007 '16s16s16s', six.binary_type(buf), icmpv6.mldv2_report_group._MIN_LEN)
2009 eq_(src1, addrconv.ipv6.text_to_bin(srcs[0]))
2010 eq_(src2, addrconv.ipv6.text_to_bin(srcs[1]))
2011 eq_(src3, addrconv.ipv6.text_to_bin(srcs[2]))
2013 # aux without aux_len
2014 rep = icmpv6.mldv2_report_group(aux=b'\x01\x02\x03')
2015 buf = rep.serialize()
2016 res = struct.unpack_from(
2017 icmpv6.mldv2_report_group._PACK_STR, six.binary_type(buf))
2022 eq_(res[3], addrconv.ipv6.text_to_bin('::'))
2023 eq_(buf[icmpv6.mldv2_report_group._MIN_LEN:], b'\x01\x02\x03\x00')
2025 def test_json(self):
2026 jsondict = self.mld.to_jsondict()
2027 mld = icmpv6.mldv2_report_group.from_jsondict(
2028 jsondict['mldv2_report_group'])
2029 eq_(str(self.mld), str(mld))
2031 def test_json_with_srcs(self):
2032 self.setUp_with_srcs()
2035 def test_json_with_aux(self):
2036 self.setUp_with_aux()
2039 def test_json_with_srcs_and_aux(self):
2040 self.setUp_with_srcs_and_aux()