backing up
[vsorcdistro/.git] / ryu / build / lib.linux-armv7l-2.7 / ryu / tests / unit / packet / test_ipv6.py
1 # Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16
17 import unittest
18 import logging
19 import inspect
20 import six
21 import struct
22
23 from nose.tools import *
24 from ryu.lib import addrconv
25 from ryu.lib import ip
26 from ryu.lib.packet import ipv6
27
28
29 LOG = logging.getLogger(__name__)
30
31
32 class Test_ipv6(unittest.TestCase):
33
34     def setUp(self):
35         self.version = 6
36         self.traffic_class = 0
37         self.flow_label = 0
38         self.payload_length = 817
39         self.nxt = 6
40         self.hop_limit = 128
41         self.src = '2002:4637:d5d3::4637:d5d3'
42         self.dst = '2001:4860:0:2001::68'
43         self.ext_hdrs = []
44         self.ip = ipv6.ipv6(
45             self.version, self.traffic_class, self.flow_label,
46             self.payload_length, self.nxt, self.hop_limit, self.src,
47             self.dst, self.ext_hdrs)
48
49         self.v_tc_flow = (
50             self.version << 28 | self.traffic_class << 20 |
51             self.flow_label << 12)
52         self.buf = struct.pack(
53             ipv6.ipv6._PACK_STR, self.v_tc_flow,
54             self.payload_length, self.nxt, self.hop_limit,
55             addrconv.ipv6.text_to_bin(self.src),
56             addrconv.ipv6.text_to_bin(self.dst))
57
58     def setUp_with_hop_opts(self):
59         self.opt1_type = 5
60         self.opt1_len = 2
61         self.opt1_data = b'\x00\x00'
62         self.opt2_type = 1
63         self.opt2_len = 0
64         self.opt2_data = None
65         self.options = [
66             ipv6.option(self.opt1_type, self.opt1_len, self.opt1_data),
67             ipv6.option(self.opt2_type, self.opt2_len, self.opt2_data),
68         ]
69         self.hop_opts_nxt = 6
70         self.hop_opts_size = 0
71         self.hop_opts = ipv6.hop_opts(
72             self.hop_opts_nxt, self.hop_opts_size, self.options)
73         self.ext_hdrs = [self.hop_opts]
74         self.payload_length += len(self.hop_opts)
75         self.nxt = ipv6.hop_opts.TYPE
76         self.ip = ipv6.ipv6(
77             self.version, self.traffic_class, self.flow_label,
78             self.payload_length, self.nxt, self.hop_limit, self.src,
79             self.dst, self.ext_hdrs)
80         self.buf = struct.pack(
81             ipv6.ipv6._PACK_STR, self.v_tc_flow,
82             self.payload_length, self.nxt, self.hop_limit,
83             addrconv.ipv6.text_to_bin(self.src),
84             addrconv.ipv6.text_to_bin(self.dst))
85         self.buf += self.hop_opts.serialize()
86
87     def setUp_with_dst_opts(self):
88         self.opt1_type = 5
89         self.opt1_len = 2
90         self.opt1_data = b'\x00\x00'
91         self.opt2_type = 1
92         self.opt2_len = 0
93         self.opt2_data = None
94         self.options = [
95             ipv6.option(self.opt1_type, self.opt1_len, self.opt1_data),
96             ipv6.option(self.opt2_type, self.opt2_len, self.opt2_data),
97         ]
98         self.dst_opts_nxt = 6
99         self.dst_opts_size = 0
100         self.dst_opts = ipv6.dst_opts(
101             self.dst_opts_nxt, self.dst_opts_size, self.options)
102         self.ext_hdrs = [self.dst_opts]
103         self.payload_length += len(self.dst_opts)
104         self.nxt = ipv6.dst_opts.TYPE
105         self.ip = ipv6.ipv6(
106             self.version, self.traffic_class, self.flow_label,
107             self.payload_length, self.nxt, self.hop_limit, self.src,
108             self.dst, self.ext_hdrs)
109         self.buf = struct.pack(
110             ipv6.ipv6._PACK_STR, self.v_tc_flow,
111             self.payload_length, self.nxt, self.hop_limit,
112             addrconv.ipv6.text_to_bin(self.src),
113             addrconv.ipv6.text_to_bin(self.dst))
114         self.buf += self.dst_opts.serialize()
115
116     def setUp_with_routing_type3(self):
117         self.routing_nxt = 6
118         self.routing_size = 6
119         self.routing_type = 3
120         self.routing_seg = 2
121         self.routing_cmpi = 0
122         self.routing_cmpe = 0
123         self.routing_adrs = ["2001:db8:dead::1", "2001:db8:dead::2",
124                              "2001:db8:dead::3"]
125         self.routing = ipv6.routing_type3(
126             self.routing_nxt, self.routing_size,
127             self.routing_type, self.routing_seg,
128             self.routing_cmpi, self.routing_cmpe,
129             self.routing_adrs)
130         self.ext_hdrs = [self.routing]
131         self.payload_length += len(self.routing)
132         self.nxt = ipv6.routing.TYPE
133         self.ip = ipv6.ipv6(
134             self.version, self.traffic_class, self.flow_label,
135             self.payload_length, self.nxt, self.hop_limit, self.src,
136             self.dst, self.ext_hdrs)
137         self.buf = struct.pack(
138             ipv6.ipv6._PACK_STR, self.v_tc_flow,
139             self.payload_length, self.nxt, self.hop_limit,
140             addrconv.ipv6.text_to_bin(self.src),
141             addrconv.ipv6.text_to_bin(self.dst))
142         self.buf += self.routing.serialize()
143
144     def setUp_with_fragment(self):
145         self.fragment_nxt = 6
146         self.fragment_offset = 50
147         self.fragment_more = 1
148         self.fragment_id = 123
149         self.fragment = ipv6.fragment(
150             self.fragment_nxt, self.fragment_offset, self.fragment_more,
151             self.fragment_id)
152         self.ext_hdrs = [self.fragment]
153         self.payload_length += len(self.fragment)
154         self.nxt = ipv6.fragment.TYPE
155         self.ip = ipv6.ipv6(
156             self.version, self.traffic_class, self.flow_label,
157             self.payload_length, self.nxt, self.hop_limit, self.src,
158             self.dst, self.ext_hdrs)
159         self.buf = struct.pack(
160             ipv6.ipv6._PACK_STR, self.v_tc_flow,
161             self.payload_length, self.nxt, self.hop_limit,
162             addrconv.ipv6.text_to_bin(self.src),
163             addrconv.ipv6.text_to_bin(self.dst))
164         self.buf += self.fragment.serialize()
165
166     def setUp_with_auth(self):
167         self.auth_nxt = 6
168         self.auth_size = 4
169         self.auth_spi = 256
170         self.auth_seq = 1
171         self.auth_data = b'\xa0\xe7\xf8\xab\xf9\x69\x1a\x8b\xf3\x9f\x7c\xae'
172         self.auth = ipv6.auth(
173             self.auth_nxt, self.auth_size, self.auth_spi, self.auth_seq,
174             self.auth_data)
175         self.ext_hdrs = [self.auth]
176         self.payload_length += len(self.auth)
177         self.nxt = ipv6.auth.TYPE
178         self.ip = ipv6.ipv6(
179             self.version, self.traffic_class, self.flow_label,
180             self.payload_length, self.nxt, self.hop_limit, self.src,
181             self.dst, self.ext_hdrs)
182         self.buf = struct.pack(
183             ipv6.ipv6._PACK_STR, self.v_tc_flow,
184             self.payload_length, self.nxt, self.hop_limit,
185             addrconv.ipv6.text_to_bin(self.src),
186             addrconv.ipv6.text_to_bin(self.dst))
187         self.buf += self.auth.serialize()
188
189     def setUp_with_multi_headers(self):
190         self.opt1_type = 5
191         self.opt1_len = 2
192         self.opt1_data = b'\x00\x00'
193         self.opt2_type = 1
194         self.opt2_len = 0
195         self.opt2_data = None
196         self.options = [
197             ipv6.option(self.opt1_type, self.opt1_len, self.opt1_data),
198             ipv6.option(self.opt2_type, self.opt2_len, self.opt2_data),
199         ]
200         self.hop_opts_nxt = ipv6.auth.TYPE
201         self.hop_opts_size = 0
202         self.hop_opts = ipv6.hop_opts(
203             self.hop_opts_nxt, self.hop_opts_size, self.options)
204         self.auth_nxt = 6
205         self.auth_size = 4
206         self.auth_spi = 256
207         self.auth_seq = 1
208         self.auth_data = b'\xa0\xe7\xf8\xab\xf9\x69\x1a\x8b\xf3\x9f\x7c\xae'
209         self.auth = ipv6.auth(
210             self.auth_nxt, self.auth_size, self.auth_spi, self.auth_seq,
211             self.auth_data)
212         self.ext_hdrs = [self.hop_opts, self.auth]
213         self.payload_length += len(self.hop_opts) + len(self.auth)
214         self.nxt = ipv6.hop_opts.TYPE
215         self.ip = ipv6.ipv6(
216             self.version, self.traffic_class, self.flow_label,
217             self.payload_length, self.nxt, self.hop_limit, self.src,
218             self.dst, self.ext_hdrs)
219         self.buf = struct.pack(
220             ipv6.ipv6._PACK_STR, self.v_tc_flow,
221             self.payload_length, self.nxt, self.hop_limit,
222             addrconv.ipv6.text_to_bin(self.src),
223             addrconv.ipv6.text_to_bin(self.dst))
224         self.buf += self.hop_opts.serialize()
225         self.buf += self.auth.serialize()
226
227     def tearDown(self):
228         pass
229
230     def test_init(self):
231         eq_(self.version, self.ip.version)
232         eq_(self.traffic_class, self.ip.traffic_class)
233         eq_(self.flow_label, self.ip.flow_label)
234         eq_(self.payload_length, self.ip.payload_length)
235         eq_(self.nxt, self.ip.nxt)
236         eq_(self.hop_limit, self.ip.hop_limit)
237         eq_(self.src, self.ip.src)
238         eq_(self.dst, self.ip.dst)
239         eq_(str(self.ext_hdrs), str(self.ip.ext_hdrs))
240
241     def test_init_with_hop_opts(self):
242         self.setUp_with_hop_opts()
243         self.test_init()
244
245     def test_init_with_dst_opts(self):
246         self.setUp_with_dst_opts()
247         self.test_init()
248
249     def test_init_with_routing_type3(self):
250         self.setUp_with_routing_type3()
251         self.test_init()
252
253     def test_init_with_fragment(self):
254         self.setUp_with_fragment()
255         self.test_init()
256
257     def test_init_with_auth(self):
258         self.setUp_with_auth()
259         self.test_init()
260
261     def test_init_with_multi_headers(self):
262         self.setUp_with_multi_headers()
263         self.test_init()
264
265     def test_parser(self):
266         _res = self.ip.parser(six.binary_type(self.buf))
267         if type(_res) is tuple:
268             res = _res[0]
269         else:
270             res = _res
271
272         eq_(self.version, res.version)
273         eq_(self.traffic_class, res.traffic_class)
274         eq_(self.flow_label, res.flow_label)
275         eq_(self.payload_length, res.payload_length)
276         eq_(self.nxt, res.nxt)
277         eq_(self.hop_limit, res.hop_limit)
278         eq_(self.src, res.src)
279         eq_(self.dst, res.dst)
280         eq_(str(self.ext_hdrs), str(res.ext_hdrs))
281
282     def test_parser_with_hop_opts(self):
283         self.setUp_with_hop_opts()
284         self.test_parser()
285
286     def test_parser_with_dst_opts(self):
287         self.setUp_with_dst_opts()
288         self.test_parser()
289
290     def test_parser_with_routing_type3(self):
291         self.setUp_with_routing_type3()
292         self.test_parser()
293
294     def test_parser_with_fragment(self):
295         self.setUp_with_fragment()
296         self.test_parser()
297
298     def test_parser_with_auth(self):
299         self.setUp_with_auth()
300         self.test_parser()
301
302     def test_parser_with_multi_headers(self):
303         self.setUp_with_multi_headers()
304         self.test_parser()
305
306     def test_serialize(self):
307         data = bytearray()
308         prev = None
309         buf = self.ip.serialize(data, prev)
310
311         res = struct.unpack_from(ipv6.ipv6._PACK_STR, six.binary_type(buf))
312
313         eq_(self.v_tc_flow, res[0])
314         eq_(self.payload_length, res[1])
315         eq_(self.nxt, res[2])
316         eq_(self.hop_limit, res[3])
317         eq_(self.src, addrconv.ipv6.bin_to_text(res[4]))
318         eq_(self.dst, addrconv.ipv6.bin_to_text(res[5]))
319
320     def test_serialize_with_hop_opts(self):
321         self.setUp_with_hop_opts()
322         self.test_serialize()
323
324         data = bytearray()
325         prev = None
326         buf = self.ip.serialize(data, prev)
327         hop_opts = ipv6.hop_opts.parser(six.binary_type(buf[ipv6.ipv6._MIN_LEN:]))
328         eq_(repr(self.hop_opts), repr(hop_opts))
329
330     def test_serialize_with_dst_opts(self):
331         self.setUp_with_dst_opts()
332         self.test_serialize()
333
334         data = bytearray()
335         prev = None
336         buf = self.ip.serialize(data, prev)
337         dst_opts = ipv6.dst_opts.parser(six.binary_type(buf[ipv6.ipv6._MIN_LEN:]))
338         eq_(repr(self.dst_opts), repr(dst_opts))
339
340     def test_serialize_with_routing_type3(self):
341         self.setUp_with_routing_type3()
342         self.test_serialize()
343
344         data = bytearray()
345         prev = None
346         buf = self.ip.serialize(data, prev)
347         routing = ipv6.routing.parser(six.binary_type(buf[ipv6.ipv6._MIN_LEN:]))
348         eq_(repr(self.routing), repr(routing))
349
350     def test_serialize_with_fragment(self):
351         self.setUp_with_fragment()
352         self.test_serialize()
353
354         data = bytearray()
355         prev = None
356         buf = self.ip.serialize(data, prev)
357         fragment = ipv6.fragment.parser(six.binary_type(buf[ipv6.ipv6._MIN_LEN:]))
358         eq_(repr(self.fragment), repr(fragment))
359
360     def test_serialize_with_auth(self):
361         self.setUp_with_auth()
362         self.test_serialize()
363
364         data = bytearray()
365         prev = None
366         buf = self.ip.serialize(data, prev)
367         auth = ipv6.auth.parser(six.binary_type(buf[ipv6.ipv6._MIN_LEN:]))
368         eq_(repr(self.auth), repr(auth))
369
370     def test_serialize_with_multi_headers(self):
371         self.setUp_with_multi_headers()
372         self.test_serialize()
373
374         data = bytearray()
375         prev = None
376         buf = self.ip.serialize(data, prev)
377         offset = ipv6.ipv6._MIN_LEN
378         hop_opts = ipv6.hop_opts.parser(six.binary_type(buf[offset:]))
379         offset += len(hop_opts)
380         auth = ipv6.auth.parser(six.binary_type(buf[offset:]))
381         eq_(repr(self.hop_opts), repr(hop_opts))
382         eq_(repr(self.auth), repr(auth))
383
384     def test_to_string(self):
385         ipv6_values = {'version': self.version,
386                        'traffic_class': self.traffic_class,
387                        'flow_label': self.flow_label,
388                        'payload_length': self.payload_length,
389                        'nxt': self.nxt,
390                        'hop_limit': self.hop_limit,
391                        'src': repr(self.src),
392                        'dst': repr(self.dst),
393                        'ext_hdrs': self.ext_hdrs}
394         _ipv6_str = ','.join(['%s=%s' % (k, ipv6_values[k])
395                               for k, v in inspect.getmembers(self.ip)
396                               if k in ipv6_values])
397         ipv6_str = '%s(%s)' % (ipv6.ipv6.__name__, _ipv6_str)
398
399         eq_(str(self.ip), ipv6_str)
400         eq_(repr(self.ip), ipv6_str)
401
402     def test_to_string_with_hop_opts(self):
403         self.setUp_with_hop_opts()
404         self.test_to_string()
405
406     def test_to_string_with_dst_opts(self):
407         self.setUp_with_dst_opts()
408         self.test_to_string()
409
410     def test_to_string_with_fragment(self):
411         self.setUp_with_fragment()
412         self.test_to_string()
413
414     def test_to_string_with_auth(self):
415         self.setUp_with_auth()
416         self.test_to_string()
417
418     def test_to_string_with_multi_headers(self):
419         self.setUp_with_multi_headers()
420         self.test_to_string()
421
422     def test_len(self):
423         eq_(len(self.ip), 40)
424
425     def test_len_with_hop_opts(self):
426         self.setUp_with_hop_opts()
427         eq_(len(self.ip), 40 + len(self.hop_opts))
428
429     def test_len_with_dst_opts(self):
430         self.setUp_with_dst_opts()
431         eq_(len(self.ip), 40 + len(self.dst_opts))
432
433     def test_len_with_routing_type3(self):
434         self.setUp_with_routing_type3()
435         eq_(len(self.ip), 40 + len(self.routing))
436
437     def test_len_with_fragment(self):
438         self.setUp_with_fragment()
439         eq_(len(self.ip), 40 + len(self.fragment))
440
441     def test_len_with_auth(self):
442         self.setUp_with_auth()
443         eq_(len(self.ip), 40 + len(self.auth))
444
445     def test_len_with_multi_headers(self):
446         self.setUp_with_multi_headers()
447         eq_(len(self.ip), 40 + len(self.hop_opts) + len(self.auth))
448
449     def test_default_args(self):
450         ip = ipv6.ipv6()
451         buf = ip.serialize(bytearray(), None)
452         res = struct.unpack(ipv6.ipv6._PACK_STR, six.binary_type(buf))
453
454         eq_(res[0], 6 << 28)
455         eq_(res[1], 0)
456         eq_(res[2], 6)
457         eq_(res[3], 255)
458         eq_(res[4], addrconv.ipv6.text_to_bin('10::10'))
459         eq_(res[5], addrconv.ipv6.text_to_bin('20::20'))
460
461         # with extension header
462         ip = ipv6.ipv6(
463             nxt=0, ext_hdrs=[
464                 ipv6.hop_opts(58, 0, [
465                     ipv6.option(5, 2, b'\x00\x00'),
466                     ipv6.option(1, 0, None)])])
467         buf = ip.serialize(bytearray(), None)
468         res = struct.unpack(ipv6.ipv6._PACK_STR + '8s', six.binary_type(buf))
469
470         eq_(res[0], 6 << 28)
471         eq_(res[1], 8)
472         eq_(res[2], 0)
473         eq_(res[3], 255)
474         eq_(res[4], addrconv.ipv6.text_to_bin('10::10'))
475         eq_(res[5], addrconv.ipv6.text_to_bin('20::20'))
476         eq_(res[6], b'\x3a\x00\x05\x02\x00\x00\x01\x00')
477
478     def test_json(self):
479         jsondict = self.ip.to_jsondict()
480         ip = ipv6.ipv6.from_jsondict(jsondict['ipv6'])
481         eq_(str(self.ip), str(ip))
482
483     def test_json_with_hop_opts(self):
484         self.setUp_with_hop_opts()
485         self.test_json()
486
487     def test_json_with_dst_opts(self):
488         self.setUp_with_dst_opts()
489         self.test_json()
490
491     def test_json_with_routing_type3(self):
492         self.setUp_with_routing_type3()
493         self.test_json()
494
495     def test_json_with_fragment(self):
496         self.setUp_with_fragment()
497         self.test_json()
498
499     def test_json_with_auth(self):
500         self.setUp_with_auth()
501         self.test_json()
502
503     def test_json_with_multi_headers(self):
504         self.setUp_with_multi_headers()
505         self.test_json()
506
507
508 class Test_hop_opts(unittest.TestCase):
509
510     def setUp(self):
511         self.nxt = 0
512         self.size = 8
513         self.data = [
514             ipv6.option(5, 2, b'\x00\x00'),
515             ipv6.option(1, 0, None),
516             ipv6.option(0xc2, 4, b'\x00\x01\x00\x00'),
517             ipv6.option(1, 0, None),
518         ]
519         self.hop = ipv6.hop_opts(self.nxt, self.size, self.data)
520         self.form = '!BB'
521         self.buf = struct.pack(self.form, self.nxt, self.size) \
522             + self.data[0].serialize() \
523             + self.data[1].serialize() \
524             + self.data[2].serialize() \
525             + self.data[3].serialize()
526
527     def tearDown(self):
528         pass
529
530     def test_init(self):
531         eq_(self.nxt, self.hop.nxt)
532         eq_(self.size, self.hop.size)
533         eq_(self.data, self.hop.data)
534
535     @raises(Exception)
536     def test_invalid_size(self):
537         ipv6.hop_opts(self.nxt, 1, self.data)
538
539     def test_parser(self):
540         _res = ipv6.hop_opts.parser(self.buf)
541         if type(_res) is tuple:
542             res = _res[0]
543         else:
544             res = _res
545         eq_(self.nxt, res.nxt)
546         eq_(self.size, res.size)
547         eq_(str(self.data), str(res.data))
548
549     def test_serialize(self):
550         buf = self.hop.serialize()
551         res = struct.unpack_from(self.form, six.binary_type(buf))
552         eq_(self.nxt, res[0])
553         eq_(self.size, res[1])
554         offset = struct.calcsize(self.form)
555         opt1 = ipv6.option.parser(six.binary_type(buf[offset:]))
556         offset += len(opt1)
557         opt2 = ipv6.option.parser(six.binary_type(buf[offset:]))
558         offset += len(opt2)
559         opt3 = ipv6.option.parser(six.binary_type(buf[offset:]))
560         offset += len(opt3)
561         opt4 = ipv6.option.parser(six.binary_type(buf[offset:]))
562         eq_(5, opt1.type_)
563         eq_(2, opt1.len_)
564         eq_(b'\x00\x00', opt1.data)
565         eq_(1, opt2.type_)
566         eq_(0, opt2.len_)
567         eq_(None, opt2.data)
568         eq_(0xc2, opt3.type_)
569         eq_(4, opt3.len_)
570         eq_(b'\x00\x01\x00\x00', opt3.data)
571         eq_(1, opt4.type_)
572         eq_(0, opt4.len_)
573         eq_(None, opt4.data)
574
575     def test_len(self):
576         eq_(16, len(self.hop))
577
578     def test_default_args(self):
579         hdr = ipv6.hop_opts()
580         buf = hdr.serialize()
581         res = struct.unpack('!BB', six.binary_type(buf[:2]))
582
583         eq_(res[0], 6)
584         eq_(res[1], 0)
585         opt = ipv6.option(type_=1, len_=4, data=b'\x00\x00\x00\x00')
586         eq_(six.binary_type(buf[2:]), opt.serialize())
587
588
589 class Test_dst_opts(unittest.TestCase):
590
591     def setUp(self):
592         self.nxt = 60
593         self.size = 8
594         self.data = [
595             ipv6.option(5, 2, b'\x00\x00'),
596             ipv6.option(1, 0, None),
597             ipv6.option(0xc2, 4, b'\x00\x01\x00\x00'),
598             ipv6.option(1, 0, None),
599         ]
600         self.dst = ipv6.dst_opts(self.nxt, self.size, self.data)
601         self.form = '!BB'
602         self.buf = struct.pack(self.form, self.nxt, self.size) \
603             + self.data[0].serialize() \
604             + self.data[1].serialize() \
605             + self.data[2].serialize() \
606             + self.data[3].serialize()
607
608     def tearDown(self):
609         pass
610
611     def test_init(self):
612         eq_(self.nxt, self.dst.nxt)
613         eq_(self.size, self.dst.size)
614         eq_(self.data, self.dst.data)
615
616     @raises(Exception)
617     def test_invalid_size(self):
618         ipv6.dst_opts(self.nxt, 1, self.data)
619
620     def test_parser(self):
621         _res = ipv6.dst_opts.parser(self.buf)
622         if type(_res) is tuple:
623             res = _res[0]
624         else:
625             res = _res
626         eq_(self.nxt, res.nxt)
627         eq_(self.size, res.size)
628         eq_(str(self.data), str(res.data))
629
630     def test_serialize(self):
631         buf = self.dst.serialize()
632         res = struct.unpack_from(self.form, six.binary_type(buf))
633         eq_(self.nxt, res[0])
634         eq_(self.size, res[1])
635         offset = struct.calcsize(self.form)
636         opt1 = ipv6.option.parser(six.binary_type(buf[offset:]))
637         offset += len(opt1)
638         opt2 = ipv6.option.parser(six.binary_type(buf[offset:]))
639         offset += len(opt2)
640         opt3 = ipv6.option.parser(six.binary_type(buf[offset:]))
641         offset += len(opt3)
642         opt4 = ipv6.option.parser(six.binary_type(buf[offset:]))
643         eq_(5, opt1.type_)
644         eq_(2, opt1.len_)
645         eq_(b'\x00\x00', opt1.data)
646         eq_(1, opt2.type_)
647         eq_(0, opt2.len_)
648         eq_(None, opt2.data)
649         eq_(0xc2, opt3.type_)
650         eq_(4, opt3.len_)
651         eq_(b'\x00\x01\x00\x00', opt3.data)
652         eq_(1, opt4.type_)
653         eq_(0, opt4.len_)
654         eq_(None, opt4.data)
655
656     def test_len(self):
657         eq_(16, len(self.dst))
658
659     def test_default_args(self):
660         hdr = ipv6.dst_opts()
661         buf = hdr.serialize()
662         res = struct.unpack('!BB', six.binary_type(buf[:2]))
663
664         eq_(res[0], 6)
665         eq_(res[1], 0)
666         opt = ipv6.option(type_=1, len_=4, data=b'\x00\x00\x00\x00')
667         eq_(six.binary_type(buf[2:]), opt.serialize())
668
669
670 class Test_option(unittest.TestCase):
671
672     def setUp(self):
673         self.type_ = 5
674         self.data = b'\x00\x00'
675         self.len_ = len(self.data)
676         self.opt = ipv6.option(self.type_, self.len_, self.data)
677         self.form = '!BB%ds' % self.len_
678         self.buf = struct.pack(self.form, self.type_, self.len_, self.data)
679
680     def tearDown(self):
681         pass
682
683     def test_init(self):
684         eq_(self.type_, self.opt.type_)
685         eq_(self.len_, self.opt.len_)
686         eq_(self.data, self.opt.data)
687
688     def test_parser(self):
689         _res = ipv6.option.parser(self.buf)
690         if type(_res) is tuple:
691             res = _res[0]
692         else:
693             res = _res
694         eq_(self.type_, res.type_)
695         eq_(self.len_, res.len_)
696         eq_(self.data, res.data)
697
698     def test_serialize(self):
699         buf = self.opt.serialize()
700         res = struct.unpack_from(self.form, buf)
701         eq_(self.type_, res[0])
702         eq_(self.len_, res[1])
703         eq_(self.data, res[2])
704
705     def test_len(self):
706         eq_(len(self.opt), 2 + self.len_)
707
708
709 class Test_option_pad1(Test_option):
710
711     def setUp(self):
712         self.type_ = 0
713         self.len_ = -1
714         self.data = None
715         self.opt = ipv6.option(self.type_, self.len_, self.data)
716         self.form = '!B'
717         self.buf = struct.pack(self.form, self.type_)
718
719     def test_serialize(self):
720         buf = self.opt.serialize()
721         res = struct.unpack_from(self.form, buf)
722         eq_(self.type_, res[0])
723
724     def test_default_args(self):
725         opt = ipv6.option()
726         buf = opt.serialize()
727         res = struct.unpack('!B', buf)
728
729         eq_(res[0], 0)
730
731
732 class Test_option_padN(Test_option):
733
734     def setUp(self):
735         self.type_ = 1
736         self.len_ = 0
737         self.data = None
738         self.opt = ipv6.option(self.type_, self.len_, self.data)
739         self.form = '!BB'
740         self.buf = struct.pack(self.form, self.type_, self.len_)
741
742     def test_serialize(self):
743         buf = self.opt.serialize()
744         res = struct.unpack_from(self.form, buf)
745         eq_(self.type_, res[0])
746         eq_(self.len_, res[1])
747
748
749 class Test_routing(unittest.TestCase):
750
751     def setUp(self):
752         self.nxt = 0
753         self.size = 6
754         self.type_ = ipv6.routing.ROUTING_TYPE_3
755         self.seg = 0
756         self.cmpi = 0
757         self.cmpe = 0
758         self.adrs = ["2001:db8:dead::1",
759                      "2001:db8:dead::2",
760                      "2001:db8:dead::3"]
761         # calculate pad
762         self.pad = (8 - ((len(self.adrs) - 1) * (16 - self.cmpi) +
763                          (16 - self.cmpe) % 8)) % 8
764         # create buf
765         self.form = '!BBBBBB2x16s16s16s'
766         self.buf = struct.pack(self.form, self.nxt, self.size,
767                                self.type_, self.seg,
768                                (self.cmpi << 4) | self.cmpe,
769                                self.pad << 4,
770                                addrconv.ipv6.text_to_bin(self.adrs[0]),
771                                addrconv.ipv6.text_to_bin(self.adrs[1]),
772                                addrconv.ipv6.text_to_bin(self.adrs[2]))
773
774     def tearDown(self):
775         pass
776
777     def test_parser(self):
778         _res = ipv6.routing.parser(self.buf)
779         if type(_res) is tuple:
780             res = _res[0]
781         else:
782             res = _res
783         eq_(self.nxt, res.nxt)
784         eq_(self.size, res.size)
785         eq_(self.type_, res.type_)
786         eq_(self.seg, res.seg)
787         eq_(self.cmpi, res.cmpi)
788         eq_(self.cmpe, res.cmpe)
789         eq_(self.pad, res._pad)
790         eq_(self.adrs[0], res.adrs[0])
791         eq_(self.adrs[1], res.adrs[1])
792         eq_(self.adrs[2], res.adrs[2])
793
794     def test_not_implemented_type(self):
795         not_implemented_buf = struct.pack(
796             '!BBBBBB2x', 0, 6, ipv6.routing.ROUTING_TYPE_2, 0, 0, 0)
797         instance = ipv6.routing.parser(not_implemented_buf)
798         assert None is instance
799
800     def test_invalid_type(self):
801         invalid_type = 99
802         invalid_buf = struct.pack('!BBBBBB2x', 0, 6, invalid_type, 0, 0, 0)
803         instance = ipv6.routing.parser(invalid_buf)
804         assert None is instance
805
806
807 class Test_routing_type3(unittest.TestCase):
808
809     def setUp(self):
810         self.nxt = 0
811         self.size = 6
812         self.type_ = 3
813         self.seg = 0
814         self.cmpi = 0
815         self.cmpe = 0
816         self.adrs = ["2001:db8:dead::1",
817                      "2001:db8:dead::2",
818                      "2001:db8:dead::3"]
819         # calculate pad
820         self.pad = (8 - ((len(self.adrs) - 1) * (16 - self.cmpi) +
821                          (16 - self.cmpe) % 8)) % 8
822
823         self.routing = ipv6.routing_type3(
824             self.nxt, self.size, self.type_, self.seg, self.cmpi,
825             self.cmpe, self.adrs)
826         self.form = '!BBBBBB2x16s16s16s'
827         self.buf = struct.pack(self.form, self.nxt, self.size,
828                                self.type_, self.seg,
829                                (self.cmpi << 4) | self.cmpe,
830                                self.pad << 4,
831                                addrconv.ipv6.text_to_bin(self.adrs[0]),
832                                addrconv.ipv6.text_to_bin(self.adrs[1]),
833                                addrconv.ipv6.text_to_bin(self.adrs[2]))
834
835     def test_init(self):
836         eq_(self.nxt, self.routing.nxt)
837         eq_(self.size, self.routing.size)
838         eq_(self.type_, self.routing.type_)
839         eq_(self.seg, self.routing.seg)
840         eq_(self.cmpi, self.routing.cmpi)
841         eq_(self.cmpe, self.routing.cmpe)
842         eq_(self.pad, self.routing._pad)
843         eq_(self.adrs[0], self.routing.adrs[0])
844         eq_(self.adrs[1], self.routing.adrs[1])
845         eq_(self.adrs[2], self.routing.adrs[2])
846
847     def test_parser(self):
848         _res = ipv6.routing.parser(self.buf)
849         if type(_res) is tuple:
850             res = _res[0]
851         else:
852             res = _res
853         eq_(self.nxt, res.nxt)
854         eq_(self.size, res.size)
855         eq_(self.type_, res.type_)
856         eq_(self.seg, res.seg)
857         eq_(self.cmpi, res.cmpi)
858         eq_(self.cmpe, res.cmpe)
859         eq_(self.pad, res._pad)
860         eq_(self.adrs[0], res.adrs[0])
861         eq_(self.adrs[1], res.adrs[1])
862         eq_(self.adrs[2], res.adrs[2])
863
864     def test_serialize(self):
865         buf = self.routing.serialize()
866         res = struct.unpack_from(self.form, six.binary_type(buf))
867         eq_(self.nxt, res[0])
868         eq_(self.size, res[1])
869         eq_(self.type_, res[2])
870         eq_(self.seg, res[3])
871         eq_(self.cmpi, res[4] >> 4)
872         eq_(self.cmpe, res[4] & 0xf)
873         eq_(self.pad, res[5])
874         eq_(addrconv.ipv6.text_to_bin(self.adrs[0]), res[6])
875         eq_(addrconv.ipv6.text_to_bin(self.adrs[1]), res[7])
876         eq_(addrconv.ipv6.text_to_bin(self.adrs[2]), res[8])
877
878     def test_parser_with_adrs_zero(self):
879         nxt = 0
880         size = 0
881         type_ = 3
882         seg = 0
883         cmpi = 0
884         cmpe = 0
885         adrs = []
886         # calculate pad
887         pad = (8 - ((len(adrs) - 1) * (16 - cmpi) + (16 - cmpe) % 8)) % 8
888
889         form = '!BBBBBB2x'
890         buf = struct.pack(form, nxt, size, type_, seg,
891                           (cmpi << 4) | cmpe, pad << 4)
892         _res = ipv6.routing.parser(buf)
893         if type(_res) is tuple:
894             res = _res[0]
895         else:
896             res = _res
897         eq_(nxt, res.nxt)
898         eq_(size, res.size)
899         eq_(type_, res.type_)
900         eq_(seg, res.seg)
901         eq_(cmpi, res.cmpi)
902         eq_(cmpe, res.cmpe)
903         eq_(pad, res._pad)
904
905     def test_serialize_with_adrs_zero(self):
906         nxt = 0
907         size = 0
908         type_ = 3
909         seg = 0
910         cmpi = 0
911         cmpe = 0
912         adrs = []
913         # calculate pad
914         pad = (8 - ((len(adrs) - 1) * (16 - cmpi) + (16 - cmpe) % 8)) % 8
915         routing = ipv6.routing_type3(
916             nxt, size, type_, seg, cmpi,
917             cmpe, pad)
918         buf = routing.serialize()
919         form = '!BBBBBB2x'
920         res = struct.unpack_from(form, six.binary_type(buf))
921         eq_(nxt, res[0])
922         eq_(size, res[1])
923         eq_(type_, res[2])
924         eq_(seg, res[3])
925         eq_(cmpi, res[4] >> 4)
926         eq_(cmpe, res[4] & 0xf)
927         eq_(pad, res[5])
928
929     def test_parser_with_compression(self):
930         pass
931         nxt = 0
932         size = 3
933         type_ = 3
934         seg = 0
935         cmpi = 8
936         cmpe = 12
937         adrs = ["2001:0db8:dead:0123:4567:89ab:cdef:0001",
938                 "2001:0db8:dead:0123:4567:89ab:cdef:0002",
939                 "2001:0db8:dead:0123:4567:89ab:cdef:0003"]
940         # calculate pad
941         pad = (8 - ((len(adrs) - 1) * (16 - cmpi) + (16 - cmpe) % 8)) % 8
942         form = '!BBBBBB2x%ds%ds%ds' % (16 - cmpi, 16 - cmpi, 16 - cmpe)
943         slice_i = slice(cmpi, 16)
944         slice_e = slice(cmpe, 16)
945         buf = struct.pack(form, nxt, size, type_, seg,
946                           (cmpi << 4) | cmpe, pad << 4,
947                           addrconv.ipv6.text_to_bin(adrs[0])[slice_i],
948                           addrconv.ipv6.text_to_bin(adrs[1])[slice_i],
949                           addrconv.ipv6.text_to_bin(adrs[2])[slice_e])
950         _res = ipv6.routing.parser(buf)
951         if type(_res) is tuple:
952             res = _res[0]
953         else:
954             res = _res
955         eq_(nxt, res.nxt)
956         eq_(size, res.size)
957         eq_(type_, res.type_)
958         eq_(seg, res.seg)
959         eq_(cmpi, res.cmpi)
960         eq_(cmpe, res.cmpe)
961         eq_(pad, res._pad)
962         eq_("::4567:89ab:cdef:1", res.adrs[0])
963         eq_("::4567:89ab:cdef:2", res.adrs[1])
964         eq_("::205.239.0.3", res.adrs[2])
965
966     def test_serialize_with_compression(self):
967         nxt = 0
968         size = 3
969         type_ = 3
970         seg = 0
971         cmpi = 8
972         cmpe = 8
973         adrs = ["2001:db8:dead::1",
974                 "2001:db8:dead::2",
975                 "2001:db8:dead::3"]
976         # calculate pad
977         pad = (8 - ((len(adrs) - 1) * (16 - cmpi) + (16 - cmpe) % 8)) % 8
978         slice_i = slice(cmpi, 16)
979         slice_e = slice(cmpe, 16)
980         routing = ipv6.routing_type3(
981             nxt, size, type_, seg, cmpi, cmpe, adrs)
982         buf = routing.serialize()
983         form = '!BBBBBB2x8s8s8s'
984         res = struct.unpack_from(form, six.binary_type(buf))
985         eq_(nxt, res[0])
986         eq_(size, res[1])
987         eq_(type_, res[2])
988         eq_(seg, res[3])
989         eq_(cmpi, res[4] >> 4)
990         eq_(cmpe, res[4] & 0xf)
991         eq_(pad, res[5])
992         eq_(addrconv.ipv6.text_to_bin(adrs[0])[slice_i], res[6])
993         eq_(addrconv.ipv6.text_to_bin(adrs[1])[slice_i], res[7])
994         eq_(addrconv.ipv6.text_to_bin(adrs[2])[slice_e], res[8])
995
996     def test_len(self):
997         eq_((6 + 1) * 8, len(self.routing))
998
999     def test_default_args(self):
1000         hdr = ipv6.routing_type3()
1001         buf = hdr.serialize()
1002         LOG.info(repr(buf))
1003         res = struct.unpack_from(ipv6.routing_type3._PACK_STR, six.binary_type(buf))
1004         LOG.info(res)
1005
1006         eq_(res[0], 6)
1007         eq_(res[1], 0)
1008         eq_(res[2], 3)
1009         eq_(res[3], 0)
1010         eq_(res[4], (0 << 4) | 0)
1011         eq_(res[5], 0)
1012
1013
1014 class Test_fragment(unittest.TestCase):
1015
1016     def setUp(self):
1017         self.nxt = 44
1018         self.offset = 50
1019         self.more = 1
1020         self.id_ = 123
1021         self.fragment = ipv6.fragment(
1022             self.nxt, self.offset, self.more, self.id_)
1023
1024         self.off_m = (self.offset << 3 | self.more)
1025         self.form = '!BxHI'
1026         self.buf = struct.pack(self.form, self.nxt, self.off_m, self.id_)
1027
1028     def test_init(self):
1029         eq_(self.nxt, self.fragment.nxt)
1030         eq_(self.offset, self.fragment.offset)
1031         eq_(self.more, self.fragment.more)
1032         eq_(self.id_, self.fragment.id_)
1033
1034     def test_parser(self):
1035         _res = ipv6.fragment.parser(self.buf)
1036         if type(_res) is tuple:
1037             res = _res[0]
1038         else:
1039             res = _res
1040         eq_(self.nxt, res.nxt)
1041         eq_(self.offset, res.offset)
1042         eq_(self.more, res.more)
1043         eq_(self.id_, res.id_)
1044
1045     def test_serialize(self):
1046         buf = self.fragment.serialize()
1047         res = struct.unpack_from(self.form, six.binary_type(buf))
1048         eq_(self.nxt, res[0])
1049         eq_(self.off_m, res[1])
1050         eq_(self.id_, res[2])
1051
1052     def test_len(self):
1053         eq_(8, len(self.fragment))
1054
1055     def test_default_args(self):
1056         hdr = ipv6.fragment()
1057         buf = hdr.serialize()
1058         res = struct.unpack_from(ipv6.fragment._PACK_STR, buf)
1059
1060         eq_(res[0], 6)
1061         eq_(res[1], 0)
1062         eq_(res[2], 0)
1063
1064
1065 class Test_auth(unittest.TestCase):
1066
1067     def setUp(self):
1068         self.nxt = 0
1069         self.size = 4
1070         self.spi = 256
1071         self.seq = 1
1072         self.data = b'\x21\xd3\xa9\x5c\x5f\xfd\x4d\x18\x46\x22\xb9\xf8'
1073         self.auth = ipv6.auth(
1074             self.nxt, self.size, self.spi, self.seq, self.data)
1075         self.form = '!BB2xII12s'
1076         self.buf = struct.pack(self.form, self.nxt, self.size, self.spi,
1077                                self.seq, self.data)
1078
1079     def test_init(self):
1080         eq_(self.nxt, self.auth.nxt)
1081         eq_(self.size, self.auth.size)
1082         eq_(self.spi, self.auth.spi)
1083         eq_(self.seq, self.auth.seq)
1084         eq_(self.data, self.auth.data)
1085
1086     def test_parser(self):
1087         _res = ipv6.auth.parser(self.buf)
1088         if type(_res) is tuple:
1089             res = _res[0]
1090         else:
1091             res = _res
1092         eq_(self.nxt, res.nxt)
1093         eq_(self.size, res.size)
1094         eq_(self.spi, res.spi)
1095         eq_(self.seq, res.seq)
1096         eq_(self.data, res.data)
1097
1098     def test_serialize(self):
1099         buf = self.auth.serialize()
1100         res = struct.unpack_from(self.form, six.binary_type(buf))
1101         eq_(self.nxt, res[0])
1102         eq_(self.size, res[1])
1103         eq_(self.spi, res[2])
1104         eq_(self.seq, res[3])
1105         eq_(self.data, res[4])
1106
1107     def test_len(self):
1108         eq_((4 + 2) * 4, len(self.auth))
1109
1110     def test_len_re(self):
1111         size = 5
1112         auth = ipv6.auth(
1113             0, size, 256, 1,
1114             b'\x21\xd3\xa9\x5c\x5f\xfd\x4d\x18\x46\x22\xb9\xf8\xf8\xf8\xf8\xf8')
1115         eq_((size + 2) * 4, len(auth))
1116
1117     def test_default_args(self):
1118         hdr = ipv6.auth()
1119         buf = hdr.serialize()
1120         LOG.info(repr(buf))
1121         res = struct.unpack_from(ipv6.auth._PACK_STR, six.binary_type(buf))
1122         LOG.info(res)
1123
1124         eq_(res[0], 6)
1125         eq_(res[1], 2)
1126         eq_(res[2], 0)
1127         eq_(res[3], 0)
1128         eq_(buf[ipv6.auth._MIN_LEN:], b'\x00\x00\x00\x00')