1 # Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
23 from nose.tools import *
24 from ryu.lib import addrconv
25 from ryu.lib import ip
26 from ryu.lib.packet import ipv6
29 LOG = logging.getLogger(__name__)
32 class Test_ipv6(unittest.TestCase):
36 self.traffic_class = 0
38 self.payload_length = 817
41 self.src = '2002:4637:d5d3::4637:d5d3'
42 self.dst = '2001:4860:0:2001::68'
45 self.version, self.traffic_class, self.flow_label,
46 self.payload_length, self.nxt, self.hop_limit, self.src,
47 self.dst, self.ext_hdrs)
50 self.version << 28 | self.traffic_class << 20 |
51 self.flow_label << 12)
52 self.buf = struct.pack(
53 ipv6.ipv6._PACK_STR, self.v_tc_flow,
54 self.payload_length, self.nxt, self.hop_limit,
55 addrconv.ipv6.text_to_bin(self.src),
56 addrconv.ipv6.text_to_bin(self.dst))
58 def setUp_with_hop_opts(self):
61 self.opt1_data = b'\x00\x00'
66 ipv6.option(self.opt1_type, self.opt1_len, self.opt1_data),
67 ipv6.option(self.opt2_type, self.opt2_len, self.opt2_data),
70 self.hop_opts_size = 0
71 self.hop_opts = ipv6.hop_opts(
72 self.hop_opts_nxt, self.hop_opts_size, self.options)
73 self.ext_hdrs = [self.hop_opts]
74 self.payload_length += len(self.hop_opts)
75 self.nxt = ipv6.hop_opts.TYPE
77 self.version, self.traffic_class, self.flow_label,
78 self.payload_length, self.nxt, self.hop_limit, self.src,
79 self.dst, self.ext_hdrs)
80 self.buf = struct.pack(
81 ipv6.ipv6._PACK_STR, self.v_tc_flow,
82 self.payload_length, self.nxt, self.hop_limit,
83 addrconv.ipv6.text_to_bin(self.src),
84 addrconv.ipv6.text_to_bin(self.dst))
85 self.buf += self.hop_opts.serialize()
87 def setUp_with_dst_opts(self):
90 self.opt1_data = b'\x00\x00'
95 ipv6.option(self.opt1_type, self.opt1_len, self.opt1_data),
96 ipv6.option(self.opt2_type, self.opt2_len, self.opt2_data),
99 self.dst_opts_size = 0
100 self.dst_opts = ipv6.dst_opts(
101 self.dst_opts_nxt, self.dst_opts_size, self.options)
102 self.ext_hdrs = [self.dst_opts]
103 self.payload_length += len(self.dst_opts)
104 self.nxt = ipv6.dst_opts.TYPE
106 self.version, self.traffic_class, self.flow_label,
107 self.payload_length, self.nxt, self.hop_limit, self.src,
108 self.dst, self.ext_hdrs)
109 self.buf = struct.pack(
110 ipv6.ipv6._PACK_STR, self.v_tc_flow,
111 self.payload_length, self.nxt, self.hop_limit,
112 addrconv.ipv6.text_to_bin(self.src),
113 addrconv.ipv6.text_to_bin(self.dst))
114 self.buf += self.dst_opts.serialize()
116 def setUp_with_routing_type3(self):
118 self.routing_size = 6
119 self.routing_type = 3
121 self.routing_cmpi = 0
122 self.routing_cmpe = 0
123 self.routing_adrs = ["2001:db8:dead::1", "2001:db8:dead::2",
125 self.routing = ipv6.routing_type3(
126 self.routing_nxt, self.routing_size,
127 self.routing_type, self.routing_seg,
128 self.routing_cmpi, self.routing_cmpe,
130 self.ext_hdrs = [self.routing]
131 self.payload_length += len(self.routing)
132 self.nxt = ipv6.routing.TYPE
134 self.version, self.traffic_class, self.flow_label,
135 self.payload_length, self.nxt, self.hop_limit, self.src,
136 self.dst, self.ext_hdrs)
137 self.buf = struct.pack(
138 ipv6.ipv6._PACK_STR, self.v_tc_flow,
139 self.payload_length, self.nxt, self.hop_limit,
140 addrconv.ipv6.text_to_bin(self.src),
141 addrconv.ipv6.text_to_bin(self.dst))
142 self.buf += self.routing.serialize()
144 def setUp_with_fragment(self):
145 self.fragment_nxt = 6
146 self.fragment_offset = 50
147 self.fragment_more = 1
148 self.fragment_id = 123
149 self.fragment = ipv6.fragment(
150 self.fragment_nxt, self.fragment_offset, self.fragment_more,
152 self.ext_hdrs = [self.fragment]
153 self.payload_length += len(self.fragment)
154 self.nxt = ipv6.fragment.TYPE
156 self.version, self.traffic_class, self.flow_label,
157 self.payload_length, self.nxt, self.hop_limit, self.src,
158 self.dst, self.ext_hdrs)
159 self.buf = struct.pack(
160 ipv6.ipv6._PACK_STR, self.v_tc_flow,
161 self.payload_length, self.nxt, self.hop_limit,
162 addrconv.ipv6.text_to_bin(self.src),
163 addrconv.ipv6.text_to_bin(self.dst))
164 self.buf += self.fragment.serialize()
166 def setUp_with_auth(self):
171 self.auth_data = b'\xa0\xe7\xf8\xab\xf9\x69\x1a\x8b\xf3\x9f\x7c\xae'
172 self.auth = ipv6.auth(
173 self.auth_nxt, self.auth_size, self.auth_spi, self.auth_seq,
175 self.ext_hdrs = [self.auth]
176 self.payload_length += len(self.auth)
177 self.nxt = ipv6.auth.TYPE
179 self.version, self.traffic_class, self.flow_label,
180 self.payload_length, self.nxt, self.hop_limit, self.src,
181 self.dst, self.ext_hdrs)
182 self.buf = struct.pack(
183 ipv6.ipv6._PACK_STR, self.v_tc_flow,
184 self.payload_length, self.nxt, self.hop_limit,
185 addrconv.ipv6.text_to_bin(self.src),
186 addrconv.ipv6.text_to_bin(self.dst))
187 self.buf += self.auth.serialize()
189 def setUp_with_multi_headers(self):
192 self.opt1_data = b'\x00\x00'
195 self.opt2_data = None
197 ipv6.option(self.opt1_type, self.opt1_len, self.opt1_data),
198 ipv6.option(self.opt2_type, self.opt2_len, self.opt2_data),
200 self.hop_opts_nxt = ipv6.auth.TYPE
201 self.hop_opts_size = 0
202 self.hop_opts = ipv6.hop_opts(
203 self.hop_opts_nxt, self.hop_opts_size, self.options)
208 self.auth_data = b'\xa0\xe7\xf8\xab\xf9\x69\x1a\x8b\xf3\x9f\x7c\xae'
209 self.auth = ipv6.auth(
210 self.auth_nxt, self.auth_size, self.auth_spi, self.auth_seq,
212 self.ext_hdrs = [self.hop_opts, self.auth]
213 self.payload_length += len(self.hop_opts) + len(self.auth)
214 self.nxt = ipv6.hop_opts.TYPE
216 self.version, self.traffic_class, self.flow_label,
217 self.payload_length, self.nxt, self.hop_limit, self.src,
218 self.dst, self.ext_hdrs)
219 self.buf = struct.pack(
220 ipv6.ipv6._PACK_STR, self.v_tc_flow,
221 self.payload_length, self.nxt, self.hop_limit,
222 addrconv.ipv6.text_to_bin(self.src),
223 addrconv.ipv6.text_to_bin(self.dst))
224 self.buf += self.hop_opts.serialize()
225 self.buf += self.auth.serialize()
231 eq_(self.version, self.ip.version)
232 eq_(self.traffic_class, self.ip.traffic_class)
233 eq_(self.flow_label, self.ip.flow_label)
234 eq_(self.payload_length, self.ip.payload_length)
235 eq_(self.nxt, self.ip.nxt)
236 eq_(self.hop_limit, self.ip.hop_limit)
237 eq_(self.src, self.ip.src)
238 eq_(self.dst, self.ip.dst)
239 eq_(str(self.ext_hdrs), str(self.ip.ext_hdrs))
241 def test_init_with_hop_opts(self):
242 self.setUp_with_hop_opts()
245 def test_init_with_dst_opts(self):
246 self.setUp_with_dst_opts()
249 def test_init_with_routing_type3(self):
250 self.setUp_with_routing_type3()
253 def test_init_with_fragment(self):
254 self.setUp_with_fragment()
257 def test_init_with_auth(self):
258 self.setUp_with_auth()
261 def test_init_with_multi_headers(self):
262 self.setUp_with_multi_headers()
265 def test_parser(self):
266 _res = self.ip.parser(six.binary_type(self.buf))
267 if type(_res) is tuple:
272 eq_(self.version, res.version)
273 eq_(self.traffic_class, res.traffic_class)
274 eq_(self.flow_label, res.flow_label)
275 eq_(self.payload_length, res.payload_length)
276 eq_(self.nxt, res.nxt)
277 eq_(self.hop_limit, res.hop_limit)
278 eq_(self.src, res.src)
279 eq_(self.dst, res.dst)
280 eq_(str(self.ext_hdrs), str(res.ext_hdrs))
282 def test_parser_with_hop_opts(self):
283 self.setUp_with_hop_opts()
286 def test_parser_with_dst_opts(self):
287 self.setUp_with_dst_opts()
290 def test_parser_with_routing_type3(self):
291 self.setUp_with_routing_type3()
294 def test_parser_with_fragment(self):
295 self.setUp_with_fragment()
298 def test_parser_with_auth(self):
299 self.setUp_with_auth()
302 def test_parser_with_multi_headers(self):
303 self.setUp_with_multi_headers()
306 def test_serialize(self):
309 buf = self.ip.serialize(data, prev)
311 res = struct.unpack_from(ipv6.ipv6._PACK_STR, six.binary_type(buf))
313 eq_(self.v_tc_flow, res[0])
314 eq_(self.payload_length, res[1])
315 eq_(self.nxt, res[2])
316 eq_(self.hop_limit, res[3])
317 eq_(self.src, addrconv.ipv6.bin_to_text(res[4]))
318 eq_(self.dst, addrconv.ipv6.bin_to_text(res[5]))
320 def test_serialize_with_hop_opts(self):
321 self.setUp_with_hop_opts()
322 self.test_serialize()
326 buf = self.ip.serialize(data, prev)
327 hop_opts = ipv6.hop_opts.parser(six.binary_type(buf[ipv6.ipv6._MIN_LEN:]))
328 eq_(repr(self.hop_opts), repr(hop_opts))
330 def test_serialize_with_dst_opts(self):
331 self.setUp_with_dst_opts()
332 self.test_serialize()
336 buf = self.ip.serialize(data, prev)
337 dst_opts = ipv6.dst_opts.parser(six.binary_type(buf[ipv6.ipv6._MIN_LEN:]))
338 eq_(repr(self.dst_opts), repr(dst_opts))
340 def test_serialize_with_routing_type3(self):
341 self.setUp_with_routing_type3()
342 self.test_serialize()
346 buf = self.ip.serialize(data, prev)
347 routing = ipv6.routing.parser(six.binary_type(buf[ipv6.ipv6._MIN_LEN:]))
348 eq_(repr(self.routing), repr(routing))
350 def test_serialize_with_fragment(self):
351 self.setUp_with_fragment()
352 self.test_serialize()
356 buf = self.ip.serialize(data, prev)
357 fragment = ipv6.fragment.parser(six.binary_type(buf[ipv6.ipv6._MIN_LEN:]))
358 eq_(repr(self.fragment), repr(fragment))
360 def test_serialize_with_auth(self):
361 self.setUp_with_auth()
362 self.test_serialize()
366 buf = self.ip.serialize(data, prev)
367 auth = ipv6.auth.parser(six.binary_type(buf[ipv6.ipv6._MIN_LEN:]))
368 eq_(repr(self.auth), repr(auth))
370 def test_serialize_with_multi_headers(self):
371 self.setUp_with_multi_headers()
372 self.test_serialize()
376 buf = self.ip.serialize(data, prev)
377 offset = ipv6.ipv6._MIN_LEN
378 hop_opts = ipv6.hop_opts.parser(six.binary_type(buf[offset:]))
379 offset += len(hop_opts)
380 auth = ipv6.auth.parser(six.binary_type(buf[offset:]))
381 eq_(repr(self.hop_opts), repr(hop_opts))
382 eq_(repr(self.auth), repr(auth))
384 def test_to_string(self):
385 ipv6_values = {'version': self.version,
386 'traffic_class': self.traffic_class,
387 'flow_label': self.flow_label,
388 'payload_length': self.payload_length,
390 'hop_limit': self.hop_limit,
391 'src': repr(self.src),
392 'dst': repr(self.dst),
393 'ext_hdrs': self.ext_hdrs}
394 _ipv6_str = ','.join(['%s=%s' % (k, ipv6_values[k])
395 for k, v in inspect.getmembers(self.ip)
396 if k in ipv6_values])
397 ipv6_str = '%s(%s)' % (ipv6.ipv6.__name__, _ipv6_str)
399 eq_(str(self.ip), ipv6_str)
400 eq_(repr(self.ip), ipv6_str)
402 def test_to_string_with_hop_opts(self):
403 self.setUp_with_hop_opts()
404 self.test_to_string()
406 def test_to_string_with_dst_opts(self):
407 self.setUp_with_dst_opts()
408 self.test_to_string()
410 def test_to_string_with_fragment(self):
411 self.setUp_with_fragment()
412 self.test_to_string()
414 def test_to_string_with_auth(self):
415 self.setUp_with_auth()
416 self.test_to_string()
418 def test_to_string_with_multi_headers(self):
419 self.setUp_with_multi_headers()
420 self.test_to_string()
423 eq_(len(self.ip), 40)
425 def test_len_with_hop_opts(self):
426 self.setUp_with_hop_opts()
427 eq_(len(self.ip), 40 + len(self.hop_opts))
429 def test_len_with_dst_opts(self):
430 self.setUp_with_dst_opts()
431 eq_(len(self.ip), 40 + len(self.dst_opts))
433 def test_len_with_routing_type3(self):
434 self.setUp_with_routing_type3()
435 eq_(len(self.ip), 40 + len(self.routing))
437 def test_len_with_fragment(self):
438 self.setUp_with_fragment()
439 eq_(len(self.ip), 40 + len(self.fragment))
441 def test_len_with_auth(self):
442 self.setUp_with_auth()
443 eq_(len(self.ip), 40 + len(self.auth))
445 def test_len_with_multi_headers(self):
446 self.setUp_with_multi_headers()
447 eq_(len(self.ip), 40 + len(self.hop_opts) + len(self.auth))
449 def test_default_args(self):
451 buf = ip.serialize(bytearray(), None)
452 res = struct.unpack(ipv6.ipv6._PACK_STR, six.binary_type(buf))
458 eq_(res[4], addrconv.ipv6.text_to_bin('10::10'))
459 eq_(res[5], addrconv.ipv6.text_to_bin('20::20'))
461 # with extension header
464 ipv6.hop_opts(58, 0, [
465 ipv6.option(5, 2, b'\x00\x00'),
466 ipv6.option(1, 0, None)])])
467 buf = ip.serialize(bytearray(), None)
468 res = struct.unpack(ipv6.ipv6._PACK_STR + '8s', six.binary_type(buf))
474 eq_(res[4], addrconv.ipv6.text_to_bin('10::10'))
475 eq_(res[5], addrconv.ipv6.text_to_bin('20::20'))
476 eq_(res[6], b'\x3a\x00\x05\x02\x00\x00\x01\x00')
479 jsondict = self.ip.to_jsondict()
480 ip = ipv6.ipv6.from_jsondict(jsondict['ipv6'])
481 eq_(str(self.ip), str(ip))
483 def test_json_with_hop_opts(self):
484 self.setUp_with_hop_opts()
487 def test_json_with_dst_opts(self):
488 self.setUp_with_dst_opts()
491 def test_json_with_routing_type3(self):
492 self.setUp_with_routing_type3()
495 def test_json_with_fragment(self):
496 self.setUp_with_fragment()
499 def test_json_with_auth(self):
500 self.setUp_with_auth()
503 def test_json_with_multi_headers(self):
504 self.setUp_with_multi_headers()
508 class Test_hop_opts(unittest.TestCase):
514 ipv6.option(5, 2, b'\x00\x00'),
515 ipv6.option(1, 0, None),
516 ipv6.option(0xc2, 4, b'\x00\x01\x00\x00'),
517 ipv6.option(1, 0, None),
519 self.hop = ipv6.hop_opts(self.nxt, self.size, self.data)
521 self.buf = struct.pack(self.form, self.nxt, self.size) \
522 + self.data[0].serialize() \
523 + self.data[1].serialize() \
524 + self.data[2].serialize() \
525 + self.data[3].serialize()
531 eq_(self.nxt, self.hop.nxt)
532 eq_(self.size, self.hop.size)
533 eq_(self.data, self.hop.data)
536 def test_invalid_size(self):
537 ipv6.hop_opts(self.nxt, 1, self.data)
539 def test_parser(self):
540 _res = ipv6.hop_opts.parser(self.buf)
541 if type(_res) is tuple:
545 eq_(self.nxt, res.nxt)
546 eq_(self.size, res.size)
547 eq_(str(self.data), str(res.data))
549 def test_serialize(self):
550 buf = self.hop.serialize()
551 res = struct.unpack_from(self.form, six.binary_type(buf))
552 eq_(self.nxt, res[0])
553 eq_(self.size, res[1])
554 offset = struct.calcsize(self.form)
555 opt1 = ipv6.option.parser(six.binary_type(buf[offset:]))
557 opt2 = ipv6.option.parser(six.binary_type(buf[offset:]))
559 opt3 = ipv6.option.parser(six.binary_type(buf[offset:]))
561 opt4 = ipv6.option.parser(six.binary_type(buf[offset:]))
564 eq_(b'\x00\x00', opt1.data)
568 eq_(0xc2, opt3.type_)
570 eq_(b'\x00\x01\x00\x00', opt3.data)
576 eq_(16, len(self.hop))
578 def test_default_args(self):
579 hdr = ipv6.hop_opts()
580 buf = hdr.serialize()
581 res = struct.unpack('!BB', six.binary_type(buf[:2]))
585 opt = ipv6.option(type_=1, len_=4, data=b'\x00\x00\x00\x00')
586 eq_(six.binary_type(buf[2:]), opt.serialize())
589 class Test_dst_opts(unittest.TestCase):
595 ipv6.option(5, 2, b'\x00\x00'),
596 ipv6.option(1, 0, None),
597 ipv6.option(0xc2, 4, b'\x00\x01\x00\x00'),
598 ipv6.option(1, 0, None),
600 self.dst = ipv6.dst_opts(self.nxt, self.size, self.data)
602 self.buf = struct.pack(self.form, self.nxt, self.size) \
603 + self.data[0].serialize() \
604 + self.data[1].serialize() \
605 + self.data[2].serialize() \
606 + self.data[3].serialize()
612 eq_(self.nxt, self.dst.nxt)
613 eq_(self.size, self.dst.size)
614 eq_(self.data, self.dst.data)
617 def test_invalid_size(self):
618 ipv6.dst_opts(self.nxt, 1, self.data)
620 def test_parser(self):
621 _res = ipv6.dst_opts.parser(self.buf)
622 if type(_res) is tuple:
626 eq_(self.nxt, res.nxt)
627 eq_(self.size, res.size)
628 eq_(str(self.data), str(res.data))
630 def test_serialize(self):
631 buf = self.dst.serialize()
632 res = struct.unpack_from(self.form, six.binary_type(buf))
633 eq_(self.nxt, res[0])
634 eq_(self.size, res[1])
635 offset = struct.calcsize(self.form)
636 opt1 = ipv6.option.parser(six.binary_type(buf[offset:]))
638 opt2 = ipv6.option.parser(six.binary_type(buf[offset:]))
640 opt3 = ipv6.option.parser(six.binary_type(buf[offset:]))
642 opt4 = ipv6.option.parser(six.binary_type(buf[offset:]))
645 eq_(b'\x00\x00', opt1.data)
649 eq_(0xc2, opt3.type_)
651 eq_(b'\x00\x01\x00\x00', opt3.data)
657 eq_(16, len(self.dst))
659 def test_default_args(self):
660 hdr = ipv6.dst_opts()
661 buf = hdr.serialize()
662 res = struct.unpack('!BB', six.binary_type(buf[:2]))
666 opt = ipv6.option(type_=1, len_=4, data=b'\x00\x00\x00\x00')
667 eq_(six.binary_type(buf[2:]), opt.serialize())
670 class Test_option(unittest.TestCase):
674 self.data = b'\x00\x00'
675 self.len_ = len(self.data)
676 self.opt = ipv6.option(self.type_, self.len_, self.data)
677 self.form = '!BB%ds' % self.len_
678 self.buf = struct.pack(self.form, self.type_, self.len_, self.data)
684 eq_(self.type_, self.opt.type_)
685 eq_(self.len_, self.opt.len_)
686 eq_(self.data, self.opt.data)
688 def test_parser(self):
689 _res = ipv6.option.parser(self.buf)
690 if type(_res) is tuple:
694 eq_(self.type_, res.type_)
695 eq_(self.len_, res.len_)
696 eq_(self.data, res.data)
698 def test_serialize(self):
699 buf = self.opt.serialize()
700 res = struct.unpack_from(self.form, buf)
701 eq_(self.type_, res[0])
702 eq_(self.len_, res[1])
703 eq_(self.data, res[2])
706 eq_(len(self.opt), 2 + self.len_)
709 class Test_option_pad1(Test_option):
715 self.opt = ipv6.option(self.type_, self.len_, self.data)
717 self.buf = struct.pack(self.form, self.type_)
719 def test_serialize(self):
720 buf = self.opt.serialize()
721 res = struct.unpack_from(self.form, buf)
722 eq_(self.type_, res[0])
724 def test_default_args(self):
726 buf = opt.serialize()
727 res = struct.unpack('!B', buf)
732 class Test_option_padN(Test_option):
738 self.opt = ipv6.option(self.type_, self.len_, self.data)
740 self.buf = struct.pack(self.form, self.type_, self.len_)
742 def test_serialize(self):
743 buf = self.opt.serialize()
744 res = struct.unpack_from(self.form, buf)
745 eq_(self.type_, res[0])
746 eq_(self.len_, res[1])
749 class Test_routing(unittest.TestCase):
754 self.type_ = ipv6.routing.ROUTING_TYPE_3
758 self.adrs = ["2001:db8:dead::1",
762 self.pad = (8 - ((len(self.adrs) - 1) * (16 - self.cmpi) +
763 (16 - self.cmpe) % 8)) % 8
765 self.form = '!BBBBBB2x16s16s16s'
766 self.buf = struct.pack(self.form, self.nxt, self.size,
767 self.type_, self.seg,
768 (self.cmpi << 4) | self.cmpe,
770 addrconv.ipv6.text_to_bin(self.adrs[0]),
771 addrconv.ipv6.text_to_bin(self.adrs[1]),
772 addrconv.ipv6.text_to_bin(self.adrs[2]))
777 def test_parser(self):
778 _res = ipv6.routing.parser(self.buf)
779 if type(_res) is tuple:
783 eq_(self.nxt, res.nxt)
784 eq_(self.size, res.size)
785 eq_(self.type_, res.type_)
786 eq_(self.seg, res.seg)
787 eq_(self.cmpi, res.cmpi)
788 eq_(self.cmpe, res.cmpe)
789 eq_(self.pad, res._pad)
790 eq_(self.adrs[0], res.adrs[0])
791 eq_(self.adrs[1], res.adrs[1])
792 eq_(self.adrs[2], res.adrs[2])
794 def test_not_implemented_type(self):
795 not_implemented_buf = struct.pack(
796 '!BBBBBB2x', 0, 6, ipv6.routing.ROUTING_TYPE_2, 0, 0, 0)
797 instance = ipv6.routing.parser(not_implemented_buf)
798 assert None is instance
800 def test_invalid_type(self):
802 invalid_buf = struct.pack('!BBBBBB2x', 0, 6, invalid_type, 0, 0, 0)
803 instance = ipv6.routing.parser(invalid_buf)
804 assert None is instance
807 class Test_routing_type3(unittest.TestCase):
816 self.adrs = ["2001:db8:dead::1",
820 self.pad = (8 - ((len(self.adrs) - 1) * (16 - self.cmpi) +
821 (16 - self.cmpe) % 8)) % 8
823 self.routing = ipv6.routing_type3(
824 self.nxt, self.size, self.type_, self.seg, self.cmpi,
825 self.cmpe, self.adrs)
826 self.form = '!BBBBBB2x16s16s16s'
827 self.buf = struct.pack(self.form, self.nxt, self.size,
828 self.type_, self.seg,
829 (self.cmpi << 4) | self.cmpe,
831 addrconv.ipv6.text_to_bin(self.adrs[0]),
832 addrconv.ipv6.text_to_bin(self.adrs[1]),
833 addrconv.ipv6.text_to_bin(self.adrs[2]))
836 eq_(self.nxt, self.routing.nxt)
837 eq_(self.size, self.routing.size)
838 eq_(self.type_, self.routing.type_)
839 eq_(self.seg, self.routing.seg)
840 eq_(self.cmpi, self.routing.cmpi)
841 eq_(self.cmpe, self.routing.cmpe)
842 eq_(self.pad, self.routing._pad)
843 eq_(self.adrs[0], self.routing.adrs[0])
844 eq_(self.adrs[1], self.routing.adrs[1])
845 eq_(self.adrs[2], self.routing.adrs[2])
847 def test_parser(self):
848 _res = ipv6.routing.parser(self.buf)
849 if type(_res) is tuple:
853 eq_(self.nxt, res.nxt)
854 eq_(self.size, res.size)
855 eq_(self.type_, res.type_)
856 eq_(self.seg, res.seg)
857 eq_(self.cmpi, res.cmpi)
858 eq_(self.cmpe, res.cmpe)
859 eq_(self.pad, res._pad)
860 eq_(self.adrs[0], res.adrs[0])
861 eq_(self.adrs[1], res.adrs[1])
862 eq_(self.adrs[2], res.adrs[2])
864 def test_serialize(self):
865 buf = self.routing.serialize()
866 res = struct.unpack_from(self.form, six.binary_type(buf))
867 eq_(self.nxt, res[0])
868 eq_(self.size, res[1])
869 eq_(self.type_, res[2])
870 eq_(self.seg, res[3])
871 eq_(self.cmpi, res[4] >> 4)
872 eq_(self.cmpe, res[4] & 0xf)
873 eq_(self.pad, res[5])
874 eq_(addrconv.ipv6.text_to_bin(self.adrs[0]), res[6])
875 eq_(addrconv.ipv6.text_to_bin(self.adrs[1]), res[7])
876 eq_(addrconv.ipv6.text_to_bin(self.adrs[2]), res[8])
878 def test_parser_with_adrs_zero(self):
887 pad = (8 - ((len(adrs) - 1) * (16 - cmpi) + (16 - cmpe) % 8)) % 8
890 buf = struct.pack(form, nxt, size, type_, seg,
891 (cmpi << 4) | cmpe, pad << 4)
892 _res = ipv6.routing.parser(buf)
893 if type(_res) is tuple:
899 eq_(type_, res.type_)
905 def test_serialize_with_adrs_zero(self):
914 pad = (8 - ((len(adrs) - 1) * (16 - cmpi) + (16 - cmpe) % 8)) % 8
915 routing = ipv6.routing_type3(
916 nxt, size, type_, seg, cmpi,
918 buf = routing.serialize()
920 res = struct.unpack_from(form, six.binary_type(buf))
925 eq_(cmpi, res[4] >> 4)
926 eq_(cmpe, res[4] & 0xf)
929 def test_parser_with_compression(self):
937 adrs = ["2001:0db8:dead:0123:4567:89ab:cdef:0001",
938 "2001:0db8:dead:0123:4567:89ab:cdef:0002",
939 "2001:0db8:dead:0123:4567:89ab:cdef:0003"]
941 pad = (8 - ((len(adrs) - 1) * (16 - cmpi) + (16 - cmpe) % 8)) % 8
942 form = '!BBBBBB2x%ds%ds%ds' % (16 - cmpi, 16 - cmpi, 16 - cmpe)
943 slice_i = slice(cmpi, 16)
944 slice_e = slice(cmpe, 16)
945 buf = struct.pack(form, nxt, size, type_, seg,
946 (cmpi << 4) | cmpe, pad << 4,
947 addrconv.ipv6.text_to_bin(adrs[0])[slice_i],
948 addrconv.ipv6.text_to_bin(adrs[1])[slice_i],
949 addrconv.ipv6.text_to_bin(adrs[2])[slice_e])
950 _res = ipv6.routing.parser(buf)
951 if type(_res) is tuple:
957 eq_(type_, res.type_)
962 eq_("::4567:89ab:cdef:1", res.adrs[0])
963 eq_("::4567:89ab:cdef:2", res.adrs[1])
964 eq_("::205.239.0.3", res.adrs[2])
966 def test_serialize_with_compression(self):
973 adrs = ["2001:db8:dead::1",
977 pad = (8 - ((len(adrs) - 1) * (16 - cmpi) + (16 - cmpe) % 8)) % 8
978 slice_i = slice(cmpi, 16)
979 slice_e = slice(cmpe, 16)
980 routing = ipv6.routing_type3(
981 nxt, size, type_, seg, cmpi, cmpe, adrs)
982 buf = routing.serialize()
983 form = '!BBBBBB2x8s8s8s'
984 res = struct.unpack_from(form, six.binary_type(buf))
989 eq_(cmpi, res[4] >> 4)
990 eq_(cmpe, res[4] & 0xf)
992 eq_(addrconv.ipv6.text_to_bin(adrs[0])[slice_i], res[6])
993 eq_(addrconv.ipv6.text_to_bin(adrs[1])[slice_i], res[7])
994 eq_(addrconv.ipv6.text_to_bin(adrs[2])[slice_e], res[8])
997 eq_((6 + 1) * 8, len(self.routing))
999 def test_default_args(self):
1000 hdr = ipv6.routing_type3()
1001 buf = hdr.serialize()
1003 res = struct.unpack_from(ipv6.routing_type3._PACK_STR, six.binary_type(buf))
1010 eq_(res[4], (0 << 4) | 0)
1014 class Test_fragment(unittest.TestCase):
1021 self.fragment = ipv6.fragment(
1022 self.nxt, self.offset, self.more, self.id_)
1024 self.off_m = (self.offset << 3 | self.more)
1026 self.buf = struct.pack(self.form, self.nxt, self.off_m, self.id_)
1028 def test_init(self):
1029 eq_(self.nxt, self.fragment.nxt)
1030 eq_(self.offset, self.fragment.offset)
1031 eq_(self.more, self.fragment.more)
1032 eq_(self.id_, self.fragment.id_)
1034 def test_parser(self):
1035 _res = ipv6.fragment.parser(self.buf)
1036 if type(_res) is tuple:
1040 eq_(self.nxt, res.nxt)
1041 eq_(self.offset, res.offset)
1042 eq_(self.more, res.more)
1043 eq_(self.id_, res.id_)
1045 def test_serialize(self):
1046 buf = self.fragment.serialize()
1047 res = struct.unpack_from(self.form, six.binary_type(buf))
1048 eq_(self.nxt, res[0])
1049 eq_(self.off_m, res[1])
1050 eq_(self.id_, res[2])
1053 eq_(8, len(self.fragment))
1055 def test_default_args(self):
1056 hdr = ipv6.fragment()
1057 buf = hdr.serialize()
1058 res = struct.unpack_from(ipv6.fragment._PACK_STR, buf)
1065 class Test_auth(unittest.TestCase):
1072 self.data = b'\x21\xd3\xa9\x5c\x5f\xfd\x4d\x18\x46\x22\xb9\xf8'
1073 self.auth = ipv6.auth(
1074 self.nxt, self.size, self.spi, self.seq, self.data)
1075 self.form = '!BB2xII12s'
1076 self.buf = struct.pack(self.form, self.nxt, self.size, self.spi,
1077 self.seq, self.data)
1079 def test_init(self):
1080 eq_(self.nxt, self.auth.nxt)
1081 eq_(self.size, self.auth.size)
1082 eq_(self.spi, self.auth.spi)
1083 eq_(self.seq, self.auth.seq)
1084 eq_(self.data, self.auth.data)
1086 def test_parser(self):
1087 _res = ipv6.auth.parser(self.buf)
1088 if type(_res) is tuple:
1092 eq_(self.nxt, res.nxt)
1093 eq_(self.size, res.size)
1094 eq_(self.spi, res.spi)
1095 eq_(self.seq, res.seq)
1096 eq_(self.data, res.data)
1098 def test_serialize(self):
1099 buf = self.auth.serialize()
1100 res = struct.unpack_from(self.form, six.binary_type(buf))
1101 eq_(self.nxt, res[0])
1102 eq_(self.size, res[1])
1103 eq_(self.spi, res[2])
1104 eq_(self.seq, res[3])
1105 eq_(self.data, res[4])
1108 eq_((4 + 2) * 4, len(self.auth))
1110 def test_len_re(self):
1114 b'\x21\xd3\xa9\x5c\x5f\xfd\x4d\x18\x46\x22\xb9\xf8\xf8\xf8\xf8\xf8')
1115 eq_((size + 2) * 4, len(auth))
1117 def test_default_args(self):
1119 buf = hdr.serialize()
1121 res = struct.unpack_from(ipv6.auth._PACK_STR, six.binary_type(buf))
1128 eq_(buf[ipv6.auth._MIN_LEN:], b'\x00\x00\x00\x00')