1 # Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
16 # vim: tabstop=4 shiftwidth=4 softtabstop=4
22 from nose.tools import *
23 from ryu.ofproto import ether
24 from ryu.lib.packet.ethernet import ethernet
25 from ryu.lib.packet.packet import Packet
26 from ryu.lib.packet.arp import arp
27 from ryu.lib.packet.vlan import vlan
28 from ryu.lib import addrconv
31 LOG = logging.getLogger('test_arp')
34 class Test_arp(unittest.TestCase):
43 src_mac = '00:07:0d:af:f4:54'
44 src_ip = '24.166.172.1'
45 dst_mac = '00:00:00:00:00:00'
46 dst_ip = '24.166.173.159'
49 buf = pack(fmt, hwtype, proto, hlen, plen, opcode,
50 addrconv.mac.text_to_bin(src_mac),
51 addrconv.ipv4.text_to_bin(src_ip),
52 addrconv.mac.text_to_bin(dst_mac),
53 addrconv.ipv4.text_to_bin(dst_ip))
55 a = arp(hwtype, proto, hlen, plen, opcode, src_mac, src_ip, dst_mac,
64 def find_protocol(self, pkt, name):
65 for p in pkt.protocols:
66 if p.protocol_name == name:
70 eq_(self.hwtype, self.a.hwtype)
71 eq_(self.proto, self.a.proto)
72 eq_(self.hlen, self.a.hlen)
73 eq_(self.plen, self.a.plen)
74 eq_(self.opcode, self.a.opcode)
75 eq_(self.src_mac, self.a.src_mac)
76 eq_(self.src_ip, self.a.src_ip)
77 eq_(self.dst_mac, self.a.dst_mac)
78 eq_(self.dst_ip, self.a.dst_ip)
80 def test_parser(self):
81 _res = self.a.parser(self.buf)
82 if type(_res) is tuple:
87 eq_(res.hwtype, self.hwtype)
88 eq_(res.proto, self.proto)
89 eq_(res.hlen, self.hlen)
90 eq_(res.plen, self.plen)
91 eq_(res.opcode, self.opcode)
92 eq_(res.src_mac, self.src_mac)
93 eq_(res.src_ip, self.src_ip)
94 eq_(res.dst_mac, self.dst_mac)
95 eq_(res.dst_ip, self.dst_ip)
97 def test_serialize(self):
100 buf = self.a.serialize(data, prev)
103 res = struct.unpack(fmt, buf)
105 eq_(res[0], self.hwtype)
106 eq_(res[1], self.proto)
107 eq_(res[2], self.hlen)
108 eq_(res[3], self.plen)
109 eq_(res[4], self.opcode)
110 eq_(res[5], addrconv.mac.text_to_bin(self.src_mac))
111 eq_(res[6], addrconv.ipv4.text_to_bin(self.src_ip))
112 eq_(res[7], addrconv.mac.text_to_bin(self.dst_mac))
113 eq_(res[8], addrconv.ipv4.text_to_bin(self.dst_ip))
115 def _build_arp(self, vlan_enabled):
116 if vlan_enabled is True:
117 ethertype = ether.ETH_TYPE_8021Q
118 v = vlan(1, 1, 3, ether.ETH_TYPE_ARP)
120 ethertype = ether.ETH_TYPE_ARP
121 e = ethernet(self.dst_mac, self.src_mac, ethertype)
125 if vlan_enabled is True:
127 p.add_protocol(self.a)
131 def test_build_arp_vlan(self):
132 p = self._build_arp(True)
134 e = self.find_protocol(p, "ethernet")
136 eq_(e.ethertype, ether.ETH_TYPE_8021Q)
138 v = self.find_protocol(p, "vlan")
140 eq_(v.ethertype, ether.ETH_TYPE_ARP)
142 a = self.find_protocol(p, "arp")
145 eq_(a.hwtype, self.hwtype)
146 eq_(a.proto, self.proto)
147 eq_(a.hlen, self.hlen)
148 eq_(a.plen, self.plen)
149 eq_(a.opcode, self.opcode)
150 eq_(a.src_mac, self.src_mac)
151 eq_(a.src_ip, self.src_ip)
152 eq_(a.dst_mac, self.dst_mac)
153 eq_(a.dst_ip, self.dst_ip)
155 def test_build_arp_novlan(self):
156 p = self._build_arp(False)
158 e = self.find_protocol(p, "ethernet")
160 eq_(e.ethertype, ether.ETH_TYPE_ARP)
162 a = self.find_protocol(p, "arp")
165 eq_(a.hwtype, self.hwtype)
166 eq_(a.proto, self.proto)
167 eq_(a.hlen, self.hlen)
168 eq_(a.plen, self.plen)
169 eq_(a.opcode, self.opcode)
170 eq_(a.src_mac, self.src_mac)
171 eq_(a.src_ip, self.src_ip)
172 eq_(a.dst_mac, self.dst_mac)
173 eq_(a.dst_ip, self.dst_ip)
176 def test_malformed_arp(self):
177 m_short_buf = self.buf[1:arp._MIN_LEN]
178 arp.parser(m_short_buf)
181 jsondict = self.a.to_jsondict()
182 a = arp.from_jsondict(jsondict['arp'])
183 eq_(str(self.a), str(a))