backing up
[vsorcdistro/.git] / ryu / build / lib.linux-armv7l-2.7 / ryu / tests / unit / packet / test_packet.py
1 # Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #    http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 # vim: tabstop=4 shiftwidth=4 softtabstop=4
17
18 import unittest
19 import logging
20 import struct
21 import inspect
22 from nose.tools import ok_, eq_
23 import six
24 from ryu.ofproto import ether, inet
25 from ryu.lib.packet import arp
26 from ryu.lib.packet import bpdu
27 from ryu.lib.packet import ethernet
28 from ryu.lib.packet import icmp, icmpv6
29 from ryu.lib.packet import ipv4, ipv6
30 from ryu.lib.packet import llc
31 from ryu.lib.packet import packet, packet_utils
32 from ryu.lib.packet import sctp
33 from ryu.lib.packet import tcp, udp
34 from ryu.lib.packet import vlan
35 from ryu.lib import addrconv
36
37
38 LOG = logging.getLogger('test_packet')
39
40
41 class TestPacket(unittest.TestCase):
42     """ Test case for packet
43     """
44
45     dst_mac = 'aa:aa:aa:aa:aa:aa'
46     src_mac = 'bb:bb:bb:bb:bb:bb'
47     dst_mac_bin = addrconv.mac.text_to_bin(dst_mac)
48     src_mac_bin = addrconv.mac.text_to_bin(src_mac)
49     dst_ip = '192.168.128.10'
50     src_ip = '192.168.122.20'
51     dst_ip_bin = addrconv.ipv4.text_to_bin(dst_ip)
52     src_port = 50001
53     dst_port = 50002
54     src_ip_bin = addrconv.ipv4.text_to_bin(src_ip)
55     payload = b'\x06\x06\x47\x50\x00\x00\x00\x00' \
56         + b'\xcd\xc5\x00\x00\x00\x00\x00\x00' \
57         + b'\x10\x11\x12\x13\x14\x15\x16\x17' \
58         + b'\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f'
59
60     def get_protocols(self, pkt):
61         protocols = {}
62         for p in pkt:
63             if hasattr(p, 'protocol_name'):
64                 protocols[p.protocol_name] = p
65             else:
66                 protocols['payload'] = p
67         return protocols
68
69     def setUp(self):
70         pass
71
72     def tearDown(self):
73         pass
74
75     def test_arp(self):
76         # buid packet
77         e = ethernet.ethernet(self.dst_mac, self.src_mac,
78                               ether.ETH_TYPE_ARP)
79         a = arp.arp(1, ether.ETH_TYPE_IP, 6, 4, 2,
80                     self.src_mac, self.src_ip, self.dst_mac,
81                     self.dst_ip)
82         p = packet.Packet()
83         p.add_protocol(e)
84         p.add_protocol(a)
85         p.serialize()
86
87         # ethernet !6s6sH
88         e_buf = self.dst_mac_bin \
89             + self.src_mac_bin \
90             + b'\x08\x06'
91
92         # arp !HHBBH6sI6sI
93         a_buf = b'\x00\x01' \
94             + b'\x08\x00' \
95             + b'\x06' \
96             + b'\x04' \
97             + b'\x00\x02' \
98             + self.src_mac_bin \
99             + self.src_ip_bin \
100             + self.dst_mac_bin \
101             + self.dst_ip_bin
102
103         buf = e_buf + a_buf
104
105         # Append padding if ethernet frame is less than 60 bytes length
106         pad_len = 60 - len(buf)
107         if pad_len > 0:
108             buf += b'\x00' * pad_len
109         eq_(buf, p.data)
110
111         # parse
112         pkt = packet.Packet(p.data)
113         protocols = self.get_protocols(pkt)
114         p_eth = protocols['ethernet']
115         p_arp = protocols['arp']
116
117         # ethernet
118         ok_(p_eth)
119         eq_(self.dst_mac, p_eth.dst)
120         eq_(self.src_mac, p_eth.src)
121         eq_(ether.ETH_TYPE_ARP, p_eth.ethertype)
122
123         # arp
124         ok_(p_arp)
125         eq_(1, p_arp.hwtype)
126         eq_(ether.ETH_TYPE_IP, p_arp.proto)
127         eq_(6, p_arp.hlen)
128         eq_(4, p_arp.plen)
129         eq_(2, p_arp.opcode)
130         eq_(self.src_mac, p_arp.src_mac)
131         eq_(self.src_ip, p_arp.src_ip)
132         eq_(self.dst_mac, p_arp.dst_mac)
133         eq_(self.dst_ip, p_arp.dst_ip)
134
135         # to string
136         eth_values = {'dst': self.dst_mac,
137                       'src': self.src_mac,
138                       'ethertype': ether.ETH_TYPE_ARP}
139         _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k]))
140                              for k, v in inspect.getmembers(p_eth)
141                              if k in eth_values])
142         eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str)
143
144         arp_values = {'hwtype': 1,
145                       'proto': ether.ETH_TYPE_IP,
146                       'hlen': 6,
147                       'plen': 4,
148                       'opcode': 2,
149                       'src_mac': self.src_mac,
150                       'dst_mac': self.dst_mac,
151                       'src_ip': self.src_ip,
152                       'dst_ip': self.dst_ip}
153         _arp_str = ','.join(['%s=%s' % (k, repr(arp_values[k]))
154                              for k, v in inspect.getmembers(p_arp)
155                              if k in arp_values])
156         arp_str = '%s(%s)' % (arp.arp.__name__, _arp_str)
157
158         pkt_str = '%s, %s' % (eth_str, arp_str)
159
160         eq_(eth_str, str(p_eth))
161         eq_(eth_str, repr(p_eth))
162
163         eq_(arp_str, str(p_arp))
164         eq_(arp_str, repr(p_arp))
165
166         eq_(pkt_str, str(pkt))
167         eq_(pkt_str, repr(pkt))
168
169     def test_vlan_arp(self):
170         # buid packet
171         e = ethernet.ethernet(self.dst_mac, self.src_mac,
172                               ether.ETH_TYPE_8021Q)
173         v = vlan.vlan(0b111, 0b1, 3, ether.ETH_TYPE_ARP)
174         a = arp.arp(1, ether.ETH_TYPE_IP, 6, 4, 2,
175                     self.src_mac, self.src_ip, self.dst_mac,
176                     self.dst_ip)
177         p = packet.Packet()
178         p.add_protocol(e)
179         p.add_protocol(v)
180         p.add_protocol(a)
181         p.serialize()
182
183         # ethernet !6s6sH
184         e_buf = self.dst_mac_bin \
185             + self.src_mac_bin \
186             + b'\x81\x00'
187
188         # vlan !HH
189         v_buf = b'\xF0\x03' \
190             + b'\x08\x06'
191
192         # arp !HHBBH6sI6sI
193         a_buf = b'\x00\x01' \
194             + b'\x08\x00' \
195             + b'\x06' \
196             + b'\x04' \
197             + b'\x00\x02' \
198             + self.src_mac_bin \
199             + self.src_ip_bin \
200             + self.dst_mac_bin \
201             + self.dst_ip_bin
202
203         buf = e_buf + v_buf + a_buf
204
205         # Append padding if ethernet frame is less than 60 bytes length
206         pad_len = 60 - len(buf)
207         if pad_len > 0:
208             buf += b'\x00' * pad_len
209         eq_(buf, p.data)
210
211         # parse
212         pkt = packet.Packet(p.data)
213         protocols = self.get_protocols(pkt)
214         p_eth = protocols['ethernet']
215         p_vlan = protocols['vlan']
216         p_arp = protocols['arp']
217
218         # ethernet
219         ok_(p_eth)
220         eq_(self.dst_mac, p_eth.dst)
221         eq_(self.src_mac, p_eth.src)
222         eq_(ether.ETH_TYPE_8021Q, p_eth.ethertype)
223
224         # vlan
225         ok_(p_vlan)
226         eq_(0b111, p_vlan.pcp)
227         eq_(0b1, p_vlan.cfi)
228         eq_(3, p_vlan.vid)
229         eq_(ether.ETH_TYPE_ARP, p_vlan.ethertype)
230
231         # arp
232         ok_(p_arp)
233         eq_(1, p_arp.hwtype)
234         eq_(ether.ETH_TYPE_IP, p_arp.proto)
235         eq_(6, p_arp.hlen)
236         eq_(4, p_arp.plen)
237         eq_(2, p_arp.opcode)
238         eq_(self.src_mac, p_arp.src_mac)
239         eq_(self.src_ip, p_arp.src_ip)
240         eq_(self.dst_mac, p_arp.dst_mac)
241         eq_(self.dst_ip, p_arp.dst_ip)
242
243         # to string
244         eth_values = {'dst': self.dst_mac,
245                       'src': self.src_mac,
246                       'ethertype': ether.ETH_TYPE_8021Q}
247         _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k]))
248                              for k, v in inspect.getmembers(p_eth)
249                              if k in eth_values])
250         eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str)
251
252         vlan_values = {'pcp': 0b111,
253                        'cfi': 0b1,
254                        'vid': 3,
255                        'ethertype': ether.ETH_TYPE_ARP}
256         _vlan_str = ','.join(['%s=%s' % (k, repr(vlan_values[k]))
257                               for k, v in inspect.getmembers(p_vlan)
258                               if k in vlan_values])
259         vlan_str = '%s(%s)' % (vlan.vlan.__name__, _vlan_str)
260
261         arp_values = {'hwtype': 1,
262                       'proto': ether.ETH_TYPE_IP,
263                       'hlen': 6,
264                       'plen': 4,
265                       'opcode': 2,
266                       'src_mac': self.src_mac,
267                       'dst_mac': self.dst_mac,
268                       'src_ip': self.src_ip,
269                       'dst_ip': self.dst_ip}
270         _arp_str = ','.join(['%s=%s' % (k, repr(arp_values[k]))
271                              for k, v in inspect.getmembers(p_arp)
272                              if k in arp_values])
273         arp_str = '%s(%s)' % (arp.arp.__name__, _arp_str)
274
275         pkt_str = '%s, %s, %s' % (eth_str, vlan_str, arp_str)
276
277         eq_(eth_str, str(p_eth))
278         eq_(eth_str, repr(p_eth))
279
280         eq_(vlan_str, str(p_vlan))
281         eq_(vlan_str, repr(p_vlan))
282
283         eq_(arp_str, str(p_arp))
284         eq_(arp_str, repr(p_arp))
285
286         eq_(pkt_str, str(pkt))
287         eq_(pkt_str, repr(pkt))
288
289     def test_ipv4_udp(self):
290         # buid packet
291         e = ethernet.ethernet(self.dst_mac, self.src_mac,
292                               ether.ETH_TYPE_IP)
293         ip = ipv4.ipv4(4, 5, 1, 0, 3, 1, 4, 64, inet.IPPROTO_UDP, 0,
294                        self.src_ip, self.dst_ip)
295         u = udp.udp(0x190F, 0x1F90, 0, 0)
296
297         p = packet.Packet()
298         p.add_protocol(e)
299         p.add_protocol(ip)
300         p.add_protocol(u)
301         p.add_protocol(self.payload)
302         p.serialize()
303
304         # ethernet !6s6sH
305         e_buf = self.dst_mac_bin \
306             + self.src_mac_bin \
307             + b'\x08\x00'
308
309         # ipv4 !BBHHHBBHII
310         ip_buf = b'\x45' \
311             + b'\x01' \
312             + b'\x00\x3C' \
313             + b'\x00\x03' \
314             + b'\x20\x04' \
315             + b'\x40' \
316             + b'\x11' \
317             + b'\x00\x00' \
318             + self.src_ip_bin \
319             + self.dst_ip_bin
320
321         # udp !HHHH
322         u_buf = b'\x19\x0F' \
323             + b'\x1F\x90' \
324             + b'\x00\x28' \
325             + b'\x00\x00'
326
327         buf = e_buf + ip_buf + u_buf + self.payload
328
329         # parse
330         pkt = packet.Packet(p.data)
331         protocols = self.get_protocols(pkt)
332         p_eth = protocols['ethernet']
333         p_ipv4 = protocols['ipv4']
334         p_udp = protocols['udp']
335
336         # ethernet
337         ok_(p_eth)
338         eq_(self.dst_mac, p_eth.dst)
339         eq_(self.src_mac, p_eth.src)
340         eq_(ether.ETH_TYPE_IP, p_eth.ethertype)
341
342         # ipv4
343         ok_(p_ipv4)
344         eq_(4, p_ipv4.version)
345         eq_(5, p_ipv4.header_length)
346         eq_(1, p_ipv4.tos)
347         l = len(ip_buf) + len(u_buf) + len(self.payload)
348         eq_(l, p_ipv4.total_length)
349         eq_(3, p_ipv4.identification)
350         eq_(1, p_ipv4.flags)
351         eq_(64, p_ipv4.ttl)
352         eq_(inet.IPPROTO_UDP, p_ipv4.proto)
353         eq_(self.src_ip, p_ipv4.src)
354         eq_(self.dst_ip, p_ipv4.dst)
355         t = bytearray(ip_buf)
356         struct.pack_into('!H', t, 10, p_ipv4.csum)
357         eq_(packet_utils.checksum(t), 0)
358
359         # udp
360         ok_(p_udp)
361         eq_(0x190f, p_udp.src_port)
362         eq_(0x1F90, p_udp.dst_port)
363         eq_(len(u_buf) + len(self.payload), p_udp.total_length)
364         eq_(0x77b2, p_udp.csum)
365         t = bytearray(u_buf)
366         struct.pack_into('!H', t, 6, p_udp.csum)
367         ph = struct.pack('!4s4sBBH', self.src_ip_bin, self.dst_ip_bin, 0,
368                          17, len(u_buf) + len(self.payload))
369         t = ph + t + self.payload
370         eq_(packet_utils.checksum(t), 0)
371
372         # payload
373         ok_('payload' in protocols)
374         eq_(self.payload, protocols['payload'])
375
376         # to string
377         eth_values = {'dst': self.dst_mac,
378                       'src': self.src_mac,
379                       'ethertype': ether.ETH_TYPE_IP}
380         _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k]))
381                              for k, v in inspect.getmembers(p_eth)
382                              if k in eth_values])
383         eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str)
384
385         ipv4_values = {'version': 4,
386                        'header_length': 5,
387                        'tos': 1,
388                        'total_length': l,
389                        'identification': 3,
390                        'flags': 1,
391                        'offset': p_ipv4.offset,
392                        'ttl': 64,
393                        'proto': inet.IPPROTO_UDP,
394                        'csum': p_ipv4.csum,
395                        'src': self.src_ip,
396                        'dst': self.dst_ip,
397                        'option': None}
398         _ipv4_str = ','.join(['%s=%s' % (k, repr(ipv4_values[k]))
399                               for k, v in inspect.getmembers(p_ipv4)
400                               if k in ipv4_values])
401         ipv4_str = '%s(%s)' % (ipv4.ipv4.__name__, _ipv4_str)
402
403         udp_values = {'src_port': 0x190f,
404                       'dst_port': 0x1F90,
405                       'total_length': len(u_buf) + len(self.payload),
406                       'csum': 0x77b2}
407         _udp_str = ','.join(['%s=%s' % (k, repr(udp_values[k]))
408                              for k, v in inspect.getmembers(p_udp)
409                              if k in udp_values])
410         udp_str = '%s(%s)' % (udp.udp.__name__, _udp_str)
411
412         pkt_str = '%s, %s, %s, %s' % (eth_str, ipv4_str, udp_str,
413                                       repr(protocols['payload']))
414
415         eq_(eth_str, str(p_eth))
416         eq_(eth_str, repr(p_eth))
417
418         eq_(ipv4_str, str(p_ipv4))
419         eq_(ipv4_str, repr(p_ipv4))
420
421         eq_(udp_str, str(p_udp))
422         eq_(udp_str, repr(p_udp))
423
424         eq_(pkt_str, str(pkt))
425         eq_(pkt_str, repr(pkt))
426
427     def test_ipv4_tcp(self):
428         # buid packet
429         e = ethernet.ethernet(self.dst_mac, self.src_mac,
430                               ether.ETH_TYPE_IP)
431         ip = ipv4.ipv4(4, 5, 0, 0, 0, 0, 0, 64, inet.IPPROTO_TCP, 0,
432                        self.src_ip, self.dst_ip)
433         t = tcp.tcp(0x190F, 0x1F90, 0x123, 1, 6, 0b101010, 2048, 0, 0x6f,
434                     b'\x01\x02')
435
436         p = packet.Packet()
437         p.add_protocol(e)
438         p.add_protocol(ip)
439         p.add_protocol(t)
440         p.add_protocol(self.payload)
441         p.serialize()
442
443         # ethernet !6s6sH
444         e_buf = self.dst_mac_bin \
445             + self.src_mac_bin \
446             + b'\x08\x00'
447
448         # ipv4 !BBHHHBBHII
449         ip_buf = b'\x45' \
450             + b'\x00' \
451             + b'\x00\x4C' \
452             + b'\x00\x00' \
453             + b'\x00\x00' \
454             + b'\x40' \
455             + b'\x06' \
456             + b'\x00\x00' \
457             + self.src_ip_bin \
458             + self.dst_ip_bin
459
460         # tcp !HHIIBBHHH + option
461         t_buf = b'\x19\x0F' \
462             + b'\x1F\x90' \
463             + b'\x00\x00\x01\x23' \
464             + b'\x00\x00\x00\x01' \
465             + b'\x60' \
466             + b'\x2A' \
467             + b'\x08\x00' \
468             + b'\x00\x00' \
469             + b'\x00\x6F' \
470             + b'\x01\x02\x00\x00'
471
472         buf = e_buf + ip_buf + t_buf + self.payload
473
474         # parse
475         pkt = packet.Packet(p.data)
476         protocols = self.get_protocols(pkt)
477         p_eth = protocols['ethernet']
478         p_ipv4 = protocols['ipv4']
479         p_tcp = protocols['tcp']
480
481         # ethernet
482         ok_(p_eth)
483         eq_(self.dst_mac, p_eth.dst)
484         eq_(self.src_mac, p_eth.src)
485         eq_(ether.ETH_TYPE_IP, p_eth.ethertype)
486
487         # ipv4
488         ok_(p_ipv4)
489         eq_(4, p_ipv4.version)
490         eq_(5, p_ipv4.header_length)
491         eq_(0, p_ipv4.tos)
492         l = len(ip_buf) + len(t_buf) + len(self.payload)
493         eq_(l, p_ipv4.total_length)
494         eq_(0, p_ipv4.identification)
495         eq_(0, p_ipv4.flags)
496         eq_(64, p_ipv4.ttl)
497         eq_(inet.IPPROTO_TCP, p_ipv4.proto)
498         eq_(self.src_ip, p_ipv4.src)
499         eq_(self.dst_ip, p_ipv4.dst)
500         t = bytearray(ip_buf)
501         struct.pack_into('!H', t, 10, p_ipv4.csum)
502         eq_(packet_utils.checksum(t), 0)
503
504         # tcp
505         ok_(p_tcp)
506         eq_(0x190f, p_tcp.src_port)
507         eq_(0x1F90, p_tcp.dst_port)
508         eq_(0x123, p_tcp.seq)
509         eq_(1, p_tcp.ack)
510         eq_(6, p_tcp.offset)
511         eq_(0b101010, p_tcp.bits)
512         eq_(2048, p_tcp.window_size)
513         eq_(0x6f, p_tcp.urgent)
514         eq_(len(t_buf), len(p_tcp))
515         t = bytearray(t_buf)
516         struct.pack_into('!H', t, 16, p_tcp.csum)
517         ph = struct.pack('!4s4sBBH', self.src_ip_bin, self.dst_ip_bin, 0,
518                          6, len(t_buf) + len(self.payload))
519         t = ph + t + self.payload
520         eq_(packet_utils.checksum(t), 0)
521
522         # payload
523         ok_('payload' in protocols)
524         eq_(self.payload, protocols['payload'])
525
526         # to string
527         eth_values = {'dst': self.dst_mac,
528                       'src': self.src_mac,
529                       'ethertype': ether.ETH_TYPE_IP}
530         _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k]))
531                              for k, v in inspect.getmembers(p_eth)
532                              if k in eth_values])
533         eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str)
534
535         ipv4_values = {'version': 4,
536                        'header_length': 5,
537                        'tos': 0,
538                        'total_length': l,
539                        'identification': 0,
540                        'flags': 0,
541                        'offset': p_ipv4.offset,
542                        'ttl': 64,
543                        'proto': inet.IPPROTO_TCP,
544                        'csum': p_ipv4.csum,
545                        'src': self.src_ip,
546                        'dst': self.dst_ip,
547                        'option': None}
548         _ipv4_str = ','.join(['%s=%s' % (k, repr(ipv4_values[k]))
549                               for k, v in inspect.getmembers(p_ipv4)
550                               if k in ipv4_values])
551         ipv4_str = '%s(%s)' % (ipv4.ipv4.__name__, _ipv4_str)
552
553         tcp_values = {'src_port': 0x190f,
554                       'dst_port': 0x1F90,
555                       'seq': 0x123,
556                       'ack': 1,
557                       'offset': 6,
558                       'bits': 0b101010,
559                       'window_size': 2048,
560                       'csum': p_tcp.csum,
561                       'urgent': 0x6f,
562                       'option': p_tcp.option}
563         _tcp_str = ','.join(['%s=%s' % (k, repr(tcp_values[k]))
564                              for k, v in inspect.getmembers(p_tcp)
565                              if k in tcp_values])
566         tcp_str = '%s(%s)' % (tcp.tcp.__name__, _tcp_str)
567
568         pkt_str = '%s, %s, %s, %s' % (eth_str, ipv4_str, tcp_str,
569                                       repr(protocols['payload']))
570
571         eq_(eth_str, str(p_eth))
572         eq_(eth_str, repr(p_eth))
573
574         eq_(ipv4_str, str(p_ipv4))
575         eq_(ipv4_str, repr(p_ipv4))
576
577         eq_(tcp_str, str(p_tcp))
578         eq_(tcp_str, repr(p_tcp))
579
580         eq_(pkt_str, str(pkt))
581         eq_(pkt_str, repr(pkt))
582
583     def test_ipv4_sctp(self):
584         # build packet
585         e = ethernet.ethernet()
586         ip = ipv4.ipv4(proto=inet.IPPROTO_SCTP)
587         s = sctp.sctp(chunks=[sctp.chunk_data(payload_data=self.payload)])
588
589         p = e / ip / s
590         p.serialize()
591
592         ipaddr = addrconv.ipv4.text_to_bin('0.0.0.0')
593
594         # ethernet !6s6sH
595         e_buf = b'\xff\xff\xff\xff\xff\xff' \
596             + b'\x00\x00\x00\x00\x00\x00' \
597             + b'\x08\x00'
598
599         # ipv4 !BBHHHBBHII
600         ip_buf = b'\x45' \
601             + b'\x00' \
602             + b'\x00\x50' \
603             + b'\x00\x00' \
604             + b'\x00\x00' \
605             + b'\xff' \
606             + b'\x84' \
607             + b'\x00\x00' \
608             + ipaddr \
609             + ipaddr
610
611         # sctp !HHII + chunk_data !BBHIHHI + payload
612         s_buf = b'\x00\x00' \
613             + b'\x00\x00' \
614             + b'\x00\x00\x00\x00' \
615             + b'\x00\x00\x00\x00' \
616             + b'\x00' \
617             + b'\x00' \
618             + b'\x00\x00' \
619             + b'\x00\x00\x00\x00' \
620             + b'\x00\x00' \
621             + b'\x00\x00' \
622             + b'\x00\x00\x00\x00' \
623             + self.payload
624
625         buf = e_buf + ip_buf + s_buf
626
627         # parse
628         pkt = packet.Packet(p.data)
629         protocols = self.get_protocols(pkt)
630         p_eth = protocols['ethernet']
631         p_ipv4 = protocols['ipv4']
632         p_sctp = protocols['sctp']
633
634         # ethernet
635         ok_(p_eth)
636         eq_('ff:ff:ff:ff:ff:ff', p_eth.dst)
637         eq_('00:00:00:00:00:00', p_eth.src)
638         eq_(ether.ETH_TYPE_IP, p_eth.ethertype)
639
640         # ipv4
641         ok_(p_ipv4)
642         eq_(4, p_ipv4.version)
643         eq_(5, p_ipv4.header_length)
644         eq_(0, p_ipv4.tos)
645         l = len(ip_buf) + len(s_buf)
646         eq_(l, p_ipv4.total_length)
647         eq_(0, p_ipv4.identification)
648         eq_(0, p_ipv4.flags)
649         eq_(255, p_ipv4.ttl)
650         eq_(inet.IPPROTO_SCTP, p_ipv4.proto)
651         eq_('10.0.0.1', p_ipv4.src)
652         eq_('10.0.0.2', p_ipv4.dst)
653         t = bytearray(ip_buf)
654         struct.pack_into('!H', t, 10, p_ipv4.csum)
655         eq_(packet_utils.checksum(t), 0x1403)
656
657         # sctp
658         ok_(p_sctp)
659         eq_(1, p_sctp.src_port)
660         eq_(1, p_sctp.dst_port)
661         eq_(0, p_sctp.vtag)
662         assert isinstance(p_sctp.chunks[0], sctp.chunk_data)
663         eq_(0, p_sctp.chunks[0]._type)
664         eq_(0, p_sctp.chunks[0].unordered)
665         eq_(0, p_sctp.chunks[0].begin)
666         eq_(0, p_sctp.chunks[0].end)
667         eq_(16 + len(self.payload), p_sctp.chunks[0].length)
668         eq_(0, p_sctp.chunks[0].tsn)
669         eq_(0, p_sctp.chunks[0].sid)
670         eq_(0, p_sctp.chunks[0].seq)
671         eq_(0, p_sctp.chunks[0].payload_id)
672         eq_(self.payload, p_sctp.chunks[0].payload_data)
673         eq_(len(s_buf), len(p_sctp))
674
675         # to string
676         eth_values = {'dst': 'ff:ff:ff:ff:ff:ff',
677                       'src': '00:00:00:00:00:00',
678                       'ethertype': ether.ETH_TYPE_IP}
679         _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k]))
680                              for k, v in inspect.getmembers(p_eth)
681                              if k in eth_values])
682         eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str)
683
684         ipv4_values = {'version': 4,
685                        'header_length': 5,
686                        'tos': 0,
687                        'total_length': l,
688                        'identification': 0,
689                        'flags': 0,
690                        'offset': 0,
691                        'ttl': 255,
692                        'proto': inet.IPPROTO_SCTP,
693                        'csum': p_ipv4.csum,
694                        'src': '10.0.0.1',
695                        'dst': '10.0.0.2',
696                        'option': None}
697         _ipv4_str = ','.join(['%s=%s' % (k, repr(ipv4_values[k]))
698                               for k, v in inspect.getmembers(p_ipv4)
699                               if k in ipv4_values])
700         ipv4_str = '%s(%s)' % (ipv4.ipv4.__name__, _ipv4_str)
701
702         data_values = {'unordered': 0,
703                        'begin': 0,
704                        'end': 0,
705                        'length': 16 + len(self.payload),
706                        'tsn': 0,
707                        'sid': 0,
708                        'seq': 0,
709                        'payload_id': 0,
710                        'payload_data': self.payload}
711         _data_str = ','.join(['%s=%s' % (k, repr(data_values[k]))
712                               for k in sorted(data_values.keys())])
713         data_str = '[%s(%s)]' % (sctp.chunk_data.__name__, _data_str)
714
715         sctp_values = {'src_port': 1,
716                        'dst_port': 1,
717                        'vtag': 0,
718                        'csum': repr(p_sctp.csum),
719                        'chunks': data_str}
720         _sctp_str = ','.join(['%s=%s' % (k, sctp_values[k])
721                               for k, _ in inspect.getmembers(p_sctp)
722                               if k in sctp_values])
723         sctp_str = '%s(%s)' % (sctp.sctp.__name__, _sctp_str)
724
725         pkt_str = '%s, %s, %s' % (eth_str, ipv4_str, sctp_str)
726
727         eq_(eth_str, str(p_eth))
728         eq_(eth_str, repr(p_eth))
729
730         eq_(ipv4_str, str(p_ipv4))
731         eq_(ipv4_str, repr(p_ipv4))
732
733         eq_(sctp_str, str(p_sctp))
734         eq_(sctp_str, repr(p_sctp))
735
736         eq_(pkt_str, str(pkt))
737         eq_(pkt_str, repr(pkt))
738
739     def test_ipv4_icmp(self):
740         # buid packet
741         e = ethernet.ethernet()
742         ip = ipv4.ipv4(proto=inet.IPPROTO_ICMP)
743         ic = icmp.icmp()
744
745         p = e / ip / ic
746         p.serialize()
747
748         ipaddr = addrconv.ipv4.text_to_bin('0.0.0.0')
749
750         # ethernet !6s6sH
751         e_buf = b'\xff\xff\xff\xff\xff\xff' \
752             + b'\x00\x00\x00\x00\x00\x00' \
753             + b'\x08\x00'
754
755         # ipv4 !BBHHHBBHII
756         ip_buf = b'\x45' \
757             + b'\x00' \
758             + b'\x00\x1c' \
759             + b'\x00\x00' \
760             + b'\x00\x00' \
761             + b'\xff' \
762             + b'\x01' \
763             + b'\x00\x00' \
764             + ipaddr \
765             + ipaddr
766
767         # icmp !BBH + echo !HH
768         ic_buf = b'\x08' \
769             + b'\x00' \
770             + b'\x00\x00' \
771             + b'\x00\x00' \
772             + b'\x00\x00'
773
774         buf = e_buf + ip_buf + ic_buf
775
776         # parse
777         pkt = packet.Packet(p.data)
778         protocols = self.get_protocols(pkt)
779         p_eth = protocols['ethernet']
780         p_ipv4 = protocols['ipv4']
781         p_icmp = protocols['icmp']
782
783         # ethernet
784         ok_(p_eth)
785         eq_('ff:ff:ff:ff:ff:ff', p_eth.dst)
786         eq_('00:00:00:00:00:00', p_eth.src)
787         eq_(ether.ETH_TYPE_IP, p_eth.ethertype)
788
789         # ipv4
790         ok_(p_ipv4)
791         eq_(4, p_ipv4.version)
792         eq_(5, p_ipv4.header_length)
793         eq_(0, p_ipv4.tos)
794         l = len(ip_buf) + len(ic_buf)
795         eq_(l, p_ipv4.total_length)
796         eq_(0, p_ipv4.identification)
797         eq_(0, p_ipv4.flags)
798         eq_(255, p_ipv4.ttl)
799         eq_(inet.IPPROTO_ICMP, p_ipv4.proto)
800         eq_('10.0.0.1', p_ipv4.src)
801         eq_('10.0.0.2', p_ipv4.dst)
802         t = bytearray(ip_buf)
803         struct.pack_into('!H', t, 10, p_ipv4.csum)
804         eq_(packet_utils.checksum(t), 0x1403)
805
806         # icmp
807         ok_(p_icmp)
808         eq_(8, p_icmp.type)
809         eq_(0, p_icmp.code)
810         eq_(0, p_icmp.data.id)
811         eq_(0, p_icmp.data.seq)
812         eq_(len(ic_buf), len(p_icmp))
813         t = bytearray(ic_buf)
814         struct.pack_into('!H', t, 2, p_icmp.csum)
815         eq_(packet_utils.checksum(t), 0)
816
817         # to string
818         eth_values = {'dst': 'ff:ff:ff:ff:ff:ff',
819                       'src': '00:00:00:00:00:00',
820                       'ethertype': ether.ETH_TYPE_IP}
821         _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k]))
822                              for k, _ in inspect.getmembers(p_eth)
823                              if k in eth_values])
824         eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str)
825
826         ipv4_values = {'version': 4,
827                        'header_length': 5,
828                        'tos': 0,
829                        'total_length': l,
830                        'identification': 0,
831                        'flags': 0,
832                        'offset': p_ipv4.offset,
833                        'ttl': 255,
834                        'proto': inet.IPPROTO_ICMP,
835                        'csum': p_ipv4.csum,
836                        'src': '10.0.0.1',
837                        'dst': '10.0.0.2',
838                        'option': None}
839         _ipv4_str = ','.join(['%s=%s' % (k, repr(ipv4_values[k]))
840                               for k, _ in inspect.getmembers(p_ipv4)
841                               if k in ipv4_values])
842         ipv4_str = '%s(%s)' % (ipv4.ipv4.__name__, _ipv4_str)
843
844         echo_values = {'id': 0,
845                        'seq': 0,
846                        'data': None}
847         _echo_str = ','.join(['%s=%s' % (k, repr(echo_values[k]))
848                               for k in sorted(echo_values.keys())])
849         echo_str = '%s(%s)' % (icmp.echo.__name__, _echo_str)
850         icmp_values = {'type': 8,
851                        'code': 0,
852                        'csum': p_icmp.csum,
853                        'data': echo_str}
854         _icmp_str = ','.join(['%s=%s' % (k, icmp_values[k])
855                               for k, _ in inspect.getmembers(p_icmp)
856                               if k in icmp_values])
857         icmp_str = '%s(%s)' % (icmp.icmp.__name__, _icmp_str)
858
859         pkt_str = '%s, %s, %s' % (eth_str, ipv4_str, icmp_str)
860
861         eq_(eth_str, str(p_eth))
862         eq_(eth_str, repr(p_eth))
863
864         eq_(ipv4_str, str(p_ipv4))
865         eq_(ipv4_str, repr(p_ipv4))
866
867         eq_(icmp_str, str(p_icmp))
868         eq_(icmp_str, repr(p_icmp))
869
870         eq_(pkt_str, str(pkt))
871         eq_(pkt_str, repr(pkt))
872
873     def test_ipv6_udp(self):
874         # build packet
875         e = ethernet.ethernet(ethertype=ether.ETH_TYPE_IPV6)
876         ip = ipv6.ipv6(nxt=inet.IPPROTO_UDP)
877         u = udp.udp()
878
879         p = e / ip / u / self.payload
880         p.serialize()
881
882         ipaddr = addrconv.ipv6.text_to_bin('::')
883
884         # ethernet !6s6sH
885         e_buf = b'\xff\xff\xff\xff\xff\xff' \
886             + b'\x00\x00\x00\x00\x00\x00' \
887             + b'\x86\xdd'
888
889         # ipv6 !IHBB16s16s'
890         ip_buf = b'\x60\x00\x00\x00' \
891             + b'\x00\x00' \
892             + b'\x11' \
893             + b'\xff' \
894             + b'\x00\x00' \
895             + ipaddr \
896             + ipaddr
897
898         # udp !HHHH
899         u_buf = b'\x00\x00' \
900             + b'\x00\x00' \
901             + b'\x00\x28' \
902             + b'\x00\x00'
903
904         buf = e_buf + ip_buf + u_buf + self.payload
905
906         # parse
907         pkt = packet.Packet(p.data)
908         protocols = self.get_protocols(pkt)
909         p_eth = protocols['ethernet']
910         p_ipv6 = protocols['ipv6']
911         p_udp = protocols['udp']
912
913         # ethernet
914         ok_(p_eth)
915         eq_('ff:ff:ff:ff:ff:ff', p_eth.dst)
916         eq_('00:00:00:00:00:00', p_eth.src)
917         eq_(ether.ETH_TYPE_IPV6, p_eth.ethertype)
918
919         # ipv6
920         ok_(p_ipv6)
921         eq_(6, p_ipv6.version)
922         eq_(0, p_ipv6.traffic_class)
923         eq_(0, p_ipv6.flow_label)
924         eq_(len(u_buf) + len(self.payload), p_ipv6.payload_length)
925         eq_(inet.IPPROTO_UDP, p_ipv6.nxt)
926         eq_(255, p_ipv6.hop_limit)
927         eq_('10::10', p_ipv6.src)
928         eq_('20::20', p_ipv6.dst)
929
930         # udp
931         ok_(p_udp)
932         eq_(1, p_udp.src_port)
933         eq_(1, p_udp.dst_port)
934         eq_(len(u_buf) + len(self.payload), p_udp.total_length)
935         eq_(0x2B60, p_udp.csum)
936         t = bytearray(u_buf)
937         struct.pack_into('!H', t, 6, p_udp.csum)
938         ph = struct.pack('!16s16sI3xB', ipaddr, ipaddr,
939                          len(u_buf) + len(self.payload), 17)
940         t = ph + t + self.payload
941         eq_(packet_utils.checksum(t), 0x62)
942
943         # payload
944         ok_('payload' in protocols)
945         eq_(self.payload, protocols['payload'])
946
947         # to string
948         eth_values = {'dst': 'ff:ff:ff:ff:ff:ff',
949                       'src': '00:00:00:00:00:00',
950                       'ethertype': ether.ETH_TYPE_IPV6}
951         _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k]))
952                              for k, v in inspect.getmembers(p_eth)
953                              if k in eth_values])
954         eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str)
955
956         ipv6_values = {'version': 6,
957                        'traffic_class': 0,
958                        'flow_label': 0,
959                        'payload_length': len(u_buf) + len(self.payload),
960                        'nxt': inet.IPPROTO_UDP,
961                        'hop_limit': 255,
962                        'src': '10::10',
963                        'dst': '20::20',
964                        'ext_hdrs': []}
965         _ipv6_str = ','.join(['%s=%s' % (k, repr(ipv6_values[k]))
966                               for k, v in inspect.getmembers(p_ipv6)
967                               if k in ipv6_values])
968         ipv6_str = '%s(%s)' % (ipv6.ipv6.__name__, _ipv6_str)
969
970         udp_values = {'src_port': 1,
971                       'dst_port': 1,
972                       'total_length': len(u_buf) + len(self.payload),
973                       'csum': 0x2B60}
974         _udp_str = ','.join(['%s=%s' % (k, repr(udp_values[k]))
975                              for k, v in inspect.getmembers(p_udp)
976                              if k in udp_values])
977         udp_str = '%s(%s)' % (udp.udp.__name__, _udp_str)
978
979         pkt_str = '%s, %s, %s, %s' % (eth_str, ipv6_str, udp_str,
980                                       repr(protocols['payload']))
981
982         eq_(eth_str, str(p_eth))
983         eq_(eth_str, repr(p_eth))
984
985         eq_(ipv6_str, str(p_ipv6))
986         eq_(ipv6_str, repr(p_ipv6))
987
988         eq_(udp_str, str(p_udp))
989         eq_(udp_str, repr(p_udp))
990
991         eq_(pkt_str, str(pkt))
992         eq_(pkt_str, repr(pkt))
993
994     def test_ipv6_tcp(self):
995         # build packet
996         e = ethernet.ethernet(ethertype=ether.ETH_TYPE_IPV6)
997         ip = ipv6.ipv6()
998         t = tcp.tcp(option=b'\x01\x02')
999
1000         p = e / ip / t / self.payload
1001         p.serialize()
1002
1003         ipaddr = addrconv.ipv6.text_to_bin('::')
1004
1005         # ethernet !6s6sH
1006         e_buf = b'\xff\xff\xff\xff\xff\xff' \
1007             + b'\x00\x00\x00\x00\x00\x00' \
1008             + b'\x86\xdd'
1009
1010         # ipv6 !IHBB16s16s'
1011         ip_buf = b'\x60\x00\x00\x00' \
1012             + b'\x00\x00' \
1013             + b'\x06' \
1014             + b'\xff' \
1015             + b'\x00\x00' \
1016             + ipaddr \
1017             + ipaddr
1018
1019         # tcp !HHIIBBHHH + option
1020         t_buf = b'\x00\x00' \
1021             + b'\x00\x00' \
1022             + b'\x00\x00\x00\x00' \
1023             + b'\x00\x00\x00\x00' \
1024             + b'\x60' \
1025             + b'\x00' \
1026             + b'\x00\x00' \
1027             + b'\x00\x00' \
1028             + b'\x00\x00' \
1029             + b'\x01\x02\x00\x00'
1030
1031         buf = e_buf + ip_buf + t_buf + self.payload
1032
1033         # parse
1034         pkt = packet.Packet(p.data)
1035         protocols = self.get_protocols(pkt)
1036         p_eth = protocols['ethernet']
1037         p_ipv6 = protocols['ipv6']
1038         p_tcp = protocols['tcp']
1039
1040         # ethernet
1041         ok_(p_eth)
1042         eq_('ff:ff:ff:ff:ff:ff', p_eth.dst)
1043         eq_('00:00:00:00:00:00', p_eth.src)
1044         eq_(ether.ETH_TYPE_IPV6, p_eth.ethertype)
1045
1046         # ipv6
1047         ok_(p_ipv6)
1048         eq_(6, p_ipv6.version)
1049         eq_(0, p_ipv6.traffic_class)
1050         eq_(0, p_ipv6.flow_label)
1051         eq_(len(t_buf) + len(self.payload), p_ipv6.payload_length)
1052         eq_(inet.IPPROTO_TCP, p_ipv6.nxt)
1053         eq_(255, p_ipv6.hop_limit)
1054         eq_('10::10', p_ipv6.src)
1055         eq_('20::20', p_ipv6.dst)
1056
1057         # tcp
1058         ok_(p_tcp)
1059         eq_(1, p_tcp.src_port)
1060         eq_(1, p_tcp.dst_port)
1061         eq_(0, p_tcp.seq)
1062         eq_(0, p_tcp.ack)
1063         eq_(6, p_tcp.offset)
1064         eq_(0, p_tcp.bits)
1065         eq_(0, p_tcp.window_size)
1066         eq_(0, p_tcp.urgent)
1067         eq_(len(t_buf), len(p_tcp))
1068         t = bytearray(t_buf)
1069         struct.pack_into('!H', t, 16, p_tcp.csum)
1070         ph = struct.pack('!16s16sI3xB', ipaddr, ipaddr,
1071                          len(t_buf) + len(self.payload), 6)
1072         t = ph + t + self.payload
1073         eq_(packet_utils.checksum(t), 0x62)
1074
1075         # payload
1076         ok_('payload' in protocols)
1077         eq_(self.payload, protocols['payload'])
1078
1079         # to string
1080         eth_values = {'dst': 'ff:ff:ff:ff:ff:ff',
1081                       'src': '00:00:00:00:00:00',
1082                       'ethertype': ether.ETH_TYPE_IPV6}
1083         _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k]))
1084                              for k, v in inspect.getmembers(p_eth)
1085                              if k in eth_values])
1086         eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str)
1087
1088         ipv6_values = {'version': 6,
1089                        'traffic_class': 0,
1090                        'flow_label': 0,
1091                        'payload_length': len(t_buf) + len(self.payload),
1092                        'nxt': inet.IPPROTO_TCP,
1093                        'hop_limit': 255,
1094                        'src': '10::10',
1095                        'dst': '20::20',
1096                        'ext_hdrs': []}
1097         _ipv6_str = ','.join(['%s=%s' % (k, repr(ipv6_values[k]))
1098                               for k, v in inspect.getmembers(p_ipv6)
1099                               if k in ipv6_values])
1100         ipv6_str = '%s(%s)' % (ipv6.ipv6.__name__, _ipv6_str)
1101
1102         tcp_values = {'src_port': 1,
1103                       'dst_port': 1,
1104                       'seq': 0,
1105                       'ack': 0,
1106                       'offset': 6,
1107                       'bits': 0,
1108                       'window_size': 0,
1109                       'csum': p_tcp.csum,
1110                       'urgent': 0,
1111                       'option': p_tcp.option}
1112         _tcp_str = ','.join(['%s=%s' % (k, repr(tcp_values[k]))
1113                              for k, v in inspect.getmembers(p_tcp)
1114                              if k in tcp_values])
1115         tcp_str = '%s(%s)' % (tcp.tcp.__name__, _tcp_str)
1116
1117         pkt_str = '%s, %s, %s, %s' % (eth_str, ipv6_str, tcp_str,
1118                                       repr(protocols['payload']))
1119
1120         eq_(eth_str, str(p_eth))
1121         eq_(eth_str, repr(p_eth))
1122
1123         eq_(ipv6_str, str(p_ipv6))
1124         eq_(ipv6_str, repr(p_ipv6))
1125
1126         eq_(tcp_str, str(p_tcp))
1127         eq_(tcp_str, repr(p_tcp))
1128
1129         eq_(pkt_str, str(pkt))
1130         eq_(pkt_str, repr(pkt))
1131
1132     def test_ipv6_sctp(self):
1133         # build packet
1134         e = ethernet.ethernet(ethertype=ether.ETH_TYPE_IPV6)
1135         ip = ipv6.ipv6(nxt=inet.IPPROTO_SCTP)
1136         s = sctp.sctp(chunks=[sctp.chunk_data(payload_data=self.payload)])
1137
1138         p = e / ip / s
1139         p.serialize()
1140
1141         ipaddr = addrconv.ipv6.text_to_bin('::')
1142
1143         # ethernet !6s6sH
1144         e_buf = b'\xff\xff\xff\xff\xff\xff' \
1145             + b'\x00\x00\x00\x00\x00\x00' \
1146             + b'\x86\xdd'
1147
1148         # ipv6 !IHBB16s16s'
1149         ip_buf = b'\x60\x00\x00\x00' \
1150             + b'\x00\x00' \
1151             + b'\x84' \
1152             + b'\xff' \
1153             + b'\x00\x00' \
1154             + ipaddr \
1155             + ipaddr
1156
1157         # sctp !HHII + chunk_data !BBHIHHI + payload
1158         s_buf = b'\x00\x00' \
1159             + b'\x00\x00' \
1160             + b'\x00\x00\x00\x00' \
1161             + b'\x00\x00\x00\x00' \
1162             + b'\x00' \
1163             + b'\x00' \
1164             + b'\x00\x00' \
1165             + b'\x00\x00\x00\x00' \
1166             + b'\x00\x00' \
1167             + b'\x00\x00' \
1168             + b'\x00\x00\x00\x00' \
1169             + self.payload
1170
1171         buf = e_buf + ip_buf + s_buf
1172
1173         # parse
1174         pkt = packet.Packet(p.data)
1175         protocols = self.get_protocols(pkt)
1176         p_eth = protocols['ethernet']
1177         p_ipv6 = protocols['ipv6']
1178         p_sctp = protocols['sctp']
1179
1180         # ethernet
1181         ok_(p_eth)
1182         eq_('ff:ff:ff:ff:ff:ff', p_eth.dst)
1183         eq_('00:00:00:00:00:00', p_eth.src)
1184         eq_(ether.ETH_TYPE_IPV6, p_eth.ethertype)
1185
1186         # ipv6
1187         ok_(p_ipv6)
1188         eq_(6, p_ipv6.version)
1189         eq_(0, p_ipv6.traffic_class)
1190         eq_(0, p_ipv6.flow_label)
1191         eq_(len(s_buf), p_ipv6.payload_length)
1192         eq_(inet.IPPROTO_SCTP, p_ipv6.nxt)
1193         eq_(255, p_ipv6.hop_limit)
1194         eq_('10::10', p_ipv6.src)
1195         eq_('20::20', p_ipv6.dst)
1196
1197         # sctp
1198         ok_(p_sctp)
1199         eq_(1, p_sctp.src_port)
1200         eq_(1, p_sctp.dst_port)
1201         eq_(0, p_sctp.vtag)
1202         assert isinstance(p_sctp.chunks[0], sctp.chunk_data)
1203         eq_(0, p_sctp.chunks[0]._type)
1204         eq_(0, p_sctp.chunks[0].unordered)
1205         eq_(0, p_sctp.chunks[0].begin)
1206         eq_(0, p_sctp.chunks[0].end)
1207         eq_(16 + len(self.payload), p_sctp.chunks[0].length)
1208         eq_(0, p_sctp.chunks[0].tsn)
1209         eq_(0, p_sctp.chunks[0].sid)
1210         eq_(0, p_sctp.chunks[0].seq)
1211         eq_(0, p_sctp.chunks[0].payload_id)
1212         eq_(self.payload, p_sctp.chunks[0].payload_data)
1213         eq_(len(s_buf), len(p_sctp))
1214
1215         # to string
1216         eth_values = {'dst': 'ff:ff:ff:ff:ff:ff',
1217                       'src': '00:00:00:00:00:00',
1218                       'ethertype': ether.ETH_TYPE_IPV6}
1219         _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k]))
1220                              for k, v in inspect.getmembers(p_eth)
1221                              if k in eth_values])
1222         eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str)
1223
1224         ipv6_values = {'version': 6,
1225                        'traffic_class': 0,
1226                        'flow_label': 0,
1227                        'payload_length': len(s_buf),
1228                        'nxt': inet.IPPROTO_SCTP,
1229                        'hop_limit': 255,
1230                        'src': '10::10',
1231                        'dst': '20::20',
1232                        'ext_hdrs': []}
1233         _ipv6_str = ','.join(['%s=%s' % (k, repr(ipv6_values[k]))
1234                               for k, v in inspect.getmembers(p_ipv6)
1235                               if k in ipv6_values])
1236         ipv6_str = '%s(%s)' % (ipv6.ipv6.__name__, _ipv6_str)
1237
1238         data_values = {'unordered': 0,
1239                        'begin': 0,
1240                        'end': 0,
1241                        'length': 16 + len(self.payload),
1242                        'tsn': 0,
1243                        'sid': 0,
1244                        'seq': 0,
1245                        'payload_id': 0,
1246                        'payload_data': self.payload}
1247         _data_str = ','.join(['%s=%s' % (k, repr(data_values[k]))
1248                               for k in sorted(data_values.keys())])
1249         data_str = '[%s(%s)]' % (sctp.chunk_data.__name__, _data_str)
1250
1251         sctp_values = {'src_port': 1,
1252                        'dst_port': 1,
1253                        'vtag': 0,
1254                        'csum': repr(p_sctp.csum),
1255                        'chunks': data_str}
1256         _sctp_str = ','.join(['%s=%s' % (k, sctp_values[k])
1257                               for k, _ in inspect.getmembers(p_sctp)
1258                               if k in sctp_values])
1259         sctp_str = '%s(%s)' % (sctp.sctp.__name__, _sctp_str)
1260
1261         pkt_str = '%s, %s, %s' % (eth_str, ipv6_str, sctp_str)
1262
1263         eq_(eth_str, str(p_eth))
1264         eq_(eth_str, repr(p_eth))
1265
1266         eq_(ipv6_str, str(p_ipv6))
1267         eq_(ipv6_str, repr(p_ipv6))
1268
1269         eq_(sctp_str, str(p_sctp))
1270         eq_(sctp_str, repr(p_sctp))
1271
1272         eq_(pkt_str, str(pkt))
1273         eq_(pkt_str, repr(pkt))
1274
1275     def test_ipv6_icmpv6(self):
1276         # build packet
1277         e = ethernet.ethernet(ethertype=ether.ETH_TYPE_IPV6)
1278         ip = ipv6.ipv6(nxt=inet.IPPROTO_ICMPV6)
1279         ic = icmpv6.icmpv6()
1280
1281         p = e / ip / ic
1282         p.serialize()
1283
1284         ipaddr = addrconv.ipv6.text_to_bin('::')
1285
1286         # ethernet !6s6sH
1287         e_buf = b'\xff\xff\xff\xff\xff\xff' \
1288             + b'\x00\x00\x00\x00\x00\x00' \
1289             + b'\x86\xdd'
1290
1291         # ipv6 !IHBB16s16s'
1292         ip_buf = b'\x60\x00\x00\x00' \
1293             + b'\x00\x00' \
1294             + b'\x3a' \
1295             + b'\xff' \
1296             + b'\x00\x00' \
1297             + ipaddr \
1298             + ipaddr
1299
1300         # icmpv6 !BBH
1301         ic_buf = b'\x00' \
1302             + b'\x00' \
1303             + b'\x00\x00'
1304
1305         buf = e_buf + ip_buf + ic_buf
1306
1307         # parse
1308         pkt = packet.Packet(p.data)
1309         protocols = self.get_protocols(pkt)
1310         p_eth = protocols['ethernet']
1311         p_ipv6 = protocols['ipv6']
1312         p_icmpv6 = protocols['icmpv6']
1313
1314         # ethernet
1315         ok_(p_eth)
1316         eq_('ff:ff:ff:ff:ff:ff', p_eth.dst)
1317         eq_('00:00:00:00:00:00', p_eth.src)
1318         eq_(ether.ETH_TYPE_IPV6, p_eth.ethertype)
1319
1320         # ipv6
1321         ok_(p_ipv6)
1322         eq_(6, p_ipv6.version)
1323         eq_(0, p_ipv6.traffic_class)
1324         eq_(0, p_ipv6.flow_label)
1325         eq_(len(ic_buf), p_ipv6.payload_length)
1326         eq_(inet.IPPROTO_ICMPV6, p_ipv6.nxt)
1327         eq_(255, p_ipv6.hop_limit)
1328         eq_('10::10', p_ipv6.src)
1329         eq_('20::20', p_ipv6.dst)
1330
1331         # icmpv6
1332         ok_(p_icmpv6)
1333         eq_(0, p_icmpv6.type_)
1334         eq_(0, p_icmpv6.code)
1335         eq_(len(ic_buf), len(p_icmpv6))
1336         t = bytearray(ic_buf)
1337         struct.pack_into('!H', t, 2, p_icmpv6.csum)
1338         ph = struct.pack('!16s16sI3xB', ipaddr, ipaddr, len(ic_buf), 58)
1339         t = ph + t
1340         eq_(packet_utils.checksum(t), 0x60)
1341
1342         # to string
1343         eth_values = {'dst': 'ff:ff:ff:ff:ff:ff',
1344                       'src': '00:00:00:00:00:00',
1345                       'ethertype': ether.ETH_TYPE_IPV6}
1346         _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k]))
1347                              for k, _ in inspect.getmembers(p_eth)
1348                              if k in eth_values])
1349         eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str)
1350
1351         ipv6_values = {'version': 6,
1352                        'traffic_class': 0,
1353                        'flow_label': 0,
1354                        'payload_length': len(ic_buf),
1355                        'nxt': inet.IPPROTO_ICMPV6,
1356                        'hop_limit': 255,
1357                        'src': '10::10',
1358                        'dst': '20::20',
1359                        'ext_hdrs': []}
1360         _ipv6_str = ','.join(['%s=%s' % (k, repr(ipv6_values[k]))
1361                               for k, _ in inspect.getmembers(p_ipv6)
1362                               if k in ipv6_values])
1363         ipv6_str = '%s(%s)' % (ipv6.ipv6.__name__, _ipv6_str)
1364
1365         icmpv6_values = {'type_': 0,
1366                          'code': 0,
1367                          'csum': p_icmpv6.csum,
1368                          'data': b''}
1369         _icmpv6_str = ','.join(['%s=%s' % (k, repr(icmpv6_values[k]))
1370                                 for k, _ in inspect.getmembers(p_icmpv6)
1371                                 if k in icmpv6_values])
1372         icmpv6_str = '%s(%s)' % (icmpv6.icmpv6.__name__, _icmpv6_str)
1373
1374         pkt_str = '%s, %s, %s' % (eth_str, ipv6_str, icmpv6_str)
1375
1376         eq_(eth_str, str(p_eth))
1377         eq_(eth_str, repr(p_eth))
1378
1379         eq_(ipv6_str, str(p_ipv6))
1380         eq_(ipv6_str, repr(p_ipv6))
1381
1382         eq_(icmpv6_str, str(p_icmpv6))
1383         eq_(icmpv6_str, repr(p_icmpv6))
1384
1385         eq_(pkt_str, str(pkt))
1386         eq_(pkt_str, repr(pkt))
1387
1388     def test_llc_bpdu(self):
1389         # buid packet
1390         e = ethernet.ethernet(self.dst_mac, self.src_mac,
1391                               ether.ETH_TYPE_IEEE802_3)
1392         llc_control = llc.ControlFormatU(0, 0, 0)
1393         l = llc.llc(llc.SAP_BPDU, llc.SAP_BPDU, llc_control)
1394         b = bpdu.ConfigurationBPDUs(flags=0,
1395                                     root_priority=32768,
1396                                     root_system_id_extension=0,
1397                                     root_mac_address=self.src_mac,
1398                                     root_path_cost=0,
1399                                     bridge_priority=32768,
1400                                     bridge_system_id_extension=0,
1401                                     bridge_mac_address=self.dst_mac,
1402                                     port_priority=128,
1403                                     port_number=4,
1404                                     message_age=1,
1405                                     max_age=20,
1406                                     hello_time=2,
1407                                     forward_delay=15)
1408
1409         p = packet.Packet()
1410         p.add_protocol(e)
1411         p.add_protocol(l)
1412         p.add_protocol(b)
1413         p.serialize()
1414
1415         # ethernet !6s6sH
1416         e_buf = self.dst_mac_bin + self.src_mac_bin + b'\x05\xdc'
1417
1418         # llc !BBB
1419         l_buf = (b'\x42'
1420                  b'\x42'
1421                  b'\x03')
1422
1423         # bpdu !HBBBQIQHHHHH
1424         b_buf = (b'\x00\x00'
1425                  b'\x00'
1426                  b'\x00'
1427                  b'\x00'
1428                  b'\x80\x00\xbb\xbb\xbb\xbb\xbb\xbb'
1429                  b'\x00\x00\x00\x00'
1430                  b'\x80\x00\xaa\xaa\xaa\xaa\xaa\xaa'
1431                  b'\x80\x04'
1432                  b'\x01\x00'
1433                  b'\x14\x00'
1434                  b'\x02\x00'
1435                  b'\x0f\x00')
1436
1437         buf = e_buf + l_buf + b_buf
1438
1439         # Append padding if ethernet frame is less than 60 bytes length
1440         pad_len = 60 - len(buf)
1441         if pad_len > 0:
1442             buf += b'\x00' * pad_len
1443         eq_(buf, p.data)
1444
1445         # parse
1446         pkt = packet.Packet(p.data)
1447         protocols = self.get_protocols(pkt)
1448         p_eth = protocols['ethernet']
1449         p_llc = protocols['llc']
1450         p_bpdu = protocols['ConfigurationBPDUs']
1451
1452         # ethernet
1453         ok_(p_eth)
1454         eq_(self.dst_mac, p_eth.dst)
1455         eq_(self.src_mac, p_eth.src)
1456         eq_(ether.ETH_TYPE_IEEE802_3, p_eth.ethertype)
1457
1458         # llc
1459         ok_(p_llc)
1460         eq_(llc.SAP_BPDU, p_llc.dsap_addr)
1461         eq_(llc.SAP_BPDU, p_llc.ssap_addr)
1462         eq_(0, p_llc.control.modifier_function1)
1463         eq_(0, p_llc.control.pf_bit)
1464         eq_(0, p_llc.control.modifier_function2)
1465
1466         # bpdu
1467         ok_(p_bpdu)
1468         eq_(bpdu.PROTOCOL_IDENTIFIER, p_bpdu._protocol_id)
1469         eq_(bpdu.PROTOCOLVERSION_ID_BPDU, p_bpdu._version_id)
1470         eq_(bpdu.TYPE_CONFIG_BPDU, p_bpdu._bpdu_type)
1471         eq_(0, p_bpdu.flags)
1472         eq_(32768, p_bpdu.root_priority)
1473         eq_(0, p_bpdu.root_system_id_extension)
1474         eq_(self.src_mac, p_bpdu.root_mac_address)
1475         eq_(0, p_bpdu.root_path_cost)
1476         eq_(32768, p_bpdu.bridge_priority)
1477         eq_(0, p_bpdu.bridge_system_id_extension)
1478         eq_(self.dst_mac, p_bpdu.bridge_mac_address)
1479         eq_(128, p_bpdu.port_priority)
1480         eq_(4, p_bpdu.port_number)
1481         eq_(1, p_bpdu.message_age)
1482         eq_(20, p_bpdu.max_age)
1483         eq_(2, p_bpdu.hello_time)
1484         eq_(15, p_bpdu.forward_delay)
1485
1486         # to string
1487         eth_values = {'dst': self.dst_mac,
1488                       'src': self.src_mac,
1489                       'ethertype': ether.ETH_TYPE_IEEE802_3}
1490         _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k]))
1491                              for k, v in inspect.getmembers(p_eth)
1492                              if k in eth_values])
1493         eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str)
1494
1495         ctrl_values = {'modifier_function1': 0,
1496                        'pf_bit': 0,
1497                        'modifier_function2': 0}
1498         _ctrl_str = ','.join(['%s=%s' % (k, repr(ctrl_values[k]))
1499                               for k, v in inspect.getmembers(p_llc.control)
1500                               if k in ctrl_values])
1501         ctrl_str = '%s(%s)' % (llc.ControlFormatU.__name__, _ctrl_str)
1502
1503         llc_values = {'dsap_addr': repr(llc.SAP_BPDU),
1504                       'ssap_addr': repr(llc.SAP_BPDU),
1505                       'control': ctrl_str}
1506         _llc_str = ','.join(['%s=%s' % (k, llc_values[k])
1507                              for k, v in inspect.getmembers(p_llc)
1508                              if k in llc_values])
1509         llc_str = '%s(%s)' % (llc.llc.__name__, _llc_str)
1510
1511         _long = int if six.PY3 else long
1512         bpdu_values = {'flags': 0,
1513                        'root_priority': _long(32768),
1514                        'root_system_id_extension': _long(0),
1515                        'root_mac_address': self.src_mac,
1516                        'root_path_cost': 0,
1517                        'bridge_priority': _long(32768),
1518                        'bridge_system_id_extension': _long(0),
1519                        'bridge_mac_address': self.dst_mac,
1520                        'port_priority': 128,
1521                        'port_number': 4,
1522                        'message_age': float(1),
1523                        'max_age': float(20),
1524                        'hello_time': float(2),
1525                        'forward_delay': float(15)}
1526         _bpdu_str = ','.join(['%s=%s' % (k, repr(bpdu_values[k]))
1527                               for k, v in inspect.getmembers(p_bpdu)
1528                               if k in bpdu_values])
1529         bpdu_str = '%s(%s)' % (bpdu.ConfigurationBPDUs.__name__, _bpdu_str)
1530
1531         pkt_str = '%s, %s, %s' % (eth_str, llc_str, bpdu_str)
1532
1533         eq_(eth_str, str(p_eth))
1534         eq_(eth_str, repr(p_eth))
1535
1536         eq_(llc_str, str(p_llc))
1537         eq_(llc_str, repr(p_llc))
1538
1539         eq_(bpdu_str, str(p_bpdu))
1540         eq_(bpdu_str, repr(p_bpdu))
1541
1542         eq_(pkt_str, str(pkt))
1543         eq_(pkt_str, repr(pkt))
1544
1545     def test_div_api(self):
1546         e = ethernet.ethernet(self.dst_mac, self.src_mac, ether.ETH_TYPE_IP)
1547         i = ipv4.ipv4()
1548         u = udp.udp(self.src_port, self.dst_port)
1549         pkt = e / i / u
1550         ok_(isinstance(pkt, packet.Packet))
1551         ok_(isinstance(pkt.protocols[0], ethernet.ethernet))
1552         ok_(isinstance(pkt.protocols[1], ipv4.ipv4))
1553         ok_(isinstance(pkt.protocols[2], udp.udp))