1 # Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
20 from nose.tools import eq_
21 from nose.tools import ok_
22 from nose.tools import raises
23 from ryu.ofproto import ether
24 from ryu.ofproto import inet
25 from ryu.lib.packet import ethernet
26 from ryu.lib.packet import packet
27 from ryu.lib.packet import ipv4
28 from ryu.lib.packet import vlan
29 from ryu.lib.packet import pbb
32 LOG = logging.getLogger(__name__)
35 class Test_itag(unittest.TestCase):
41 data = pcp << 29 | dei << 28 | uca << 27 | sid
42 buf = struct.pack(pbb.itag._PACK_STR, data)
43 it = pbb.itag(pcp, dei, uca, sid)
52 eq_(self.pcp, self.it.pcp)
53 eq_(self.dei, self.it.dei)
54 eq_(self.uca, self.it.uca)
55 eq_(self.sid, self.it.sid)
57 def test_parser(self):
58 _res = pbb.itag.parser(self.buf)
59 if type(_res) is tuple:
63 eq_(res.pcp, self.pcp)
64 eq_(res.dei, self.dei)
65 eq_(res.uca, self.uca)
66 eq_(res.sid, self.sid)
68 def test_serialize(self):
71 buf = self.it.serialize(data, prev)
72 res = struct.unpack(pbb.itag._PACK_STR, buf)
73 eq_(res[0], self.data)
75 def _build_itag(self):
76 b_src_mac = '00:07:0d:af:f4:54'
77 b_dst_mac = '00:00:00:00:00:00'
78 b_ethertype = ether.ETH_TYPE_8021AD
79 e1 = ethernet.ethernet(b_dst_mac, b_src_mac, b_ethertype)
84 b_ethertype = ether.ETH_TYPE_8021Q
85 bt = vlan.svlan(b_pcp, b_cfi, b_vid, b_ethertype)
87 c_src_mac = '11:11:11:11:11:11'
88 c_dst_mac = 'aa:aa:aa:aa:aa:aa'
89 c_ethertype = ether.ETH_TYPE_8021AD
90 e2 = ethernet.ethernet(c_dst_mac, c_src_mac, c_ethertype)
95 s_ethertype = ether.ETH_TYPE_8021Q
96 st = vlan.svlan(s_pcp, s_cfi, s_vid, s_ethertype)
101 c_ethertype = ether.ETH_TYPE_IP
102 ct = vlan.vlan(c_pcp, c_cfi, c_vid, c_ethertype)
108 identification = 0x8a5d
112 proto = inet.IPPROTO_ICMP
114 src = '131.151.32.21'
115 dst = '131.151.32.129'
117 ip = ipv4.ipv4(version, header_length, tos, total_length,
118 identification, flags, offset, ttl, proto, csum,
125 p.add_protocol(self.it)
134 def test_build_itag(self):
135 p = self._build_itag()
137 e = p.get_protocols(ethernet.ethernet)
139 ok_(isinstance(e, list))
140 eq_(e[0].ethertype, ether.ETH_TYPE_8021AD)
141 eq_(e[1].ethertype, ether.ETH_TYPE_8021AD)
143 sv = p.get_protocols(vlan.svlan)
145 ok_(isinstance(sv, list))
146 eq_(sv[0].ethertype, ether.ETH_TYPE_8021Q)
147 eq_(sv[1].ethertype, ether.ETH_TYPE_8021Q)
149 it = p.get_protocol(pbb.itag)
152 v = p.get_protocol(vlan.vlan)
154 eq_(v.ethertype, ether.ETH_TYPE_IP)
156 ip = p.get_protocol(ipv4.ipv4)
159 eq_(it.pcp, self.pcp)
160 eq_(it.dei, self.dei)
161 eq_(it.uca, self.uca)
162 eq_(it.sid, self.sid)
165 def test_malformed_itag(self):
166 m_short_buf = self.buf[1:pbb.itag._MIN_LEN]
167 pbb.itag.parser(m_short_buf)
170 jsondict = self.it.to_jsondict()
171 it = pbb.itag.from_jsondict(jsondict['itag'])
172 eq_(str(self.it), str(it))