1 # Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
16 # vim: tabstop=4 shiftwidth=4 softtabstop=4
22 from nose.tools import ok_, eq_
24 from ryu.ofproto import ether, inet
25 from ryu.lib.packet import arp
26 from ryu.lib.packet import bpdu
27 from ryu.lib.packet import ethernet
28 from ryu.lib.packet import icmp, icmpv6
29 from ryu.lib.packet import ipv4, ipv6
30 from ryu.lib.packet import llc
31 from ryu.lib.packet import packet, packet_utils
32 from ryu.lib.packet import sctp
33 from ryu.lib.packet import tcp, udp
34 from ryu.lib.packet import vlan
35 from ryu.lib import addrconv
38 LOG = logging.getLogger('test_packet')
41 class TestPacket(unittest.TestCase):
42 """ Test case for packet
45 dst_mac = 'aa:aa:aa:aa:aa:aa'
46 src_mac = 'bb:bb:bb:bb:bb:bb'
47 dst_mac_bin = addrconv.mac.text_to_bin(dst_mac)
48 src_mac_bin = addrconv.mac.text_to_bin(src_mac)
49 dst_ip = '192.168.128.10'
50 src_ip = '192.168.122.20'
51 dst_ip_bin = addrconv.ipv4.text_to_bin(dst_ip)
54 src_ip_bin = addrconv.ipv4.text_to_bin(src_ip)
55 payload = b'\x06\x06\x47\x50\x00\x00\x00\x00' \
56 + b'\xcd\xc5\x00\x00\x00\x00\x00\x00' \
57 + b'\x10\x11\x12\x13\x14\x15\x16\x17' \
58 + b'\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f'
60 def get_protocols(self, pkt):
63 if hasattr(p, 'protocol_name'):
64 protocols[p.protocol_name] = p
66 protocols['payload'] = p
77 e = ethernet.ethernet(self.dst_mac, self.src_mac,
79 a = arp.arp(1, ether.ETH_TYPE_IP, 6, 4, 2,
80 self.src_mac, self.src_ip, self.dst_mac,
88 e_buf = self.dst_mac_bin \
105 # Append padding if ethernet frame is less than 60 bytes length
106 pad_len = 60 - len(buf)
108 buf += b'\x00' * pad_len
112 pkt = packet.Packet(p.data)
113 protocols = self.get_protocols(pkt)
114 p_eth = protocols['ethernet']
115 p_arp = protocols['arp']
119 eq_(self.dst_mac, p_eth.dst)
120 eq_(self.src_mac, p_eth.src)
121 eq_(ether.ETH_TYPE_ARP, p_eth.ethertype)
126 eq_(ether.ETH_TYPE_IP, p_arp.proto)
130 eq_(self.src_mac, p_arp.src_mac)
131 eq_(self.src_ip, p_arp.src_ip)
132 eq_(self.dst_mac, p_arp.dst_mac)
133 eq_(self.dst_ip, p_arp.dst_ip)
136 eth_values = {'dst': self.dst_mac,
138 'ethertype': ether.ETH_TYPE_ARP}
139 _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k]))
140 for k, v in inspect.getmembers(p_eth)
142 eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str)
144 arp_values = {'hwtype': 1,
145 'proto': ether.ETH_TYPE_IP,
149 'src_mac': self.src_mac,
150 'dst_mac': self.dst_mac,
151 'src_ip': self.src_ip,
152 'dst_ip': self.dst_ip}
153 _arp_str = ','.join(['%s=%s' % (k, repr(arp_values[k]))
154 for k, v in inspect.getmembers(p_arp)
156 arp_str = '%s(%s)' % (arp.arp.__name__, _arp_str)
158 pkt_str = '%s, %s' % (eth_str, arp_str)
160 eq_(eth_str, str(p_eth))
161 eq_(eth_str, repr(p_eth))
163 eq_(arp_str, str(p_arp))
164 eq_(arp_str, repr(p_arp))
166 eq_(pkt_str, str(pkt))
167 eq_(pkt_str, repr(pkt))
169 def test_vlan_arp(self):
171 e = ethernet.ethernet(self.dst_mac, self.src_mac,
172 ether.ETH_TYPE_8021Q)
173 v = vlan.vlan(0b111, 0b1, 3, ether.ETH_TYPE_ARP)
174 a = arp.arp(1, ether.ETH_TYPE_IP, 6, 4, 2,
175 self.src_mac, self.src_ip, self.dst_mac,
184 e_buf = self.dst_mac_bin \
189 v_buf = b'\xF0\x03' \
193 a_buf = b'\x00\x01' \
203 buf = e_buf + v_buf + a_buf
205 # Append padding if ethernet frame is less than 60 bytes length
206 pad_len = 60 - len(buf)
208 buf += b'\x00' * pad_len
212 pkt = packet.Packet(p.data)
213 protocols = self.get_protocols(pkt)
214 p_eth = protocols['ethernet']
215 p_vlan = protocols['vlan']
216 p_arp = protocols['arp']
220 eq_(self.dst_mac, p_eth.dst)
221 eq_(self.src_mac, p_eth.src)
222 eq_(ether.ETH_TYPE_8021Q, p_eth.ethertype)
226 eq_(0b111, p_vlan.pcp)
229 eq_(ether.ETH_TYPE_ARP, p_vlan.ethertype)
234 eq_(ether.ETH_TYPE_IP, p_arp.proto)
238 eq_(self.src_mac, p_arp.src_mac)
239 eq_(self.src_ip, p_arp.src_ip)
240 eq_(self.dst_mac, p_arp.dst_mac)
241 eq_(self.dst_ip, p_arp.dst_ip)
244 eth_values = {'dst': self.dst_mac,
246 'ethertype': ether.ETH_TYPE_8021Q}
247 _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k]))
248 for k, v in inspect.getmembers(p_eth)
250 eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str)
252 vlan_values = {'pcp': 0b111,
255 'ethertype': ether.ETH_TYPE_ARP}
256 _vlan_str = ','.join(['%s=%s' % (k, repr(vlan_values[k]))
257 for k, v in inspect.getmembers(p_vlan)
258 if k in vlan_values])
259 vlan_str = '%s(%s)' % (vlan.vlan.__name__, _vlan_str)
261 arp_values = {'hwtype': 1,
262 'proto': ether.ETH_TYPE_IP,
266 'src_mac': self.src_mac,
267 'dst_mac': self.dst_mac,
268 'src_ip': self.src_ip,
269 'dst_ip': self.dst_ip}
270 _arp_str = ','.join(['%s=%s' % (k, repr(arp_values[k]))
271 for k, v in inspect.getmembers(p_arp)
273 arp_str = '%s(%s)' % (arp.arp.__name__, _arp_str)
275 pkt_str = '%s, %s, %s' % (eth_str, vlan_str, arp_str)
277 eq_(eth_str, str(p_eth))
278 eq_(eth_str, repr(p_eth))
280 eq_(vlan_str, str(p_vlan))
281 eq_(vlan_str, repr(p_vlan))
283 eq_(arp_str, str(p_arp))
284 eq_(arp_str, repr(p_arp))
286 eq_(pkt_str, str(pkt))
287 eq_(pkt_str, repr(pkt))
289 def test_ipv4_udp(self):
291 e = ethernet.ethernet(self.dst_mac, self.src_mac,
293 ip = ipv4.ipv4(4, 5, 1, 0, 3, 1, 4, 64, inet.IPPROTO_UDP, 0,
294 self.src_ip, self.dst_ip)
295 u = udp.udp(0x190F, 0x1F90, 0, 0)
301 p.add_protocol(self.payload)
305 e_buf = self.dst_mac_bin \
322 u_buf = b'\x19\x0F' \
327 buf = e_buf + ip_buf + u_buf + self.payload
330 pkt = packet.Packet(p.data)
331 protocols = self.get_protocols(pkt)
332 p_eth = protocols['ethernet']
333 p_ipv4 = protocols['ipv4']
334 p_udp = protocols['udp']
338 eq_(self.dst_mac, p_eth.dst)
339 eq_(self.src_mac, p_eth.src)
340 eq_(ether.ETH_TYPE_IP, p_eth.ethertype)
344 eq_(4, p_ipv4.version)
345 eq_(5, p_ipv4.header_length)
347 l = len(ip_buf) + len(u_buf) + len(self.payload)
348 eq_(l, p_ipv4.total_length)
349 eq_(3, p_ipv4.identification)
352 eq_(inet.IPPROTO_UDP, p_ipv4.proto)
353 eq_(self.src_ip, p_ipv4.src)
354 eq_(self.dst_ip, p_ipv4.dst)
355 t = bytearray(ip_buf)
356 struct.pack_into('!H', t, 10, p_ipv4.csum)
357 eq_(packet_utils.checksum(t), 0)
361 eq_(0x190f, p_udp.src_port)
362 eq_(0x1F90, p_udp.dst_port)
363 eq_(len(u_buf) + len(self.payload), p_udp.total_length)
364 eq_(0x77b2, p_udp.csum)
366 struct.pack_into('!H', t, 6, p_udp.csum)
367 ph = struct.pack('!4s4sBBH', self.src_ip_bin, self.dst_ip_bin, 0,
368 17, len(u_buf) + len(self.payload))
369 t = ph + t + self.payload
370 eq_(packet_utils.checksum(t), 0)
373 ok_('payload' in protocols)
374 eq_(self.payload, protocols['payload'])
377 eth_values = {'dst': self.dst_mac,
379 'ethertype': ether.ETH_TYPE_IP}
380 _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k]))
381 for k, v in inspect.getmembers(p_eth)
383 eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str)
385 ipv4_values = {'version': 4,
391 'offset': p_ipv4.offset,
393 'proto': inet.IPPROTO_UDP,
398 _ipv4_str = ','.join(['%s=%s' % (k, repr(ipv4_values[k]))
399 for k, v in inspect.getmembers(p_ipv4)
400 if k in ipv4_values])
401 ipv4_str = '%s(%s)' % (ipv4.ipv4.__name__, _ipv4_str)
403 udp_values = {'src_port': 0x190f,
405 'total_length': len(u_buf) + len(self.payload),
407 _udp_str = ','.join(['%s=%s' % (k, repr(udp_values[k]))
408 for k, v in inspect.getmembers(p_udp)
410 udp_str = '%s(%s)' % (udp.udp.__name__, _udp_str)
412 pkt_str = '%s, %s, %s, %s' % (eth_str, ipv4_str, udp_str,
413 repr(protocols['payload']))
415 eq_(eth_str, str(p_eth))
416 eq_(eth_str, repr(p_eth))
418 eq_(ipv4_str, str(p_ipv4))
419 eq_(ipv4_str, repr(p_ipv4))
421 eq_(udp_str, str(p_udp))
422 eq_(udp_str, repr(p_udp))
424 eq_(pkt_str, str(pkt))
425 eq_(pkt_str, repr(pkt))
427 def test_ipv4_tcp(self):
429 e = ethernet.ethernet(self.dst_mac, self.src_mac,
431 ip = ipv4.ipv4(4, 5, 0, 0, 0, 0, 0, 64, inet.IPPROTO_TCP, 0,
432 self.src_ip, self.dst_ip)
433 t = tcp.tcp(0x190F, 0x1F90, 0x123, 1, 6, 0b101010, 2048, 0, 0x6f,
440 p.add_protocol(self.payload)
444 e_buf = self.dst_mac_bin \
460 # tcp !HHIIBBHHH + option
461 t_buf = b'\x19\x0F' \
463 + b'\x00\x00\x01\x23' \
464 + b'\x00\x00\x00\x01' \
470 + b'\x01\x02\x00\x00'
472 buf = e_buf + ip_buf + t_buf + self.payload
475 pkt = packet.Packet(p.data)
476 protocols = self.get_protocols(pkt)
477 p_eth = protocols['ethernet']
478 p_ipv4 = protocols['ipv4']
479 p_tcp = protocols['tcp']
483 eq_(self.dst_mac, p_eth.dst)
484 eq_(self.src_mac, p_eth.src)
485 eq_(ether.ETH_TYPE_IP, p_eth.ethertype)
489 eq_(4, p_ipv4.version)
490 eq_(5, p_ipv4.header_length)
492 l = len(ip_buf) + len(t_buf) + len(self.payload)
493 eq_(l, p_ipv4.total_length)
494 eq_(0, p_ipv4.identification)
497 eq_(inet.IPPROTO_TCP, p_ipv4.proto)
498 eq_(self.src_ip, p_ipv4.src)
499 eq_(self.dst_ip, p_ipv4.dst)
500 t = bytearray(ip_buf)
501 struct.pack_into('!H', t, 10, p_ipv4.csum)
502 eq_(packet_utils.checksum(t), 0)
506 eq_(0x190f, p_tcp.src_port)
507 eq_(0x1F90, p_tcp.dst_port)
508 eq_(0x123, p_tcp.seq)
511 eq_(0b101010, p_tcp.bits)
512 eq_(2048, p_tcp.window_size)
513 eq_(0x6f, p_tcp.urgent)
514 eq_(len(t_buf), len(p_tcp))
516 struct.pack_into('!H', t, 16, p_tcp.csum)
517 ph = struct.pack('!4s4sBBH', self.src_ip_bin, self.dst_ip_bin, 0,
518 6, len(t_buf) + len(self.payload))
519 t = ph + t + self.payload
520 eq_(packet_utils.checksum(t), 0)
523 ok_('payload' in protocols)
524 eq_(self.payload, protocols['payload'])
527 eth_values = {'dst': self.dst_mac,
529 'ethertype': ether.ETH_TYPE_IP}
530 _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k]))
531 for k, v in inspect.getmembers(p_eth)
533 eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str)
535 ipv4_values = {'version': 4,
541 'offset': p_ipv4.offset,
543 'proto': inet.IPPROTO_TCP,
548 _ipv4_str = ','.join(['%s=%s' % (k, repr(ipv4_values[k]))
549 for k, v in inspect.getmembers(p_ipv4)
550 if k in ipv4_values])
551 ipv4_str = '%s(%s)' % (ipv4.ipv4.__name__, _ipv4_str)
553 tcp_values = {'src_port': 0x190f,
562 'option': p_tcp.option}
563 _tcp_str = ','.join(['%s=%s' % (k, repr(tcp_values[k]))
564 for k, v in inspect.getmembers(p_tcp)
566 tcp_str = '%s(%s)' % (tcp.tcp.__name__, _tcp_str)
568 pkt_str = '%s, %s, %s, %s' % (eth_str, ipv4_str, tcp_str,
569 repr(protocols['payload']))
571 eq_(eth_str, str(p_eth))
572 eq_(eth_str, repr(p_eth))
574 eq_(ipv4_str, str(p_ipv4))
575 eq_(ipv4_str, repr(p_ipv4))
577 eq_(tcp_str, str(p_tcp))
578 eq_(tcp_str, repr(p_tcp))
580 eq_(pkt_str, str(pkt))
581 eq_(pkt_str, repr(pkt))
583 def test_ipv4_sctp(self):
585 e = ethernet.ethernet()
586 ip = ipv4.ipv4(proto=inet.IPPROTO_SCTP)
587 s = sctp.sctp(chunks=[sctp.chunk_data(payload_data=self.payload)])
592 ipaddr = addrconv.ipv4.text_to_bin('0.0.0.0')
595 e_buf = b'\xff\xff\xff\xff\xff\xff' \
596 + b'\x00\x00\x00\x00\x00\x00' \
611 # sctp !HHII + chunk_data !BBHIHHI + payload
612 s_buf = b'\x00\x00' \
614 + b'\x00\x00\x00\x00' \
615 + b'\x00\x00\x00\x00' \
619 + b'\x00\x00\x00\x00' \
622 + b'\x00\x00\x00\x00' \
625 buf = e_buf + ip_buf + s_buf
628 pkt = packet.Packet(p.data)
629 protocols = self.get_protocols(pkt)
630 p_eth = protocols['ethernet']
631 p_ipv4 = protocols['ipv4']
632 p_sctp = protocols['sctp']
636 eq_('ff:ff:ff:ff:ff:ff', p_eth.dst)
637 eq_('00:00:00:00:00:00', p_eth.src)
638 eq_(ether.ETH_TYPE_IP, p_eth.ethertype)
642 eq_(4, p_ipv4.version)
643 eq_(5, p_ipv4.header_length)
645 l = len(ip_buf) + len(s_buf)
646 eq_(l, p_ipv4.total_length)
647 eq_(0, p_ipv4.identification)
650 eq_(inet.IPPROTO_SCTP, p_ipv4.proto)
651 eq_('10.0.0.1', p_ipv4.src)
652 eq_('10.0.0.2', p_ipv4.dst)
653 t = bytearray(ip_buf)
654 struct.pack_into('!H', t, 10, p_ipv4.csum)
655 eq_(packet_utils.checksum(t), 0x1403)
659 eq_(1, p_sctp.src_port)
660 eq_(1, p_sctp.dst_port)
662 assert isinstance(p_sctp.chunks[0], sctp.chunk_data)
663 eq_(0, p_sctp.chunks[0]._type)
664 eq_(0, p_sctp.chunks[0].unordered)
665 eq_(0, p_sctp.chunks[0].begin)
666 eq_(0, p_sctp.chunks[0].end)
667 eq_(16 + len(self.payload), p_sctp.chunks[0].length)
668 eq_(0, p_sctp.chunks[0].tsn)
669 eq_(0, p_sctp.chunks[0].sid)
670 eq_(0, p_sctp.chunks[0].seq)
671 eq_(0, p_sctp.chunks[0].payload_id)
672 eq_(self.payload, p_sctp.chunks[0].payload_data)
673 eq_(len(s_buf), len(p_sctp))
676 eth_values = {'dst': 'ff:ff:ff:ff:ff:ff',
677 'src': '00:00:00:00:00:00',
678 'ethertype': ether.ETH_TYPE_IP}
679 _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k]))
680 for k, v in inspect.getmembers(p_eth)
682 eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str)
684 ipv4_values = {'version': 4,
692 'proto': inet.IPPROTO_SCTP,
697 _ipv4_str = ','.join(['%s=%s' % (k, repr(ipv4_values[k]))
698 for k, v in inspect.getmembers(p_ipv4)
699 if k in ipv4_values])
700 ipv4_str = '%s(%s)' % (ipv4.ipv4.__name__, _ipv4_str)
702 data_values = {'unordered': 0,
705 'length': 16 + len(self.payload),
710 'payload_data': self.payload}
711 _data_str = ','.join(['%s=%s' % (k, repr(data_values[k]))
712 for k in sorted(data_values.keys())])
713 data_str = '[%s(%s)]' % (sctp.chunk_data.__name__, _data_str)
715 sctp_values = {'src_port': 1,
718 'csum': repr(p_sctp.csum),
720 _sctp_str = ','.join(['%s=%s' % (k, sctp_values[k])
721 for k, _ in inspect.getmembers(p_sctp)
722 if k in sctp_values])
723 sctp_str = '%s(%s)' % (sctp.sctp.__name__, _sctp_str)
725 pkt_str = '%s, %s, %s' % (eth_str, ipv4_str, sctp_str)
727 eq_(eth_str, str(p_eth))
728 eq_(eth_str, repr(p_eth))
730 eq_(ipv4_str, str(p_ipv4))
731 eq_(ipv4_str, repr(p_ipv4))
733 eq_(sctp_str, str(p_sctp))
734 eq_(sctp_str, repr(p_sctp))
736 eq_(pkt_str, str(pkt))
737 eq_(pkt_str, repr(pkt))
739 def test_ipv4_icmp(self):
741 e = ethernet.ethernet()
742 ip = ipv4.ipv4(proto=inet.IPPROTO_ICMP)
748 ipaddr = addrconv.ipv4.text_to_bin('0.0.0.0')
751 e_buf = b'\xff\xff\xff\xff\xff\xff' \
752 + b'\x00\x00\x00\x00\x00\x00' \
767 # icmp !BBH + echo !HH
774 buf = e_buf + ip_buf + ic_buf
777 pkt = packet.Packet(p.data)
778 protocols = self.get_protocols(pkt)
779 p_eth = protocols['ethernet']
780 p_ipv4 = protocols['ipv4']
781 p_icmp = protocols['icmp']
785 eq_('ff:ff:ff:ff:ff:ff', p_eth.dst)
786 eq_('00:00:00:00:00:00', p_eth.src)
787 eq_(ether.ETH_TYPE_IP, p_eth.ethertype)
791 eq_(4, p_ipv4.version)
792 eq_(5, p_ipv4.header_length)
794 l = len(ip_buf) + len(ic_buf)
795 eq_(l, p_ipv4.total_length)
796 eq_(0, p_ipv4.identification)
799 eq_(inet.IPPROTO_ICMP, p_ipv4.proto)
800 eq_('10.0.0.1', p_ipv4.src)
801 eq_('10.0.0.2', p_ipv4.dst)
802 t = bytearray(ip_buf)
803 struct.pack_into('!H', t, 10, p_ipv4.csum)
804 eq_(packet_utils.checksum(t), 0x1403)
810 eq_(0, p_icmp.data.id)
811 eq_(0, p_icmp.data.seq)
812 eq_(len(ic_buf), len(p_icmp))
813 t = bytearray(ic_buf)
814 struct.pack_into('!H', t, 2, p_icmp.csum)
815 eq_(packet_utils.checksum(t), 0)
818 eth_values = {'dst': 'ff:ff:ff:ff:ff:ff',
819 'src': '00:00:00:00:00:00',
820 'ethertype': ether.ETH_TYPE_IP}
821 _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k]))
822 for k, _ in inspect.getmembers(p_eth)
824 eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str)
826 ipv4_values = {'version': 4,
832 'offset': p_ipv4.offset,
834 'proto': inet.IPPROTO_ICMP,
839 _ipv4_str = ','.join(['%s=%s' % (k, repr(ipv4_values[k]))
840 for k, _ in inspect.getmembers(p_ipv4)
841 if k in ipv4_values])
842 ipv4_str = '%s(%s)' % (ipv4.ipv4.__name__, _ipv4_str)
844 echo_values = {'id': 0,
847 _echo_str = ','.join(['%s=%s' % (k, repr(echo_values[k]))
848 for k in sorted(echo_values.keys())])
849 echo_str = '%s(%s)' % (icmp.echo.__name__, _echo_str)
850 icmp_values = {'type': 8,
854 _icmp_str = ','.join(['%s=%s' % (k, icmp_values[k])
855 for k, _ in inspect.getmembers(p_icmp)
856 if k in icmp_values])
857 icmp_str = '%s(%s)' % (icmp.icmp.__name__, _icmp_str)
859 pkt_str = '%s, %s, %s' % (eth_str, ipv4_str, icmp_str)
861 eq_(eth_str, str(p_eth))
862 eq_(eth_str, repr(p_eth))
864 eq_(ipv4_str, str(p_ipv4))
865 eq_(ipv4_str, repr(p_ipv4))
867 eq_(icmp_str, str(p_icmp))
868 eq_(icmp_str, repr(p_icmp))
870 eq_(pkt_str, str(pkt))
871 eq_(pkt_str, repr(pkt))
873 def test_ipv6_udp(self):
875 e = ethernet.ethernet(ethertype=ether.ETH_TYPE_IPV6)
876 ip = ipv6.ipv6(nxt=inet.IPPROTO_UDP)
879 p = e / ip / u / self.payload
882 ipaddr = addrconv.ipv6.text_to_bin('::')
885 e_buf = b'\xff\xff\xff\xff\xff\xff' \
886 + b'\x00\x00\x00\x00\x00\x00' \
890 ip_buf = b'\x60\x00\x00\x00' \
899 u_buf = b'\x00\x00' \
904 buf = e_buf + ip_buf + u_buf + self.payload
907 pkt = packet.Packet(p.data)
908 protocols = self.get_protocols(pkt)
909 p_eth = protocols['ethernet']
910 p_ipv6 = protocols['ipv6']
911 p_udp = protocols['udp']
915 eq_('ff:ff:ff:ff:ff:ff', p_eth.dst)
916 eq_('00:00:00:00:00:00', p_eth.src)
917 eq_(ether.ETH_TYPE_IPV6, p_eth.ethertype)
921 eq_(6, p_ipv6.version)
922 eq_(0, p_ipv6.traffic_class)
923 eq_(0, p_ipv6.flow_label)
924 eq_(len(u_buf) + len(self.payload), p_ipv6.payload_length)
925 eq_(inet.IPPROTO_UDP, p_ipv6.nxt)
926 eq_(255, p_ipv6.hop_limit)
927 eq_('10::10', p_ipv6.src)
928 eq_('20::20', p_ipv6.dst)
932 eq_(1, p_udp.src_port)
933 eq_(1, p_udp.dst_port)
934 eq_(len(u_buf) + len(self.payload), p_udp.total_length)
935 eq_(0x2B60, p_udp.csum)
937 struct.pack_into('!H', t, 6, p_udp.csum)
938 ph = struct.pack('!16s16sI3xB', ipaddr, ipaddr,
939 len(u_buf) + len(self.payload), 17)
940 t = ph + t + self.payload
941 eq_(packet_utils.checksum(t), 0x62)
944 ok_('payload' in protocols)
945 eq_(self.payload, protocols['payload'])
948 eth_values = {'dst': 'ff:ff:ff:ff:ff:ff',
949 'src': '00:00:00:00:00:00',
950 'ethertype': ether.ETH_TYPE_IPV6}
951 _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k]))
952 for k, v in inspect.getmembers(p_eth)
954 eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str)
956 ipv6_values = {'version': 6,
959 'payload_length': len(u_buf) + len(self.payload),
960 'nxt': inet.IPPROTO_UDP,
965 _ipv6_str = ','.join(['%s=%s' % (k, repr(ipv6_values[k]))
966 for k, v in inspect.getmembers(p_ipv6)
967 if k in ipv6_values])
968 ipv6_str = '%s(%s)' % (ipv6.ipv6.__name__, _ipv6_str)
970 udp_values = {'src_port': 1,
972 'total_length': len(u_buf) + len(self.payload),
974 _udp_str = ','.join(['%s=%s' % (k, repr(udp_values[k]))
975 for k, v in inspect.getmembers(p_udp)
977 udp_str = '%s(%s)' % (udp.udp.__name__, _udp_str)
979 pkt_str = '%s, %s, %s, %s' % (eth_str, ipv6_str, udp_str,
980 repr(protocols['payload']))
982 eq_(eth_str, str(p_eth))
983 eq_(eth_str, repr(p_eth))
985 eq_(ipv6_str, str(p_ipv6))
986 eq_(ipv6_str, repr(p_ipv6))
988 eq_(udp_str, str(p_udp))
989 eq_(udp_str, repr(p_udp))
991 eq_(pkt_str, str(pkt))
992 eq_(pkt_str, repr(pkt))
994 def test_ipv6_tcp(self):
996 e = ethernet.ethernet(ethertype=ether.ETH_TYPE_IPV6)
998 t = tcp.tcp(option=b'\x01\x02')
1000 p = e / ip / t / self.payload
1003 ipaddr = addrconv.ipv6.text_to_bin('::')
1006 e_buf = b'\xff\xff\xff\xff\xff\xff' \
1007 + b'\x00\x00\x00\x00\x00\x00' \
1011 ip_buf = b'\x60\x00\x00\x00' \
1019 # tcp !HHIIBBHHH + option
1020 t_buf = b'\x00\x00' \
1022 + b'\x00\x00\x00\x00' \
1023 + b'\x00\x00\x00\x00' \
1029 + b'\x01\x02\x00\x00'
1031 buf = e_buf + ip_buf + t_buf + self.payload
1034 pkt = packet.Packet(p.data)
1035 protocols = self.get_protocols(pkt)
1036 p_eth = protocols['ethernet']
1037 p_ipv6 = protocols['ipv6']
1038 p_tcp = protocols['tcp']
1042 eq_('ff:ff:ff:ff:ff:ff', p_eth.dst)
1043 eq_('00:00:00:00:00:00', p_eth.src)
1044 eq_(ether.ETH_TYPE_IPV6, p_eth.ethertype)
1048 eq_(6, p_ipv6.version)
1049 eq_(0, p_ipv6.traffic_class)
1050 eq_(0, p_ipv6.flow_label)
1051 eq_(len(t_buf) + len(self.payload), p_ipv6.payload_length)
1052 eq_(inet.IPPROTO_TCP, p_ipv6.nxt)
1053 eq_(255, p_ipv6.hop_limit)
1054 eq_('10::10', p_ipv6.src)
1055 eq_('20::20', p_ipv6.dst)
1059 eq_(1, p_tcp.src_port)
1060 eq_(1, p_tcp.dst_port)
1063 eq_(6, p_tcp.offset)
1065 eq_(0, p_tcp.window_size)
1066 eq_(0, p_tcp.urgent)
1067 eq_(len(t_buf), len(p_tcp))
1068 t = bytearray(t_buf)
1069 struct.pack_into('!H', t, 16, p_tcp.csum)
1070 ph = struct.pack('!16s16sI3xB', ipaddr, ipaddr,
1071 len(t_buf) + len(self.payload), 6)
1072 t = ph + t + self.payload
1073 eq_(packet_utils.checksum(t), 0x62)
1076 ok_('payload' in protocols)
1077 eq_(self.payload, protocols['payload'])
1080 eth_values = {'dst': 'ff:ff:ff:ff:ff:ff',
1081 'src': '00:00:00:00:00:00',
1082 'ethertype': ether.ETH_TYPE_IPV6}
1083 _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k]))
1084 for k, v in inspect.getmembers(p_eth)
1085 if k in eth_values])
1086 eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str)
1088 ipv6_values = {'version': 6,
1091 'payload_length': len(t_buf) + len(self.payload),
1092 'nxt': inet.IPPROTO_TCP,
1097 _ipv6_str = ','.join(['%s=%s' % (k, repr(ipv6_values[k]))
1098 for k, v in inspect.getmembers(p_ipv6)
1099 if k in ipv6_values])
1100 ipv6_str = '%s(%s)' % (ipv6.ipv6.__name__, _ipv6_str)
1102 tcp_values = {'src_port': 1,
1111 'option': p_tcp.option}
1112 _tcp_str = ','.join(['%s=%s' % (k, repr(tcp_values[k]))
1113 for k, v in inspect.getmembers(p_tcp)
1114 if k in tcp_values])
1115 tcp_str = '%s(%s)' % (tcp.tcp.__name__, _tcp_str)
1117 pkt_str = '%s, %s, %s, %s' % (eth_str, ipv6_str, tcp_str,
1118 repr(protocols['payload']))
1120 eq_(eth_str, str(p_eth))
1121 eq_(eth_str, repr(p_eth))
1123 eq_(ipv6_str, str(p_ipv6))
1124 eq_(ipv6_str, repr(p_ipv6))
1126 eq_(tcp_str, str(p_tcp))
1127 eq_(tcp_str, repr(p_tcp))
1129 eq_(pkt_str, str(pkt))
1130 eq_(pkt_str, repr(pkt))
1132 def test_ipv6_sctp(self):
1134 e = ethernet.ethernet(ethertype=ether.ETH_TYPE_IPV6)
1135 ip = ipv6.ipv6(nxt=inet.IPPROTO_SCTP)
1136 s = sctp.sctp(chunks=[sctp.chunk_data(payload_data=self.payload)])
1141 ipaddr = addrconv.ipv6.text_to_bin('::')
1144 e_buf = b'\xff\xff\xff\xff\xff\xff' \
1145 + b'\x00\x00\x00\x00\x00\x00' \
1149 ip_buf = b'\x60\x00\x00\x00' \
1157 # sctp !HHII + chunk_data !BBHIHHI + payload
1158 s_buf = b'\x00\x00' \
1160 + b'\x00\x00\x00\x00' \
1161 + b'\x00\x00\x00\x00' \
1165 + b'\x00\x00\x00\x00' \
1168 + b'\x00\x00\x00\x00' \
1171 buf = e_buf + ip_buf + s_buf
1174 pkt = packet.Packet(p.data)
1175 protocols = self.get_protocols(pkt)
1176 p_eth = protocols['ethernet']
1177 p_ipv6 = protocols['ipv6']
1178 p_sctp = protocols['sctp']
1182 eq_('ff:ff:ff:ff:ff:ff', p_eth.dst)
1183 eq_('00:00:00:00:00:00', p_eth.src)
1184 eq_(ether.ETH_TYPE_IPV6, p_eth.ethertype)
1188 eq_(6, p_ipv6.version)
1189 eq_(0, p_ipv6.traffic_class)
1190 eq_(0, p_ipv6.flow_label)
1191 eq_(len(s_buf), p_ipv6.payload_length)
1192 eq_(inet.IPPROTO_SCTP, p_ipv6.nxt)
1193 eq_(255, p_ipv6.hop_limit)
1194 eq_('10::10', p_ipv6.src)
1195 eq_('20::20', p_ipv6.dst)
1199 eq_(1, p_sctp.src_port)
1200 eq_(1, p_sctp.dst_port)
1202 assert isinstance(p_sctp.chunks[0], sctp.chunk_data)
1203 eq_(0, p_sctp.chunks[0]._type)
1204 eq_(0, p_sctp.chunks[0].unordered)
1205 eq_(0, p_sctp.chunks[0].begin)
1206 eq_(0, p_sctp.chunks[0].end)
1207 eq_(16 + len(self.payload), p_sctp.chunks[0].length)
1208 eq_(0, p_sctp.chunks[0].tsn)
1209 eq_(0, p_sctp.chunks[0].sid)
1210 eq_(0, p_sctp.chunks[0].seq)
1211 eq_(0, p_sctp.chunks[0].payload_id)
1212 eq_(self.payload, p_sctp.chunks[0].payload_data)
1213 eq_(len(s_buf), len(p_sctp))
1216 eth_values = {'dst': 'ff:ff:ff:ff:ff:ff',
1217 'src': '00:00:00:00:00:00',
1218 'ethertype': ether.ETH_TYPE_IPV6}
1219 _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k]))
1220 for k, v in inspect.getmembers(p_eth)
1221 if k in eth_values])
1222 eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str)
1224 ipv6_values = {'version': 6,
1227 'payload_length': len(s_buf),
1228 'nxt': inet.IPPROTO_SCTP,
1233 _ipv6_str = ','.join(['%s=%s' % (k, repr(ipv6_values[k]))
1234 for k, v in inspect.getmembers(p_ipv6)
1235 if k in ipv6_values])
1236 ipv6_str = '%s(%s)' % (ipv6.ipv6.__name__, _ipv6_str)
1238 data_values = {'unordered': 0,
1241 'length': 16 + len(self.payload),
1246 'payload_data': self.payload}
1247 _data_str = ','.join(['%s=%s' % (k, repr(data_values[k]))
1248 for k in sorted(data_values.keys())])
1249 data_str = '[%s(%s)]' % (sctp.chunk_data.__name__, _data_str)
1251 sctp_values = {'src_port': 1,
1254 'csum': repr(p_sctp.csum),
1256 _sctp_str = ','.join(['%s=%s' % (k, sctp_values[k])
1257 for k, _ in inspect.getmembers(p_sctp)
1258 if k in sctp_values])
1259 sctp_str = '%s(%s)' % (sctp.sctp.__name__, _sctp_str)
1261 pkt_str = '%s, %s, %s' % (eth_str, ipv6_str, sctp_str)
1263 eq_(eth_str, str(p_eth))
1264 eq_(eth_str, repr(p_eth))
1266 eq_(ipv6_str, str(p_ipv6))
1267 eq_(ipv6_str, repr(p_ipv6))
1269 eq_(sctp_str, str(p_sctp))
1270 eq_(sctp_str, repr(p_sctp))
1272 eq_(pkt_str, str(pkt))
1273 eq_(pkt_str, repr(pkt))
1275 def test_ipv6_icmpv6(self):
1277 e = ethernet.ethernet(ethertype=ether.ETH_TYPE_IPV6)
1278 ip = ipv6.ipv6(nxt=inet.IPPROTO_ICMPV6)
1279 ic = icmpv6.icmpv6()
1284 ipaddr = addrconv.ipv6.text_to_bin('::')
1287 e_buf = b'\xff\xff\xff\xff\xff\xff' \
1288 + b'\x00\x00\x00\x00\x00\x00' \
1292 ip_buf = b'\x60\x00\x00\x00' \
1305 buf = e_buf + ip_buf + ic_buf
1308 pkt = packet.Packet(p.data)
1309 protocols = self.get_protocols(pkt)
1310 p_eth = protocols['ethernet']
1311 p_ipv6 = protocols['ipv6']
1312 p_icmpv6 = protocols['icmpv6']
1316 eq_('ff:ff:ff:ff:ff:ff', p_eth.dst)
1317 eq_('00:00:00:00:00:00', p_eth.src)
1318 eq_(ether.ETH_TYPE_IPV6, p_eth.ethertype)
1322 eq_(6, p_ipv6.version)
1323 eq_(0, p_ipv6.traffic_class)
1324 eq_(0, p_ipv6.flow_label)
1325 eq_(len(ic_buf), p_ipv6.payload_length)
1326 eq_(inet.IPPROTO_ICMPV6, p_ipv6.nxt)
1327 eq_(255, p_ipv6.hop_limit)
1328 eq_('10::10', p_ipv6.src)
1329 eq_('20::20', p_ipv6.dst)
1333 eq_(0, p_icmpv6.type_)
1334 eq_(0, p_icmpv6.code)
1335 eq_(len(ic_buf), len(p_icmpv6))
1336 t = bytearray(ic_buf)
1337 struct.pack_into('!H', t, 2, p_icmpv6.csum)
1338 ph = struct.pack('!16s16sI3xB', ipaddr, ipaddr, len(ic_buf), 58)
1340 eq_(packet_utils.checksum(t), 0x60)
1343 eth_values = {'dst': 'ff:ff:ff:ff:ff:ff',
1344 'src': '00:00:00:00:00:00',
1345 'ethertype': ether.ETH_TYPE_IPV6}
1346 _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k]))
1347 for k, _ in inspect.getmembers(p_eth)
1348 if k in eth_values])
1349 eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str)
1351 ipv6_values = {'version': 6,
1354 'payload_length': len(ic_buf),
1355 'nxt': inet.IPPROTO_ICMPV6,
1360 _ipv6_str = ','.join(['%s=%s' % (k, repr(ipv6_values[k]))
1361 for k, _ in inspect.getmembers(p_ipv6)
1362 if k in ipv6_values])
1363 ipv6_str = '%s(%s)' % (ipv6.ipv6.__name__, _ipv6_str)
1365 icmpv6_values = {'type_': 0,
1367 'csum': p_icmpv6.csum,
1369 _icmpv6_str = ','.join(['%s=%s' % (k, repr(icmpv6_values[k]))
1370 for k, _ in inspect.getmembers(p_icmpv6)
1371 if k in icmpv6_values])
1372 icmpv6_str = '%s(%s)' % (icmpv6.icmpv6.__name__, _icmpv6_str)
1374 pkt_str = '%s, %s, %s' % (eth_str, ipv6_str, icmpv6_str)
1376 eq_(eth_str, str(p_eth))
1377 eq_(eth_str, repr(p_eth))
1379 eq_(ipv6_str, str(p_ipv6))
1380 eq_(ipv6_str, repr(p_ipv6))
1382 eq_(icmpv6_str, str(p_icmpv6))
1383 eq_(icmpv6_str, repr(p_icmpv6))
1385 eq_(pkt_str, str(pkt))
1386 eq_(pkt_str, repr(pkt))
1388 def test_llc_bpdu(self):
1390 e = ethernet.ethernet(self.dst_mac, self.src_mac,
1391 ether.ETH_TYPE_IEEE802_3)
1392 llc_control = llc.ControlFormatU(0, 0, 0)
1393 l = llc.llc(llc.SAP_BPDU, llc.SAP_BPDU, llc_control)
1394 b = bpdu.ConfigurationBPDUs(flags=0,
1395 root_priority=32768,
1396 root_system_id_extension=0,
1397 root_mac_address=self.src_mac,
1399 bridge_priority=32768,
1400 bridge_system_id_extension=0,
1401 bridge_mac_address=self.dst_mac,
1416 e_buf = self.dst_mac_bin + self.src_mac_bin + b'\x05\xdc'
1423 # bpdu !HBBBQIQHHHHH
1424 b_buf = (b'\x00\x00'
1428 b'\x80\x00\xbb\xbb\xbb\xbb\xbb\xbb'
1430 b'\x80\x00\xaa\xaa\xaa\xaa\xaa\xaa'
1437 buf = e_buf + l_buf + b_buf
1439 # Append padding if ethernet frame is less than 60 bytes length
1440 pad_len = 60 - len(buf)
1442 buf += b'\x00' * pad_len
1446 pkt = packet.Packet(p.data)
1447 protocols = self.get_protocols(pkt)
1448 p_eth = protocols['ethernet']
1449 p_llc = protocols['llc']
1450 p_bpdu = protocols['ConfigurationBPDUs']
1454 eq_(self.dst_mac, p_eth.dst)
1455 eq_(self.src_mac, p_eth.src)
1456 eq_(ether.ETH_TYPE_IEEE802_3, p_eth.ethertype)
1460 eq_(llc.SAP_BPDU, p_llc.dsap_addr)
1461 eq_(llc.SAP_BPDU, p_llc.ssap_addr)
1462 eq_(0, p_llc.control.modifier_function1)
1463 eq_(0, p_llc.control.pf_bit)
1464 eq_(0, p_llc.control.modifier_function2)
1468 eq_(bpdu.PROTOCOL_IDENTIFIER, p_bpdu._protocol_id)
1469 eq_(bpdu.PROTOCOLVERSION_ID_BPDU, p_bpdu._version_id)
1470 eq_(bpdu.TYPE_CONFIG_BPDU, p_bpdu._bpdu_type)
1471 eq_(0, p_bpdu.flags)
1472 eq_(32768, p_bpdu.root_priority)
1473 eq_(0, p_bpdu.root_system_id_extension)
1474 eq_(self.src_mac, p_bpdu.root_mac_address)
1475 eq_(0, p_bpdu.root_path_cost)
1476 eq_(32768, p_bpdu.bridge_priority)
1477 eq_(0, p_bpdu.bridge_system_id_extension)
1478 eq_(self.dst_mac, p_bpdu.bridge_mac_address)
1479 eq_(128, p_bpdu.port_priority)
1480 eq_(4, p_bpdu.port_number)
1481 eq_(1, p_bpdu.message_age)
1482 eq_(20, p_bpdu.max_age)
1483 eq_(2, p_bpdu.hello_time)
1484 eq_(15, p_bpdu.forward_delay)
1487 eth_values = {'dst': self.dst_mac,
1488 'src': self.src_mac,
1489 'ethertype': ether.ETH_TYPE_IEEE802_3}
1490 _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k]))
1491 for k, v in inspect.getmembers(p_eth)
1492 if k in eth_values])
1493 eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str)
1495 ctrl_values = {'modifier_function1': 0,
1497 'modifier_function2': 0}
1498 _ctrl_str = ','.join(['%s=%s' % (k, repr(ctrl_values[k]))
1499 for k, v in inspect.getmembers(p_llc.control)
1500 if k in ctrl_values])
1501 ctrl_str = '%s(%s)' % (llc.ControlFormatU.__name__, _ctrl_str)
1503 llc_values = {'dsap_addr': repr(llc.SAP_BPDU),
1504 'ssap_addr': repr(llc.SAP_BPDU),
1505 'control': ctrl_str}
1506 _llc_str = ','.join(['%s=%s' % (k, llc_values[k])
1507 for k, v in inspect.getmembers(p_llc)
1508 if k in llc_values])
1509 llc_str = '%s(%s)' % (llc.llc.__name__, _llc_str)
1511 _long = int if six.PY3 else long
1512 bpdu_values = {'flags': 0,
1513 'root_priority': _long(32768),
1514 'root_system_id_extension': _long(0),
1515 'root_mac_address': self.src_mac,
1516 'root_path_cost': 0,
1517 'bridge_priority': _long(32768),
1518 'bridge_system_id_extension': _long(0),
1519 'bridge_mac_address': self.dst_mac,
1520 'port_priority': 128,
1522 'message_age': float(1),
1523 'max_age': float(20),
1524 'hello_time': float(2),
1525 'forward_delay': float(15)}
1526 _bpdu_str = ','.join(['%s=%s' % (k, repr(bpdu_values[k]))
1527 for k, v in inspect.getmembers(p_bpdu)
1528 if k in bpdu_values])
1529 bpdu_str = '%s(%s)' % (bpdu.ConfigurationBPDUs.__name__, _bpdu_str)
1531 pkt_str = '%s, %s, %s' % (eth_str, llc_str, bpdu_str)
1533 eq_(eth_str, str(p_eth))
1534 eq_(eth_str, repr(p_eth))
1536 eq_(llc_str, str(p_llc))
1537 eq_(llc_str, repr(p_llc))
1539 eq_(bpdu_str, str(p_bpdu))
1540 eq_(bpdu_str, repr(p_bpdu))
1542 eq_(pkt_str, str(pkt))
1543 eq_(pkt_str, repr(pkt))
1545 def test_div_api(self):
1546 e = ethernet.ethernet(self.dst_mac, self.src_mac, ether.ETH_TYPE_IP)
1548 u = udp.udp(self.src_port, self.dst_port)
1550 ok_(isinstance(pkt, packet.Packet))
1551 ok_(isinstance(pkt.protocols[0], ethernet.ethernet))
1552 ok_(isinstance(pkt.protocols[1], ipv4.ipv4))
1553 ok_(isinstance(pkt.protocols[2], udp.udp))