1 # Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
16 # vim: tabstop=4 shiftwidth=4 softtabstop=4
22 from nose.tools import *
23 from ryu.ofproto import ether, inet
24 from ryu.lib.packet.ethernet import ethernet
25 from ryu.lib.packet.packet import Packet
26 from ryu.lib.packet.ipv4 import ipv4
27 from ryu.lib.packet.vlan import vlan
28 from ryu.lib.packet.vlan import svlan
31 LOG = logging.getLogger('test_vlan')
34 class Test_vlan(unittest.TestCase):
35 """ Test case for vlan
41 tci = pcp << 15 | cfi << 12 | vid
42 ethertype = ether.ETH_TYPE_IP
44 buf = pack(vlan._PACK_STR, tci, ethertype)
46 v = vlan(pcp, cfi, vid, ethertype)
54 def find_protocol(self, pkt, name):
55 for p in pkt.protocols:
56 if p.protocol_name == name:
60 eq_(self.pcp, self.v.pcp)
61 eq_(self.cfi, self.v.cfi)
62 eq_(self.vid, self.v.vid)
63 eq_(self.ethertype, self.v.ethertype)
65 def test_parser(self):
66 res, ptype, _ = self.v.parser(self.buf)
68 eq_(res.pcp, self.pcp)
69 eq_(res.cfi, self.cfi)
70 eq_(res.vid, self.vid)
71 eq_(res.ethertype, self.ethertype)
74 def test_serialize(self):
77 buf = self.v.serialize(data, prev)
80 res = struct.unpack(fmt, buf)
83 eq_(res[1], self.ethertype)
85 def _build_vlan(self):
86 src_mac = '00:07:0d:af:f4:54'
87 dst_mac = '00:00:00:00:00:00'
88 ethertype = ether.ETH_TYPE_8021Q
89 e = ethernet(dst_mac, src_mac, ethertype)
95 identification = 0x8a5d
99 proto = inet.IPPROTO_ICMP
101 src = '131.151.32.21'
102 dst = '131.151.32.129'
104 ip = ipv4(version, header_length, tos, total_length, identification,
105 flags, offset, ttl, proto, csum, src, dst, option)
110 p.add_protocol(self.v)
116 def test_build_vlan(self):
117 p = self._build_vlan()
119 e = self.find_protocol(p, "ethernet")
121 eq_(e.ethertype, ether.ETH_TYPE_8021Q)
123 v = self.find_protocol(p, "vlan")
125 eq_(v.ethertype, ether.ETH_TYPE_IP)
127 ip = self.find_protocol(p, "ipv4")
133 eq_(v.ethertype, self.ethertype)
136 def test_malformed_vlan(self):
137 m_short_buf = self.buf[1:vlan._MIN_LEN]
138 vlan.parser(m_short_buf)
141 jsondict = self.v.to_jsondict()
142 v = vlan.from_jsondict(jsondict['vlan'])
143 eq_(str(self.v), str(v))
146 class Test_svlan(unittest.TestCase):
151 tci = pcp << 15 | cfi << 12 | vid
152 ethertype = ether.ETH_TYPE_8021Q
154 buf = pack(svlan._PACK_STR, tci, ethertype)
156 sv = svlan(pcp, cfi, vid, ethertype)
164 def find_protocol(self, pkt, name):
165 for p in pkt.protocols:
166 if p.protocol_name == name:
170 eq_(self.pcp, self.sv.pcp)
171 eq_(self.cfi, self.sv.cfi)
172 eq_(self.vid, self.sv.vid)
173 eq_(self.ethertype, self.sv.ethertype)
175 def test_parser(self):
176 res, ptype, _ = self.sv.parser(self.buf)
178 eq_(res.pcp, self.pcp)
179 eq_(res.cfi, self.cfi)
180 eq_(res.vid, self.vid)
181 eq_(res.ethertype, self.ethertype)
184 def test_serialize(self):
187 buf = self.sv.serialize(data, prev)
189 fmt = svlan._PACK_STR
190 res = struct.unpack(fmt, buf)
192 eq_(res[0], self.tci)
193 eq_(res[1], self.ethertype)
195 def _build_svlan(self):
196 src_mac = '00:07:0d:af:f4:54'
197 dst_mac = '00:00:00:00:00:00'
198 ethertype = ether.ETH_TYPE_8021AD
199 e = ethernet(dst_mac, src_mac, ethertype)
204 tci = pcp << 15 | cfi << 12 | vid
205 ethertype = ether.ETH_TYPE_IP
206 v = vlan(pcp, cfi, vid, ethertype)
212 identification = 0x8a5d
216 proto = inet.IPPROTO_ICMP
218 src = '131.151.32.21'
219 dst = '131.151.32.129'
221 ip = ipv4(version, header_length, tos, total_length, identification,
222 flags, offset, ttl, proto, csum, src, dst, option)
227 p.add_protocol(self.sv)
234 def test_build_svlan(self):
235 p = self._build_svlan()
237 e = self.find_protocol(p, "ethernet")
239 eq_(e.ethertype, ether.ETH_TYPE_8021AD)
241 sv = self.find_protocol(p, "svlan")
243 eq_(sv.ethertype, ether.ETH_TYPE_8021Q)
245 v = self.find_protocol(p, "vlan")
247 eq_(v.ethertype, ether.ETH_TYPE_IP)
249 ip = self.find_protocol(p, "ipv4")
252 eq_(sv.pcp, self.pcp)
253 eq_(sv.cfi, self.cfi)
254 eq_(sv.vid, self.vid)
255 eq_(sv.ethertype, self.ethertype)
258 def test_malformed_svlan(self):
259 m_short_buf = self.buf[1:svlan._MIN_LEN]
260 svlan.parser(m_short_buf)
263 jsondict = self.sv.to_jsondict()
264 sv = svlan.from_jsondict(jsondict['svlan'])
265 eq_(str(self.sv), str(sv))