backing up
[vsorcdistro/.git] / ryu / build / lib.linux-armv7l-2.7 / ryu / tests / unit / packet / test_pbb.py
1 # Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #    http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 import logging
17 import struct
18 import unittest
19
20 from nose.tools import eq_
21 from nose.tools import ok_
22 from nose.tools import raises
23 from ryu.ofproto import ether
24 from ryu.ofproto import inet
25 from ryu.lib.packet import ethernet
26 from ryu.lib.packet import packet
27 from ryu.lib.packet import ipv4
28 from ryu.lib.packet import vlan
29 from ryu.lib.packet import pbb
30
31
32 LOG = logging.getLogger(__name__)
33
34
35 class Test_itag(unittest.TestCase):
36
37     pcp = 3
38     dei = 0
39     uca = 1
40     sid = 16770000
41     data = pcp << 29 | dei << 28 | uca << 27 | sid
42     buf = struct.pack(pbb.itag._PACK_STR, data)
43     it = pbb.itag(pcp, dei, uca, sid)
44
45     def setUp(self):
46         pass
47
48     def tearDown(self):
49         pass
50
51     def test_init(self):
52         eq_(self.pcp, self.it.pcp)
53         eq_(self.dei, self.it.dei)
54         eq_(self.uca, self.it.uca)
55         eq_(self.sid, self.it.sid)
56
57     def test_parser(self):
58         _res = pbb.itag.parser(self.buf)
59         if type(_res) is tuple:
60             res = _res[0]
61         else:
62             res = _res
63         eq_(res.pcp, self.pcp)
64         eq_(res.dei, self.dei)
65         eq_(res.uca, self.uca)
66         eq_(res.sid, self.sid)
67
68     def test_serialize(self):
69         data = bytearray()
70         prev = None
71         buf = self.it.serialize(data, prev)
72         res = struct.unpack(pbb.itag._PACK_STR, buf)
73         eq_(res[0], self.data)
74
75     def _build_itag(self):
76         b_src_mac = '00:07:0d:af:f4:54'
77         b_dst_mac = '00:00:00:00:00:00'
78         b_ethertype = ether.ETH_TYPE_8021AD
79         e1 = ethernet.ethernet(b_dst_mac, b_src_mac, b_ethertype)
80
81         b_pcp = 0
82         b_cfi = 0
83         b_vid = 32
84         b_ethertype = ether.ETH_TYPE_8021Q
85         bt = vlan.svlan(b_pcp, b_cfi, b_vid, b_ethertype)
86
87         c_src_mac = '11:11:11:11:11:11'
88         c_dst_mac = 'aa:aa:aa:aa:aa:aa'
89         c_ethertype = ether.ETH_TYPE_8021AD
90         e2 = ethernet.ethernet(c_dst_mac, c_src_mac, c_ethertype)
91
92         s_pcp = 0
93         s_cfi = 0
94         s_vid = 32
95         s_ethertype = ether.ETH_TYPE_8021Q
96         st = vlan.svlan(s_pcp, s_cfi, s_vid, s_ethertype)
97
98         c_pcp = 0
99         c_cfi = 0
100         c_vid = 32
101         c_ethertype = ether.ETH_TYPE_IP
102         ct = vlan.vlan(c_pcp, c_cfi, c_vid, c_ethertype)
103
104         version = 4
105         header_length = 20
106         tos = 0
107         total_length = 24
108         identification = 0x8a5d
109         flags = 0
110         offset = 1480
111         ttl = 64
112         proto = inet.IPPROTO_ICMP
113         csum = 0xa7f2
114         src = '131.151.32.21'
115         dst = '131.151.32.129'
116         option = b'TEST'
117         ip = ipv4.ipv4(version, header_length, tos, total_length,
118                        identification, flags, offset, ttl, proto, csum,
119                        src, dst, option)
120
121         p = packet.Packet()
122
123         p.add_protocol(e1)
124         p.add_protocol(bt)
125         p.add_protocol(self.it)
126         p.add_protocol(e2)
127         p.add_protocol(st)
128         p.add_protocol(ct)
129         p.add_protocol(ip)
130         p.serialize()
131
132         return p
133
134     def test_build_itag(self):
135         p = self._build_itag()
136
137         e = p.get_protocols(ethernet.ethernet)
138         ok_(e)
139         ok_(isinstance(e, list))
140         eq_(e[0].ethertype, ether.ETH_TYPE_8021AD)
141         eq_(e[1].ethertype, ether.ETH_TYPE_8021AD)
142
143         sv = p.get_protocols(vlan.svlan)
144         ok_(sv)
145         ok_(isinstance(sv, list))
146         eq_(sv[0].ethertype, ether.ETH_TYPE_8021Q)
147         eq_(sv[1].ethertype, ether.ETH_TYPE_8021Q)
148
149         it = p.get_protocol(pbb.itag)
150         ok_(it)
151
152         v = p.get_protocol(vlan.vlan)
153         ok_(v)
154         eq_(v.ethertype, ether.ETH_TYPE_IP)
155
156         ip = p.get_protocol(ipv4.ipv4)
157         ok_(ip)
158
159         eq_(it.pcp, self.pcp)
160         eq_(it.dei, self.dei)
161         eq_(it.uca, self.uca)
162         eq_(it.sid, self.sid)
163
164     @raises(Exception)
165     def test_malformed_itag(self):
166         m_short_buf = self.buf[1:pbb.itag._MIN_LEN]
167         pbb.itag.parser(m_short_buf)
168
169     def test_json(self):
170         jsondict = self.it.to_jsondict()
171         it = pbb.itag.from_jsondict(jsondict['itag'])
172         eq_(str(self.it), str(it))