--- /dev/null
+# Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+import unittest
+import logging
+import struct
+import inspect
+from nose.tools import ok_, eq_
+import six
+from ryu.ofproto import ether, inet
+from ryu.lib.packet import arp
+from ryu.lib.packet import bpdu
+from ryu.lib.packet import ethernet
+from ryu.lib.packet import icmp, icmpv6
+from ryu.lib.packet import ipv4, ipv6
+from ryu.lib.packet import llc
+from ryu.lib.packet import packet, packet_utils
+from ryu.lib.packet import sctp
+from ryu.lib.packet import tcp, udp
+from ryu.lib.packet import vlan
+from ryu.lib import addrconv
+
+
+LOG = logging.getLogger('test_packet')
+
+
+class TestPacket(unittest.TestCase):
+ """ Test case for packet
+ """
+
+ dst_mac = 'aa:aa:aa:aa:aa:aa'
+ src_mac = 'bb:bb:bb:bb:bb:bb'
+ dst_mac_bin = addrconv.mac.text_to_bin(dst_mac)
+ src_mac_bin = addrconv.mac.text_to_bin(src_mac)
+ dst_ip = '192.168.128.10'
+ src_ip = '192.168.122.20'
+ dst_ip_bin = addrconv.ipv4.text_to_bin(dst_ip)
+ src_port = 50001
+ dst_port = 50002
+ src_ip_bin = addrconv.ipv4.text_to_bin(src_ip)
+ payload = b'\x06\x06\x47\x50\x00\x00\x00\x00' \
+ + b'\xcd\xc5\x00\x00\x00\x00\x00\x00' \
+ + b'\x10\x11\x12\x13\x14\x15\x16\x17' \
+ + b'\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f'
+
+ def get_protocols(self, pkt):
+ protocols = {}
+ for p in pkt:
+ if hasattr(p, 'protocol_name'):
+ protocols[p.protocol_name] = p
+ else:
+ protocols['payload'] = p
+ return protocols
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_arp(self):
+ # buid packet
+ e = ethernet.ethernet(self.dst_mac, self.src_mac,
+ ether.ETH_TYPE_ARP)
+ a = arp.arp(1, ether.ETH_TYPE_IP, 6, 4, 2,
+ self.src_mac, self.src_ip, self.dst_mac,
+ self.dst_ip)
+ p = packet.Packet()
+ p.add_protocol(e)
+ p.add_protocol(a)
+ p.serialize()
+
+ # ethernet !6s6sH
+ e_buf = self.dst_mac_bin \
+ + self.src_mac_bin \
+ + b'\x08\x06'
+
+ # arp !HHBBH6sI6sI
+ a_buf = b'\x00\x01' \
+ + b'\x08\x00' \
+ + b'\x06' \
+ + b'\x04' \
+ + b'\x00\x02' \
+ + self.src_mac_bin \
+ + self.src_ip_bin \
+ + self.dst_mac_bin \
+ + self.dst_ip_bin
+
+ buf = e_buf + a_buf
+
+ # Append padding if ethernet frame is less than 60 bytes length
+ pad_len = 60 - len(buf)
+ if pad_len > 0:
+ buf += b'\x00' * pad_len
+ eq_(buf, p.data)
+
+ # parse
+ pkt = packet.Packet(p.data)
+ protocols = self.get_protocols(pkt)
+ p_eth = protocols['ethernet']
+ p_arp = protocols['arp']
+
+ # ethernet
+ ok_(p_eth)
+ eq_(self.dst_mac, p_eth.dst)
+ eq_(self.src_mac, p_eth.src)
+ eq_(ether.ETH_TYPE_ARP, p_eth.ethertype)
+
+ # arp
+ ok_(p_arp)
+ eq_(1, p_arp.hwtype)
+ eq_(ether.ETH_TYPE_IP, p_arp.proto)
+ eq_(6, p_arp.hlen)
+ eq_(4, p_arp.plen)
+ eq_(2, p_arp.opcode)
+ eq_(self.src_mac, p_arp.src_mac)
+ eq_(self.src_ip, p_arp.src_ip)
+ eq_(self.dst_mac, p_arp.dst_mac)
+ eq_(self.dst_ip, p_arp.dst_ip)
+
+ # to string
+ eth_values = {'dst': self.dst_mac,
+ 'src': self.src_mac,
+ 'ethertype': ether.ETH_TYPE_ARP}
+ _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k]))
+ for k, v in inspect.getmembers(p_eth)
+ if k in eth_values])
+ eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str)
+
+ arp_values = {'hwtype': 1,
+ 'proto': ether.ETH_TYPE_IP,
+ 'hlen': 6,
+ 'plen': 4,
+ 'opcode': 2,
+ 'src_mac': self.src_mac,
+ 'dst_mac': self.dst_mac,
+ 'src_ip': self.src_ip,
+ 'dst_ip': self.dst_ip}
+ _arp_str = ','.join(['%s=%s' % (k, repr(arp_values[k]))
+ for k, v in inspect.getmembers(p_arp)
+ if k in arp_values])
+ arp_str = '%s(%s)' % (arp.arp.__name__, _arp_str)
+
+ pkt_str = '%s, %s' % (eth_str, arp_str)
+
+ eq_(eth_str, str(p_eth))
+ eq_(eth_str, repr(p_eth))
+
+ eq_(arp_str, str(p_arp))
+ eq_(arp_str, repr(p_arp))
+
+ eq_(pkt_str, str(pkt))
+ eq_(pkt_str, repr(pkt))
+
+ def test_vlan_arp(self):
+ # buid packet
+ e = ethernet.ethernet(self.dst_mac, self.src_mac,
+ ether.ETH_TYPE_8021Q)
+ v = vlan.vlan(0b111, 0b1, 3, ether.ETH_TYPE_ARP)
+ a = arp.arp(1, ether.ETH_TYPE_IP, 6, 4, 2,
+ self.src_mac, self.src_ip, self.dst_mac,
+ self.dst_ip)
+ p = packet.Packet()
+ p.add_protocol(e)
+ p.add_protocol(v)
+ p.add_protocol(a)
+ p.serialize()
+
+ # ethernet !6s6sH
+ e_buf = self.dst_mac_bin \
+ + self.src_mac_bin \
+ + b'\x81\x00'
+
+ # vlan !HH
+ v_buf = b'\xF0\x03' \
+ + b'\x08\x06'
+
+ # arp !HHBBH6sI6sI
+ a_buf = b'\x00\x01' \
+ + b'\x08\x00' \
+ + b'\x06' \
+ + b'\x04' \
+ + b'\x00\x02' \
+ + self.src_mac_bin \
+ + self.src_ip_bin \
+ + self.dst_mac_bin \
+ + self.dst_ip_bin
+
+ buf = e_buf + v_buf + a_buf
+
+ # Append padding if ethernet frame is less than 60 bytes length
+ pad_len = 60 - len(buf)
+ if pad_len > 0:
+ buf += b'\x00' * pad_len
+ eq_(buf, p.data)
+
+ # parse
+ pkt = packet.Packet(p.data)
+ protocols = self.get_protocols(pkt)
+ p_eth = protocols['ethernet']
+ p_vlan = protocols['vlan']
+ p_arp = protocols['arp']
+
+ # ethernet
+ ok_(p_eth)
+ eq_(self.dst_mac, p_eth.dst)
+ eq_(self.src_mac, p_eth.src)
+ eq_(ether.ETH_TYPE_8021Q, p_eth.ethertype)
+
+ # vlan
+ ok_(p_vlan)
+ eq_(0b111, p_vlan.pcp)
+ eq_(0b1, p_vlan.cfi)
+ eq_(3, p_vlan.vid)
+ eq_(ether.ETH_TYPE_ARP, p_vlan.ethertype)
+
+ # arp
+ ok_(p_arp)
+ eq_(1, p_arp.hwtype)
+ eq_(ether.ETH_TYPE_IP, p_arp.proto)
+ eq_(6, p_arp.hlen)
+ eq_(4, p_arp.plen)
+ eq_(2, p_arp.opcode)
+ eq_(self.src_mac, p_arp.src_mac)
+ eq_(self.src_ip, p_arp.src_ip)
+ eq_(self.dst_mac, p_arp.dst_mac)
+ eq_(self.dst_ip, p_arp.dst_ip)
+
+ # to string
+ eth_values = {'dst': self.dst_mac,
+ 'src': self.src_mac,
+ 'ethertype': ether.ETH_TYPE_8021Q}
+ _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k]))
+ for k, v in inspect.getmembers(p_eth)
+ if k in eth_values])
+ eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str)
+
+ vlan_values = {'pcp': 0b111,
+ 'cfi': 0b1,
+ 'vid': 3,
+ 'ethertype': ether.ETH_TYPE_ARP}
+ _vlan_str = ','.join(['%s=%s' % (k, repr(vlan_values[k]))
+ for k, v in inspect.getmembers(p_vlan)
+ if k in vlan_values])
+ vlan_str = '%s(%s)' % (vlan.vlan.__name__, _vlan_str)
+
+ arp_values = {'hwtype': 1,
+ 'proto': ether.ETH_TYPE_IP,
+ 'hlen': 6,
+ 'plen': 4,
+ 'opcode': 2,
+ 'src_mac': self.src_mac,
+ 'dst_mac': self.dst_mac,
+ 'src_ip': self.src_ip,
+ 'dst_ip': self.dst_ip}
+ _arp_str = ','.join(['%s=%s' % (k, repr(arp_values[k]))
+ for k, v in inspect.getmembers(p_arp)
+ if k in arp_values])
+ arp_str = '%s(%s)' % (arp.arp.__name__, _arp_str)
+
+ pkt_str = '%s, %s, %s' % (eth_str, vlan_str, arp_str)
+
+ eq_(eth_str, str(p_eth))
+ eq_(eth_str, repr(p_eth))
+
+ eq_(vlan_str, str(p_vlan))
+ eq_(vlan_str, repr(p_vlan))
+
+ eq_(arp_str, str(p_arp))
+ eq_(arp_str, repr(p_arp))
+
+ eq_(pkt_str, str(pkt))
+ eq_(pkt_str, repr(pkt))
+
+ def test_ipv4_udp(self):
+ # buid packet
+ e = ethernet.ethernet(self.dst_mac, self.src_mac,
+ ether.ETH_TYPE_IP)
+ ip = ipv4.ipv4(4, 5, 1, 0, 3, 1, 4, 64, inet.IPPROTO_UDP, 0,
+ self.src_ip, self.dst_ip)
+ u = udp.udp(0x190F, 0x1F90, 0, 0)
+
+ p = packet.Packet()
+ p.add_protocol(e)
+ p.add_protocol(ip)
+ p.add_protocol(u)
+ p.add_protocol(self.payload)
+ p.serialize()
+
+ # ethernet !6s6sH
+ e_buf = self.dst_mac_bin \
+ + self.src_mac_bin \
+ + b'\x08\x00'
+
+ # ipv4 !BBHHHBBHII
+ ip_buf = b'\x45' \
+ + b'\x01' \
+ + b'\x00\x3C' \
+ + b'\x00\x03' \
+ + b'\x20\x04' \
+ + b'\x40' \
+ + b'\x11' \
+ + b'\x00\x00' \
+ + self.src_ip_bin \
+ + self.dst_ip_bin
+
+ # udp !HHHH
+ u_buf = b'\x19\x0F' \
+ + b'\x1F\x90' \
+ + b'\x00\x28' \
+ + b'\x00\x00'
+
+ buf = e_buf + ip_buf + u_buf + self.payload
+
+ # parse
+ pkt = packet.Packet(p.data)
+ protocols = self.get_protocols(pkt)
+ p_eth = protocols['ethernet']
+ p_ipv4 = protocols['ipv4']
+ p_udp = protocols['udp']
+
+ # ethernet
+ ok_(p_eth)
+ eq_(self.dst_mac, p_eth.dst)
+ eq_(self.src_mac, p_eth.src)
+ eq_(ether.ETH_TYPE_IP, p_eth.ethertype)
+
+ # ipv4
+ ok_(p_ipv4)
+ eq_(4, p_ipv4.version)
+ eq_(5, p_ipv4.header_length)
+ eq_(1, p_ipv4.tos)
+ l = len(ip_buf) + len(u_buf) + len(self.payload)
+ eq_(l, p_ipv4.total_length)
+ eq_(3, p_ipv4.identification)
+ eq_(1, p_ipv4.flags)
+ eq_(64, p_ipv4.ttl)
+ eq_(inet.IPPROTO_UDP, p_ipv4.proto)
+ eq_(self.src_ip, p_ipv4.src)
+ eq_(self.dst_ip, p_ipv4.dst)
+ t = bytearray(ip_buf)
+ struct.pack_into('!H', t, 10, p_ipv4.csum)
+ eq_(packet_utils.checksum(t), 0)
+
+ # udp
+ ok_(p_udp)
+ eq_(0x190f, p_udp.src_port)
+ eq_(0x1F90, p_udp.dst_port)
+ eq_(len(u_buf) + len(self.payload), p_udp.total_length)
+ eq_(0x77b2, p_udp.csum)
+ t = bytearray(u_buf)
+ struct.pack_into('!H', t, 6, p_udp.csum)
+ ph = struct.pack('!4s4sBBH', self.src_ip_bin, self.dst_ip_bin, 0,
+ 17, len(u_buf) + len(self.payload))
+ t = ph + t + self.payload
+ eq_(packet_utils.checksum(t), 0)
+
+ # payload
+ ok_('payload' in protocols)
+ eq_(self.payload, protocols['payload'])
+
+ # to string
+ eth_values = {'dst': self.dst_mac,
+ 'src': self.src_mac,
+ 'ethertype': ether.ETH_TYPE_IP}
+ _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k]))
+ for k, v in inspect.getmembers(p_eth)
+ if k in eth_values])
+ eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str)
+
+ ipv4_values = {'version': 4,
+ 'header_length': 5,
+ 'tos': 1,
+ 'total_length': l,
+ 'identification': 3,
+ 'flags': 1,
+ 'offset': p_ipv4.offset,
+ 'ttl': 64,
+ 'proto': inet.IPPROTO_UDP,
+ 'csum': p_ipv4.csum,
+ 'src': self.src_ip,
+ 'dst': self.dst_ip,
+ 'option': None}
+ _ipv4_str = ','.join(['%s=%s' % (k, repr(ipv4_values[k]))
+ for k, v in inspect.getmembers(p_ipv4)
+ if k in ipv4_values])
+ ipv4_str = '%s(%s)' % (ipv4.ipv4.__name__, _ipv4_str)
+
+ udp_values = {'src_port': 0x190f,
+ 'dst_port': 0x1F90,
+ 'total_length': len(u_buf) + len(self.payload),
+ 'csum': 0x77b2}
+ _udp_str = ','.join(['%s=%s' % (k, repr(udp_values[k]))
+ for k, v in inspect.getmembers(p_udp)
+ if k in udp_values])
+ udp_str = '%s(%s)' % (udp.udp.__name__, _udp_str)
+
+ pkt_str = '%s, %s, %s, %s' % (eth_str, ipv4_str, udp_str,
+ repr(protocols['payload']))
+
+ eq_(eth_str, str(p_eth))
+ eq_(eth_str, repr(p_eth))
+
+ eq_(ipv4_str, str(p_ipv4))
+ eq_(ipv4_str, repr(p_ipv4))
+
+ eq_(udp_str, str(p_udp))
+ eq_(udp_str, repr(p_udp))
+
+ eq_(pkt_str, str(pkt))
+ eq_(pkt_str, repr(pkt))
+
+ def test_ipv4_tcp(self):
+ # buid packet
+ e = ethernet.ethernet(self.dst_mac, self.src_mac,
+ ether.ETH_TYPE_IP)
+ ip = ipv4.ipv4(4, 5, 0, 0, 0, 0, 0, 64, inet.IPPROTO_TCP, 0,
+ self.src_ip, self.dst_ip)
+ t = tcp.tcp(0x190F, 0x1F90, 0x123, 1, 6, 0b101010, 2048, 0, 0x6f,
+ b'\x01\x02')
+
+ p = packet.Packet()
+ p.add_protocol(e)
+ p.add_protocol(ip)
+ p.add_protocol(t)
+ p.add_protocol(self.payload)
+ p.serialize()
+
+ # ethernet !6s6sH
+ e_buf = self.dst_mac_bin \
+ + self.src_mac_bin \
+ + b'\x08\x00'
+
+ # ipv4 !BBHHHBBHII
+ ip_buf = b'\x45' \
+ + b'\x00' \
+ + b'\x00\x4C' \
+ + b'\x00\x00' \
+ + b'\x00\x00' \
+ + b'\x40' \
+ + b'\x06' \
+ + b'\x00\x00' \
+ + self.src_ip_bin \
+ + self.dst_ip_bin
+
+ # tcp !HHIIBBHHH + option
+ t_buf = b'\x19\x0F' \
+ + b'\x1F\x90' \
+ + b'\x00\x00\x01\x23' \
+ + b'\x00\x00\x00\x01' \
+ + b'\x60' \
+ + b'\x2A' \
+ + b'\x08\x00' \
+ + b'\x00\x00' \
+ + b'\x00\x6F' \
+ + b'\x01\x02\x00\x00'
+
+ buf = e_buf + ip_buf + t_buf + self.payload
+
+ # parse
+ pkt = packet.Packet(p.data)
+ protocols = self.get_protocols(pkt)
+ p_eth = protocols['ethernet']
+ p_ipv4 = protocols['ipv4']
+ p_tcp = protocols['tcp']
+
+ # ethernet
+ ok_(p_eth)
+ eq_(self.dst_mac, p_eth.dst)
+ eq_(self.src_mac, p_eth.src)
+ eq_(ether.ETH_TYPE_IP, p_eth.ethertype)
+
+ # ipv4
+ ok_(p_ipv4)
+ eq_(4, p_ipv4.version)
+ eq_(5, p_ipv4.header_length)
+ eq_(0, p_ipv4.tos)
+ l = len(ip_buf) + len(t_buf) + len(self.payload)
+ eq_(l, p_ipv4.total_length)
+ eq_(0, p_ipv4.identification)
+ eq_(0, p_ipv4.flags)
+ eq_(64, p_ipv4.ttl)
+ eq_(inet.IPPROTO_TCP, p_ipv4.proto)
+ eq_(self.src_ip, p_ipv4.src)
+ eq_(self.dst_ip, p_ipv4.dst)
+ t = bytearray(ip_buf)
+ struct.pack_into('!H', t, 10, p_ipv4.csum)
+ eq_(packet_utils.checksum(t), 0)
+
+ # tcp
+ ok_(p_tcp)
+ eq_(0x190f, p_tcp.src_port)
+ eq_(0x1F90, p_tcp.dst_port)
+ eq_(0x123, p_tcp.seq)
+ eq_(1, p_tcp.ack)
+ eq_(6, p_tcp.offset)
+ eq_(0b101010, p_tcp.bits)
+ eq_(2048, p_tcp.window_size)
+ eq_(0x6f, p_tcp.urgent)
+ eq_(len(t_buf), len(p_tcp))
+ t = bytearray(t_buf)
+ struct.pack_into('!H', t, 16, p_tcp.csum)
+ ph = struct.pack('!4s4sBBH', self.src_ip_bin, self.dst_ip_bin, 0,
+ 6, len(t_buf) + len(self.payload))
+ t = ph + t + self.payload
+ eq_(packet_utils.checksum(t), 0)
+
+ # payload
+ ok_('payload' in protocols)
+ eq_(self.payload, protocols['payload'])
+
+ # to string
+ eth_values = {'dst': self.dst_mac,
+ 'src': self.src_mac,
+ 'ethertype': ether.ETH_TYPE_IP}
+ _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k]))
+ for k, v in inspect.getmembers(p_eth)
+ if k in eth_values])
+ eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str)
+
+ ipv4_values = {'version': 4,
+ 'header_length': 5,
+ 'tos': 0,
+ 'total_length': l,
+ 'identification': 0,
+ 'flags': 0,
+ 'offset': p_ipv4.offset,
+ 'ttl': 64,
+ 'proto': inet.IPPROTO_TCP,
+ 'csum': p_ipv4.csum,
+ 'src': self.src_ip,
+ 'dst': self.dst_ip,
+ 'option': None}
+ _ipv4_str = ','.join(['%s=%s' % (k, repr(ipv4_values[k]))
+ for k, v in inspect.getmembers(p_ipv4)
+ if k in ipv4_values])
+ ipv4_str = '%s(%s)' % (ipv4.ipv4.__name__, _ipv4_str)
+
+ tcp_values = {'src_port': 0x190f,
+ 'dst_port': 0x1F90,
+ 'seq': 0x123,
+ 'ack': 1,
+ 'offset': 6,
+ 'bits': 0b101010,
+ 'window_size': 2048,
+ 'csum': p_tcp.csum,
+ 'urgent': 0x6f,
+ 'option': p_tcp.option}
+ _tcp_str = ','.join(['%s=%s' % (k, repr(tcp_values[k]))
+ for k, v in inspect.getmembers(p_tcp)
+ if k in tcp_values])
+ tcp_str = '%s(%s)' % (tcp.tcp.__name__, _tcp_str)
+
+ pkt_str = '%s, %s, %s, %s' % (eth_str, ipv4_str, tcp_str,
+ repr(protocols['payload']))
+
+ eq_(eth_str, str(p_eth))
+ eq_(eth_str, repr(p_eth))
+
+ eq_(ipv4_str, str(p_ipv4))
+ eq_(ipv4_str, repr(p_ipv4))
+
+ eq_(tcp_str, str(p_tcp))
+ eq_(tcp_str, repr(p_tcp))
+
+ eq_(pkt_str, str(pkt))
+ eq_(pkt_str, repr(pkt))
+
+ def test_ipv4_sctp(self):
+ # build packet
+ e = ethernet.ethernet()
+ ip = ipv4.ipv4(proto=inet.IPPROTO_SCTP)
+ s = sctp.sctp(chunks=[sctp.chunk_data(payload_data=self.payload)])
+
+ p = e / ip / s
+ p.serialize()
+
+ ipaddr = addrconv.ipv4.text_to_bin('0.0.0.0')
+
+ # ethernet !6s6sH
+ e_buf = b'\xff\xff\xff\xff\xff\xff' \
+ + b'\x00\x00\x00\x00\x00\x00' \
+ + b'\x08\x00'
+
+ # ipv4 !BBHHHBBHII
+ ip_buf = b'\x45' \
+ + b'\x00' \
+ + b'\x00\x50' \
+ + b'\x00\x00' \
+ + b'\x00\x00' \
+ + b'\xff' \
+ + b'\x84' \
+ + b'\x00\x00' \
+ + ipaddr \
+ + ipaddr
+
+ # sctp !HHII + chunk_data !BBHIHHI + payload
+ s_buf = b'\x00\x00' \
+ + b'\x00\x00' \
+ + b'\x00\x00\x00\x00' \
+ + b'\x00\x00\x00\x00' \
+ + b'\x00' \
+ + b'\x00' \
+ + b'\x00\x00' \
+ + b'\x00\x00\x00\x00' \
+ + b'\x00\x00' \
+ + b'\x00\x00' \
+ + b'\x00\x00\x00\x00' \
+ + self.payload
+
+ buf = e_buf + ip_buf + s_buf
+
+ # parse
+ pkt = packet.Packet(p.data)
+ protocols = self.get_protocols(pkt)
+ p_eth = protocols['ethernet']
+ p_ipv4 = protocols['ipv4']
+ p_sctp = protocols['sctp']
+
+ # ethernet
+ ok_(p_eth)
+ eq_('ff:ff:ff:ff:ff:ff', p_eth.dst)
+ eq_('00:00:00:00:00:00', p_eth.src)
+ eq_(ether.ETH_TYPE_IP, p_eth.ethertype)
+
+ # ipv4
+ ok_(p_ipv4)
+ eq_(4, p_ipv4.version)
+ eq_(5, p_ipv4.header_length)
+ eq_(0, p_ipv4.tos)
+ l = len(ip_buf) + len(s_buf)
+ eq_(l, p_ipv4.total_length)
+ eq_(0, p_ipv4.identification)
+ eq_(0, p_ipv4.flags)
+ eq_(255, p_ipv4.ttl)
+ eq_(inet.IPPROTO_SCTP, p_ipv4.proto)
+ eq_('10.0.0.1', p_ipv4.src)
+ eq_('10.0.0.2', p_ipv4.dst)
+ t = bytearray(ip_buf)
+ struct.pack_into('!H', t, 10, p_ipv4.csum)
+ eq_(packet_utils.checksum(t), 0x1403)
+
+ # sctp
+ ok_(p_sctp)
+ eq_(1, p_sctp.src_port)
+ eq_(1, p_sctp.dst_port)
+ eq_(0, p_sctp.vtag)
+ assert isinstance(p_sctp.chunks[0], sctp.chunk_data)
+ eq_(0, p_sctp.chunks[0]._type)
+ eq_(0, p_sctp.chunks[0].unordered)
+ eq_(0, p_sctp.chunks[0].begin)
+ eq_(0, p_sctp.chunks[0].end)
+ eq_(16 + len(self.payload), p_sctp.chunks[0].length)
+ eq_(0, p_sctp.chunks[0].tsn)
+ eq_(0, p_sctp.chunks[0].sid)
+ eq_(0, p_sctp.chunks[0].seq)
+ eq_(0, p_sctp.chunks[0].payload_id)
+ eq_(self.payload, p_sctp.chunks[0].payload_data)
+ eq_(len(s_buf), len(p_sctp))
+
+ # to string
+ eth_values = {'dst': 'ff:ff:ff:ff:ff:ff',
+ 'src': '00:00:00:00:00:00',
+ 'ethertype': ether.ETH_TYPE_IP}
+ _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k]))
+ for k, v in inspect.getmembers(p_eth)
+ if k in eth_values])
+ eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str)
+
+ ipv4_values = {'version': 4,
+ 'header_length': 5,
+ 'tos': 0,
+ 'total_length': l,
+ 'identification': 0,
+ 'flags': 0,
+ 'offset': 0,
+ 'ttl': 255,
+ 'proto': inet.IPPROTO_SCTP,
+ 'csum': p_ipv4.csum,
+ 'src': '10.0.0.1',
+ 'dst': '10.0.0.2',
+ 'option': None}
+ _ipv4_str = ','.join(['%s=%s' % (k, repr(ipv4_values[k]))
+ for k, v in inspect.getmembers(p_ipv4)
+ if k in ipv4_values])
+ ipv4_str = '%s(%s)' % (ipv4.ipv4.__name__, _ipv4_str)
+
+ data_values = {'unordered': 0,
+ 'begin': 0,
+ 'end': 0,
+ 'length': 16 + len(self.payload),
+ 'tsn': 0,
+ 'sid': 0,
+ 'seq': 0,
+ 'payload_id': 0,
+ 'payload_data': self.payload}
+ _data_str = ','.join(['%s=%s' % (k, repr(data_values[k]))
+ for k in sorted(data_values.keys())])
+ data_str = '[%s(%s)]' % (sctp.chunk_data.__name__, _data_str)
+
+ sctp_values = {'src_port': 1,
+ 'dst_port': 1,
+ 'vtag': 0,
+ 'csum': repr(p_sctp.csum),
+ 'chunks': data_str}
+ _sctp_str = ','.join(['%s=%s' % (k, sctp_values[k])
+ for k, _ in inspect.getmembers(p_sctp)
+ if k in sctp_values])
+ sctp_str = '%s(%s)' % (sctp.sctp.__name__, _sctp_str)
+
+ pkt_str = '%s, %s, %s' % (eth_str, ipv4_str, sctp_str)
+
+ eq_(eth_str, str(p_eth))
+ eq_(eth_str, repr(p_eth))
+
+ eq_(ipv4_str, str(p_ipv4))
+ eq_(ipv4_str, repr(p_ipv4))
+
+ eq_(sctp_str, str(p_sctp))
+ eq_(sctp_str, repr(p_sctp))
+
+ eq_(pkt_str, str(pkt))
+ eq_(pkt_str, repr(pkt))
+
+ def test_ipv4_icmp(self):
+ # buid packet
+ e = ethernet.ethernet()
+ ip = ipv4.ipv4(proto=inet.IPPROTO_ICMP)
+ ic = icmp.icmp()
+
+ p = e / ip / ic
+ p.serialize()
+
+ ipaddr = addrconv.ipv4.text_to_bin('0.0.0.0')
+
+ # ethernet !6s6sH
+ e_buf = b'\xff\xff\xff\xff\xff\xff' \
+ + b'\x00\x00\x00\x00\x00\x00' \
+ + b'\x08\x00'
+
+ # ipv4 !BBHHHBBHII
+ ip_buf = b'\x45' \
+ + b'\x00' \
+ + b'\x00\x1c' \
+ + b'\x00\x00' \
+ + b'\x00\x00' \
+ + b'\xff' \
+ + b'\x01' \
+ + b'\x00\x00' \
+ + ipaddr \
+ + ipaddr
+
+ # icmp !BBH + echo !HH
+ ic_buf = b'\x08' \
+ + b'\x00' \
+ + b'\x00\x00' \
+ + b'\x00\x00' \
+ + b'\x00\x00'
+
+ buf = e_buf + ip_buf + ic_buf
+
+ # parse
+ pkt = packet.Packet(p.data)
+ protocols = self.get_protocols(pkt)
+ p_eth = protocols['ethernet']
+ p_ipv4 = protocols['ipv4']
+ p_icmp = protocols['icmp']
+
+ # ethernet
+ ok_(p_eth)
+ eq_('ff:ff:ff:ff:ff:ff', p_eth.dst)
+ eq_('00:00:00:00:00:00', p_eth.src)
+ eq_(ether.ETH_TYPE_IP, p_eth.ethertype)
+
+ # ipv4
+ ok_(p_ipv4)
+ eq_(4, p_ipv4.version)
+ eq_(5, p_ipv4.header_length)
+ eq_(0, p_ipv4.tos)
+ l = len(ip_buf) + len(ic_buf)
+ eq_(l, p_ipv4.total_length)
+ eq_(0, p_ipv4.identification)
+ eq_(0, p_ipv4.flags)
+ eq_(255, p_ipv4.ttl)
+ eq_(inet.IPPROTO_ICMP, p_ipv4.proto)
+ eq_('10.0.0.1', p_ipv4.src)
+ eq_('10.0.0.2', p_ipv4.dst)
+ t = bytearray(ip_buf)
+ struct.pack_into('!H', t, 10, p_ipv4.csum)
+ eq_(packet_utils.checksum(t), 0x1403)
+
+ # icmp
+ ok_(p_icmp)
+ eq_(8, p_icmp.type)
+ eq_(0, p_icmp.code)
+ eq_(0, p_icmp.data.id)
+ eq_(0, p_icmp.data.seq)
+ eq_(len(ic_buf), len(p_icmp))
+ t = bytearray(ic_buf)
+ struct.pack_into('!H', t, 2, p_icmp.csum)
+ eq_(packet_utils.checksum(t), 0)
+
+ # to string
+ eth_values = {'dst': 'ff:ff:ff:ff:ff:ff',
+ 'src': '00:00:00:00:00:00',
+ 'ethertype': ether.ETH_TYPE_IP}
+ _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k]))
+ for k, _ in inspect.getmembers(p_eth)
+ if k in eth_values])
+ eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str)
+
+ ipv4_values = {'version': 4,
+ 'header_length': 5,
+ 'tos': 0,
+ 'total_length': l,
+ 'identification': 0,
+ 'flags': 0,
+ 'offset': p_ipv4.offset,
+ 'ttl': 255,
+ 'proto': inet.IPPROTO_ICMP,
+ 'csum': p_ipv4.csum,
+ 'src': '10.0.0.1',
+ 'dst': '10.0.0.2',
+ 'option': None}
+ _ipv4_str = ','.join(['%s=%s' % (k, repr(ipv4_values[k]))
+ for k, _ in inspect.getmembers(p_ipv4)
+ if k in ipv4_values])
+ ipv4_str = '%s(%s)' % (ipv4.ipv4.__name__, _ipv4_str)
+
+ echo_values = {'id': 0,
+ 'seq': 0,
+ 'data': None}
+ _echo_str = ','.join(['%s=%s' % (k, repr(echo_values[k]))
+ for k in sorted(echo_values.keys())])
+ echo_str = '%s(%s)' % (icmp.echo.__name__, _echo_str)
+ icmp_values = {'type': 8,
+ 'code': 0,
+ 'csum': p_icmp.csum,
+ 'data': echo_str}
+ _icmp_str = ','.join(['%s=%s' % (k, icmp_values[k])
+ for k, _ in inspect.getmembers(p_icmp)
+ if k in icmp_values])
+ icmp_str = '%s(%s)' % (icmp.icmp.__name__, _icmp_str)
+
+ pkt_str = '%s, %s, %s' % (eth_str, ipv4_str, icmp_str)
+
+ eq_(eth_str, str(p_eth))
+ eq_(eth_str, repr(p_eth))
+
+ eq_(ipv4_str, str(p_ipv4))
+ eq_(ipv4_str, repr(p_ipv4))
+
+ eq_(icmp_str, str(p_icmp))
+ eq_(icmp_str, repr(p_icmp))
+
+ eq_(pkt_str, str(pkt))
+ eq_(pkt_str, repr(pkt))
+
+ def test_ipv6_udp(self):
+ # build packet
+ e = ethernet.ethernet(ethertype=ether.ETH_TYPE_IPV6)
+ ip = ipv6.ipv6(nxt=inet.IPPROTO_UDP)
+ u = udp.udp()
+
+ p = e / ip / u / self.payload
+ p.serialize()
+
+ ipaddr = addrconv.ipv6.text_to_bin('::')
+
+ # ethernet !6s6sH
+ e_buf = b'\xff\xff\xff\xff\xff\xff' \
+ + b'\x00\x00\x00\x00\x00\x00' \
+ + b'\x86\xdd'
+
+ # ipv6 !IHBB16s16s'
+ ip_buf = b'\x60\x00\x00\x00' \
+ + b'\x00\x00' \
+ + b'\x11' \
+ + b'\xff' \
+ + b'\x00\x00' \
+ + ipaddr \
+ + ipaddr
+
+ # udp !HHHH
+ u_buf = b'\x00\x00' \
+ + b'\x00\x00' \
+ + b'\x00\x28' \
+ + b'\x00\x00'
+
+ buf = e_buf + ip_buf + u_buf + self.payload
+
+ # parse
+ pkt = packet.Packet(p.data)
+ protocols = self.get_protocols(pkt)
+ p_eth = protocols['ethernet']
+ p_ipv6 = protocols['ipv6']
+ p_udp = protocols['udp']
+
+ # ethernet
+ ok_(p_eth)
+ eq_('ff:ff:ff:ff:ff:ff', p_eth.dst)
+ eq_('00:00:00:00:00:00', p_eth.src)
+ eq_(ether.ETH_TYPE_IPV6, p_eth.ethertype)
+
+ # ipv6
+ ok_(p_ipv6)
+ eq_(6, p_ipv6.version)
+ eq_(0, p_ipv6.traffic_class)
+ eq_(0, p_ipv6.flow_label)
+ eq_(len(u_buf) + len(self.payload), p_ipv6.payload_length)
+ eq_(inet.IPPROTO_UDP, p_ipv6.nxt)
+ eq_(255, p_ipv6.hop_limit)
+ eq_('10::10', p_ipv6.src)
+ eq_('20::20', p_ipv6.dst)
+
+ # udp
+ ok_(p_udp)
+ eq_(1, p_udp.src_port)
+ eq_(1, p_udp.dst_port)
+ eq_(len(u_buf) + len(self.payload), p_udp.total_length)
+ eq_(0x2B60, p_udp.csum)
+ t = bytearray(u_buf)
+ struct.pack_into('!H', t, 6, p_udp.csum)
+ ph = struct.pack('!16s16sI3xB', ipaddr, ipaddr,
+ len(u_buf) + len(self.payload), 17)
+ t = ph + t + self.payload
+ eq_(packet_utils.checksum(t), 0x62)
+
+ # payload
+ ok_('payload' in protocols)
+ eq_(self.payload, protocols['payload'])
+
+ # to string
+ eth_values = {'dst': 'ff:ff:ff:ff:ff:ff',
+ 'src': '00:00:00:00:00:00',
+ 'ethertype': ether.ETH_TYPE_IPV6}
+ _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k]))
+ for k, v in inspect.getmembers(p_eth)
+ if k in eth_values])
+ eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str)
+
+ ipv6_values = {'version': 6,
+ 'traffic_class': 0,
+ 'flow_label': 0,
+ 'payload_length': len(u_buf) + len(self.payload),
+ 'nxt': inet.IPPROTO_UDP,
+ 'hop_limit': 255,
+ 'src': '10::10',
+ 'dst': '20::20',
+ 'ext_hdrs': []}
+ _ipv6_str = ','.join(['%s=%s' % (k, repr(ipv6_values[k]))
+ for k, v in inspect.getmembers(p_ipv6)
+ if k in ipv6_values])
+ ipv6_str = '%s(%s)' % (ipv6.ipv6.__name__, _ipv6_str)
+
+ udp_values = {'src_port': 1,
+ 'dst_port': 1,
+ 'total_length': len(u_buf) + len(self.payload),
+ 'csum': 0x2B60}
+ _udp_str = ','.join(['%s=%s' % (k, repr(udp_values[k]))
+ for k, v in inspect.getmembers(p_udp)
+ if k in udp_values])
+ udp_str = '%s(%s)' % (udp.udp.__name__, _udp_str)
+
+ pkt_str = '%s, %s, %s, %s' % (eth_str, ipv6_str, udp_str,
+ repr(protocols['payload']))
+
+ eq_(eth_str, str(p_eth))
+ eq_(eth_str, repr(p_eth))
+
+ eq_(ipv6_str, str(p_ipv6))
+ eq_(ipv6_str, repr(p_ipv6))
+
+ eq_(udp_str, str(p_udp))
+ eq_(udp_str, repr(p_udp))
+
+ eq_(pkt_str, str(pkt))
+ eq_(pkt_str, repr(pkt))
+
+ def test_ipv6_tcp(self):
+ # build packet
+ e = ethernet.ethernet(ethertype=ether.ETH_TYPE_IPV6)
+ ip = ipv6.ipv6()
+ t = tcp.tcp(option=b'\x01\x02')
+
+ p = e / ip / t / self.payload
+ p.serialize()
+
+ ipaddr = addrconv.ipv6.text_to_bin('::')
+
+ # ethernet !6s6sH
+ e_buf = b'\xff\xff\xff\xff\xff\xff' \
+ + b'\x00\x00\x00\x00\x00\x00' \
+ + b'\x86\xdd'
+
+ # ipv6 !IHBB16s16s'
+ ip_buf = b'\x60\x00\x00\x00' \
+ + b'\x00\x00' \
+ + b'\x06' \
+ + b'\xff' \
+ + b'\x00\x00' \
+ + ipaddr \
+ + ipaddr
+
+ # tcp !HHIIBBHHH + option
+ t_buf = b'\x00\x00' \
+ + b'\x00\x00' \
+ + b'\x00\x00\x00\x00' \
+ + b'\x00\x00\x00\x00' \
+ + b'\x60' \
+ + b'\x00' \
+ + b'\x00\x00' \
+ + b'\x00\x00' \
+ + b'\x00\x00' \
+ + b'\x01\x02\x00\x00'
+
+ buf = e_buf + ip_buf + t_buf + self.payload
+
+ # parse
+ pkt = packet.Packet(p.data)
+ protocols = self.get_protocols(pkt)
+ p_eth = protocols['ethernet']
+ p_ipv6 = protocols['ipv6']
+ p_tcp = protocols['tcp']
+
+ # ethernet
+ ok_(p_eth)
+ eq_('ff:ff:ff:ff:ff:ff', p_eth.dst)
+ eq_('00:00:00:00:00:00', p_eth.src)
+ eq_(ether.ETH_TYPE_IPV6, p_eth.ethertype)
+
+ # ipv6
+ ok_(p_ipv6)
+ eq_(6, p_ipv6.version)
+ eq_(0, p_ipv6.traffic_class)
+ eq_(0, p_ipv6.flow_label)
+ eq_(len(t_buf) + len(self.payload), p_ipv6.payload_length)
+ eq_(inet.IPPROTO_TCP, p_ipv6.nxt)
+ eq_(255, p_ipv6.hop_limit)
+ eq_('10::10', p_ipv6.src)
+ eq_('20::20', p_ipv6.dst)
+
+ # tcp
+ ok_(p_tcp)
+ eq_(1, p_tcp.src_port)
+ eq_(1, p_tcp.dst_port)
+ eq_(0, p_tcp.seq)
+ eq_(0, p_tcp.ack)
+ eq_(6, p_tcp.offset)
+ eq_(0, p_tcp.bits)
+ eq_(0, p_tcp.window_size)
+ eq_(0, p_tcp.urgent)
+ eq_(len(t_buf), len(p_tcp))
+ t = bytearray(t_buf)
+ struct.pack_into('!H', t, 16, p_tcp.csum)
+ ph = struct.pack('!16s16sI3xB', ipaddr, ipaddr,
+ len(t_buf) + len(self.payload), 6)
+ t = ph + t + self.payload
+ eq_(packet_utils.checksum(t), 0x62)
+
+ # payload
+ ok_('payload' in protocols)
+ eq_(self.payload, protocols['payload'])
+
+ # to string
+ eth_values = {'dst': 'ff:ff:ff:ff:ff:ff',
+ 'src': '00:00:00:00:00:00',
+ 'ethertype': ether.ETH_TYPE_IPV6}
+ _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k]))
+ for k, v in inspect.getmembers(p_eth)
+ if k in eth_values])
+ eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str)
+
+ ipv6_values = {'version': 6,
+ 'traffic_class': 0,
+ 'flow_label': 0,
+ 'payload_length': len(t_buf) + len(self.payload),
+ 'nxt': inet.IPPROTO_TCP,
+ 'hop_limit': 255,
+ 'src': '10::10',
+ 'dst': '20::20',
+ 'ext_hdrs': []}
+ _ipv6_str = ','.join(['%s=%s' % (k, repr(ipv6_values[k]))
+ for k, v in inspect.getmembers(p_ipv6)
+ if k in ipv6_values])
+ ipv6_str = '%s(%s)' % (ipv6.ipv6.__name__, _ipv6_str)
+
+ tcp_values = {'src_port': 1,
+ 'dst_port': 1,
+ 'seq': 0,
+ 'ack': 0,
+ 'offset': 6,
+ 'bits': 0,
+ 'window_size': 0,
+ 'csum': p_tcp.csum,
+ 'urgent': 0,
+ 'option': p_tcp.option}
+ _tcp_str = ','.join(['%s=%s' % (k, repr(tcp_values[k]))
+ for k, v in inspect.getmembers(p_tcp)
+ if k in tcp_values])
+ tcp_str = '%s(%s)' % (tcp.tcp.__name__, _tcp_str)
+
+ pkt_str = '%s, %s, %s, %s' % (eth_str, ipv6_str, tcp_str,
+ repr(protocols['payload']))
+
+ eq_(eth_str, str(p_eth))
+ eq_(eth_str, repr(p_eth))
+
+ eq_(ipv6_str, str(p_ipv6))
+ eq_(ipv6_str, repr(p_ipv6))
+
+ eq_(tcp_str, str(p_tcp))
+ eq_(tcp_str, repr(p_tcp))
+
+ eq_(pkt_str, str(pkt))
+ eq_(pkt_str, repr(pkt))
+
+ def test_ipv6_sctp(self):
+ # build packet
+ e = ethernet.ethernet(ethertype=ether.ETH_TYPE_IPV6)
+ ip = ipv6.ipv6(nxt=inet.IPPROTO_SCTP)
+ s = sctp.sctp(chunks=[sctp.chunk_data(payload_data=self.payload)])
+
+ p = e / ip / s
+ p.serialize()
+
+ ipaddr = addrconv.ipv6.text_to_bin('::')
+
+ # ethernet !6s6sH
+ e_buf = b'\xff\xff\xff\xff\xff\xff' \
+ + b'\x00\x00\x00\x00\x00\x00' \
+ + b'\x86\xdd'
+
+ # ipv6 !IHBB16s16s'
+ ip_buf = b'\x60\x00\x00\x00' \
+ + b'\x00\x00' \
+ + b'\x84' \
+ + b'\xff' \
+ + b'\x00\x00' \
+ + ipaddr \
+ + ipaddr
+
+ # sctp !HHII + chunk_data !BBHIHHI + payload
+ s_buf = b'\x00\x00' \
+ + b'\x00\x00' \
+ + b'\x00\x00\x00\x00' \
+ + b'\x00\x00\x00\x00' \
+ + b'\x00' \
+ + b'\x00' \
+ + b'\x00\x00' \
+ + b'\x00\x00\x00\x00' \
+ + b'\x00\x00' \
+ + b'\x00\x00' \
+ + b'\x00\x00\x00\x00' \
+ + self.payload
+
+ buf = e_buf + ip_buf + s_buf
+
+ # parse
+ pkt = packet.Packet(p.data)
+ protocols = self.get_protocols(pkt)
+ p_eth = protocols['ethernet']
+ p_ipv6 = protocols['ipv6']
+ p_sctp = protocols['sctp']
+
+ # ethernet
+ ok_(p_eth)
+ eq_('ff:ff:ff:ff:ff:ff', p_eth.dst)
+ eq_('00:00:00:00:00:00', p_eth.src)
+ eq_(ether.ETH_TYPE_IPV6, p_eth.ethertype)
+
+ # ipv6
+ ok_(p_ipv6)
+ eq_(6, p_ipv6.version)
+ eq_(0, p_ipv6.traffic_class)
+ eq_(0, p_ipv6.flow_label)
+ eq_(len(s_buf), p_ipv6.payload_length)
+ eq_(inet.IPPROTO_SCTP, p_ipv6.nxt)
+ eq_(255, p_ipv6.hop_limit)
+ eq_('10::10', p_ipv6.src)
+ eq_('20::20', p_ipv6.dst)
+
+ # sctp
+ ok_(p_sctp)
+ eq_(1, p_sctp.src_port)
+ eq_(1, p_sctp.dst_port)
+ eq_(0, p_sctp.vtag)
+ assert isinstance(p_sctp.chunks[0], sctp.chunk_data)
+ eq_(0, p_sctp.chunks[0]._type)
+ eq_(0, p_sctp.chunks[0].unordered)
+ eq_(0, p_sctp.chunks[0].begin)
+ eq_(0, p_sctp.chunks[0].end)
+ eq_(16 + len(self.payload), p_sctp.chunks[0].length)
+ eq_(0, p_sctp.chunks[0].tsn)
+ eq_(0, p_sctp.chunks[0].sid)
+ eq_(0, p_sctp.chunks[0].seq)
+ eq_(0, p_sctp.chunks[0].payload_id)
+ eq_(self.payload, p_sctp.chunks[0].payload_data)
+ eq_(len(s_buf), len(p_sctp))
+
+ # to string
+ eth_values = {'dst': 'ff:ff:ff:ff:ff:ff',
+ 'src': '00:00:00:00:00:00',
+ 'ethertype': ether.ETH_TYPE_IPV6}
+ _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k]))
+ for k, v in inspect.getmembers(p_eth)
+ if k in eth_values])
+ eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str)
+
+ ipv6_values = {'version': 6,
+ 'traffic_class': 0,
+ 'flow_label': 0,
+ 'payload_length': len(s_buf),
+ 'nxt': inet.IPPROTO_SCTP,
+ 'hop_limit': 255,
+ 'src': '10::10',
+ 'dst': '20::20',
+ 'ext_hdrs': []}
+ _ipv6_str = ','.join(['%s=%s' % (k, repr(ipv6_values[k]))
+ for k, v in inspect.getmembers(p_ipv6)
+ if k in ipv6_values])
+ ipv6_str = '%s(%s)' % (ipv6.ipv6.__name__, _ipv6_str)
+
+ data_values = {'unordered': 0,
+ 'begin': 0,
+ 'end': 0,
+ 'length': 16 + len(self.payload),
+ 'tsn': 0,
+ 'sid': 0,
+ 'seq': 0,
+ 'payload_id': 0,
+ 'payload_data': self.payload}
+ _data_str = ','.join(['%s=%s' % (k, repr(data_values[k]))
+ for k in sorted(data_values.keys())])
+ data_str = '[%s(%s)]' % (sctp.chunk_data.__name__, _data_str)
+
+ sctp_values = {'src_port': 1,
+ 'dst_port': 1,
+ 'vtag': 0,
+ 'csum': repr(p_sctp.csum),
+ 'chunks': data_str}
+ _sctp_str = ','.join(['%s=%s' % (k, sctp_values[k])
+ for k, _ in inspect.getmembers(p_sctp)
+ if k in sctp_values])
+ sctp_str = '%s(%s)' % (sctp.sctp.__name__, _sctp_str)
+
+ pkt_str = '%s, %s, %s' % (eth_str, ipv6_str, sctp_str)
+
+ eq_(eth_str, str(p_eth))
+ eq_(eth_str, repr(p_eth))
+
+ eq_(ipv6_str, str(p_ipv6))
+ eq_(ipv6_str, repr(p_ipv6))
+
+ eq_(sctp_str, str(p_sctp))
+ eq_(sctp_str, repr(p_sctp))
+
+ eq_(pkt_str, str(pkt))
+ eq_(pkt_str, repr(pkt))
+
+ def test_ipv6_icmpv6(self):
+ # build packet
+ e = ethernet.ethernet(ethertype=ether.ETH_TYPE_IPV6)
+ ip = ipv6.ipv6(nxt=inet.IPPROTO_ICMPV6)
+ ic = icmpv6.icmpv6()
+
+ p = e / ip / ic
+ p.serialize()
+
+ ipaddr = addrconv.ipv6.text_to_bin('::')
+
+ # ethernet !6s6sH
+ e_buf = b'\xff\xff\xff\xff\xff\xff' \
+ + b'\x00\x00\x00\x00\x00\x00' \
+ + b'\x86\xdd'
+
+ # ipv6 !IHBB16s16s'
+ ip_buf = b'\x60\x00\x00\x00' \
+ + b'\x00\x00' \
+ + b'\x3a' \
+ + b'\xff' \
+ + b'\x00\x00' \
+ + ipaddr \
+ + ipaddr
+
+ # icmpv6 !BBH
+ ic_buf = b'\x00' \
+ + b'\x00' \
+ + b'\x00\x00'
+
+ buf = e_buf + ip_buf + ic_buf
+
+ # parse
+ pkt = packet.Packet(p.data)
+ protocols = self.get_protocols(pkt)
+ p_eth = protocols['ethernet']
+ p_ipv6 = protocols['ipv6']
+ p_icmpv6 = protocols['icmpv6']
+
+ # ethernet
+ ok_(p_eth)
+ eq_('ff:ff:ff:ff:ff:ff', p_eth.dst)
+ eq_('00:00:00:00:00:00', p_eth.src)
+ eq_(ether.ETH_TYPE_IPV6, p_eth.ethertype)
+
+ # ipv6
+ ok_(p_ipv6)
+ eq_(6, p_ipv6.version)
+ eq_(0, p_ipv6.traffic_class)
+ eq_(0, p_ipv6.flow_label)
+ eq_(len(ic_buf), p_ipv6.payload_length)
+ eq_(inet.IPPROTO_ICMPV6, p_ipv6.nxt)
+ eq_(255, p_ipv6.hop_limit)
+ eq_('10::10', p_ipv6.src)
+ eq_('20::20', p_ipv6.dst)
+
+ # icmpv6
+ ok_(p_icmpv6)
+ eq_(0, p_icmpv6.type_)
+ eq_(0, p_icmpv6.code)
+ eq_(len(ic_buf), len(p_icmpv6))
+ t = bytearray(ic_buf)
+ struct.pack_into('!H', t, 2, p_icmpv6.csum)
+ ph = struct.pack('!16s16sI3xB', ipaddr, ipaddr, len(ic_buf), 58)
+ t = ph + t
+ eq_(packet_utils.checksum(t), 0x60)
+
+ # to string
+ eth_values = {'dst': 'ff:ff:ff:ff:ff:ff',
+ 'src': '00:00:00:00:00:00',
+ 'ethertype': ether.ETH_TYPE_IPV6}
+ _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k]))
+ for k, _ in inspect.getmembers(p_eth)
+ if k in eth_values])
+ eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str)
+
+ ipv6_values = {'version': 6,
+ 'traffic_class': 0,
+ 'flow_label': 0,
+ 'payload_length': len(ic_buf),
+ 'nxt': inet.IPPROTO_ICMPV6,
+ 'hop_limit': 255,
+ 'src': '10::10',
+ 'dst': '20::20',
+ 'ext_hdrs': []}
+ _ipv6_str = ','.join(['%s=%s' % (k, repr(ipv6_values[k]))
+ for k, _ in inspect.getmembers(p_ipv6)
+ if k in ipv6_values])
+ ipv6_str = '%s(%s)' % (ipv6.ipv6.__name__, _ipv6_str)
+
+ icmpv6_values = {'type_': 0,
+ 'code': 0,
+ 'csum': p_icmpv6.csum,
+ 'data': b''}
+ _icmpv6_str = ','.join(['%s=%s' % (k, repr(icmpv6_values[k]))
+ for k, _ in inspect.getmembers(p_icmpv6)
+ if k in icmpv6_values])
+ icmpv6_str = '%s(%s)' % (icmpv6.icmpv6.__name__, _icmpv6_str)
+
+ pkt_str = '%s, %s, %s' % (eth_str, ipv6_str, icmpv6_str)
+
+ eq_(eth_str, str(p_eth))
+ eq_(eth_str, repr(p_eth))
+
+ eq_(ipv6_str, str(p_ipv6))
+ eq_(ipv6_str, repr(p_ipv6))
+
+ eq_(icmpv6_str, str(p_icmpv6))
+ eq_(icmpv6_str, repr(p_icmpv6))
+
+ eq_(pkt_str, str(pkt))
+ eq_(pkt_str, repr(pkt))
+
+ def test_llc_bpdu(self):
+ # buid packet
+ e = ethernet.ethernet(self.dst_mac, self.src_mac,
+ ether.ETH_TYPE_IEEE802_3)
+ llc_control = llc.ControlFormatU(0, 0, 0)
+ l = llc.llc(llc.SAP_BPDU, llc.SAP_BPDU, llc_control)
+ b = bpdu.ConfigurationBPDUs(flags=0,
+ root_priority=32768,
+ root_system_id_extension=0,
+ root_mac_address=self.src_mac,
+ root_path_cost=0,
+ bridge_priority=32768,
+ bridge_system_id_extension=0,
+ bridge_mac_address=self.dst_mac,
+ port_priority=128,
+ port_number=4,
+ message_age=1,
+ max_age=20,
+ hello_time=2,
+ forward_delay=15)
+
+ p = packet.Packet()
+ p.add_protocol(e)
+ p.add_protocol(l)
+ p.add_protocol(b)
+ p.serialize()
+
+ # ethernet !6s6sH
+ e_buf = self.dst_mac_bin + self.src_mac_bin + b'\x05\xdc'
+
+ # llc !BBB
+ l_buf = (b'\x42'
+ b'\x42'
+ b'\x03')
+
+ # bpdu !HBBBQIQHHHHH
+ b_buf = (b'\x00\x00'
+ b'\x00'
+ b'\x00'
+ b'\x00'
+ b'\x80\x00\xbb\xbb\xbb\xbb\xbb\xbb'
+ b'\x00\x00\x00\x00'
+ b'\x80\x00\xaa\xaa\xaa\xaa\xaa\xaa'
+ b'\x80\x04'
+ b'\x01\x00'
+ b'\x14\x00'
+ b'\x02\x00'
+ b'\x0f\x00')
+
+ buf = e_buf + l_buf + b_buf
+
+ # Append padding if ethernet frame is less than 60 bytes length
+ pad_len = 60 - len(buf)
+ if pad_len > 0:
+ buf += b'\x00' * pad_len
+ eq_(buf, p.data)
+
+ # parse
+ pkt = packet.Packet(p.data)
+ protocols = self.get_protocols(pkt)
+ p_eth = protocols['ethernet']
+ p_llc = protocols['llc']
+ p_bpdu = protocols['ConfigurationBPDUs']
+
+ # ethernet
+ ok_(p_eth)
+ eq_(self.dst_mac, p_eth.dst)
+ eq_(self.src_mac, p_eth.src)
+ eq_(ether.ETH_TYPE_IEEE802_3, p_eth.ethertype)
+
+ # llc
+ ok_(p_llc)
+ eq_(llc.SAP_BPDU, p_llc.dsap_addr)
+ eq_(llc.SAP_BPDU, p_llc.ssap_addr)
+ eq_(0, p_llc.control.modifier_function1)
+ eq_(0, p_llc.control.pf_bit)
+ eq_(0, p_llc.control.modifier_function2)
+
+ # bpdu
+ ok_(p_bpdu)
+ eq_(bpdu.PROTOCOL_IDENTIFIER, p_bpdu._protocol_id)
+ eq_(bpdu.PROTOCOLVERSION_ID_BPDU, p_bpdu._version_id)
+ eq_(bpdu.TYPE_CONFIG_BPDU, p_bpdu._bpdu_type)
+ eq_(0, p_bpdu.flags)
+ eq_(32768, p_bpdu.root_priority)
+ eq_(0, p_bpdu.root_system_id_extension)
+ eq_(self.src_mac, p_bpdu.root_mac_address)
+ eq_(0, p_bpdu.root_path_cost)
+ eq_(32768, p_bpdu.bridge_priority)
+ eq_(0, p_bpdu.bridge_system_id_extension)
+ eq_(self.dst_mac, p_bpdu.bridge_mac_address)
+ eq_(128, p_bpdu.port_priority)
+ eq_(4, p_bpdu.port_number)
+ eq_(1, p_bpdu.message_age)
+ eq_(20, p_bpdu.max_age)
+ eq_(2, p_bpdu.hello_time)
+ eq_(15, p_bpdu.forward_delay)
+
+ # to string
+ eth_values = {'dst': self.dst_mac,
+ 'src': self.src_mac,
+ 'ethertype': ether.ETH_TYPE_IEEE802_3}
+ _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k]))
+ for k, v in inspect.getmembers(p_eth)
+ if k in eth_values])
+ eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str)
+
+ ctrl_values = {'modifier_function1': 0,
+ 'pf_bit': 0,
+ 'modifier_function2': 0}
+ _ctrl_str = ','.join(['%s=%s' % (k, repr(ctrl_values[k]))
+ for k, v in inspect.getmembers(p_llc.control)
+ if k in ctrl_values])
+ ctrl_str = '%s(%s)' % (llc.ControlFormatU.__name__, _ctrl_str)
+
+ llc_values = {'dsap_addr': repr(llc.SAP_BPDU),
+ 'ssap_addr': repr(llc.SAP_BPDU),
+ 'control': ctrl_str}
+ _llc_str = ','.join(['%s=%s' % (k, llc_values[k])
+ for k, v in inspect.getmembers(p_llc)
+ if k in llc_values])
+ llc_str = '%s(%s)' % (llc.llc.__name__, _llc_str)
+
+ _long = int if six.PY3 else long
+ bpdu_values = {'flags': 0,
+ 'root_priority': _long(32768),
+ 'root_system_id_extension': _long(0),
+ 'root_mac_address': self.src_mac,
+ 'root_path_cost': 0,
+ 'bridge_priority': _long(32768),
+ 'bridge_system_id_extension': _long(0),
+ 'bridge_mac_address': self.dst_mac,
+ 'port_priority': 128,
+ 'port_number': 4,
+ 'message_age': float(1),
+ 'max_age': float(20),
+ 'hello_time': float(2),
+ 'forward_delay': float(15)}
+ _bpdu_str = ','.join(['%s=%s' % (k, repr(bpdu_values[k]))
+ for k, v in inspect.getmembers(p_bpdu)
+ if k in bpdu_values])
+ bpdu_str = '%s(%s)' % (bpdu.ConfigurationBPDUs.__name__, _bpdu_str)
+
+ pkt_str = '%s, %s, %s' % (eth_str, llc_str, bpdu_str)
+
+ eq_(eth_str, str(p_eth))
+ eq_(eth_str, repr(p_eth))
+
+ eq_(llc_str, str(p_llc))
+ eq_(llc_str, repr(p_llc))
+
+ eq_(bpdu_str, str(p_bpdu))
+ eq_(bpdu_str, repr(p_bpdu))
+
+ eq_(pkt_str, str(pkt))
+ eq_(pkt_str, repr(pkt))
+
+ def test_div_api(self):
+ e = ethernet.ethernet(self.dst_mac, self.src_mac, ether.ETH_TYPE_IP)
+ i = ipv4.ipv4()
+ u = udp.udp(self.src_port, self.dst_port)
+ pkt = e / i / u
+ ok_(isinstance(pkt, packet.Packet))
+ ok_(isinstance(pkt.protocols[0], ethernet.ethernet))
+ ok_(isinstance(pkt.protocols[1], ipv4.ipv4))
+ ok_(isinstance(pkt.protocols[2], udp.udp))