backing up
[vsorcdistro/.git] / ryu / build / lib.linux-armv7l-2.7 / ryu / tests / unit / packet / test_vlan.py
1 # Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #    http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 # vim: tabstop=4 shiftwidth=4 softtabstop=4
17
18 import unittest
19 import logging
20 import struct
21 from struct import *
22 from nose.tools import *
23 from ryu.ofproto import ether, inet
24 from ryu.lib.packet.ethernet import ethernet
25 from ryu.lib.packet.packet import Packet
26 from ryu.lib.packet.ipv4 import ipv4
27 from ryu.lib.packet.vlan import vlan
28 from ryu.lib.packet.vlan import svlan
29
30
31 LOG = logging.getLogger('test_vlan')
32
33
34 class Test_vlan(unittest.TestCase):
35     """ Test case for vlan
36     """
37
38     pcp = 0
39     cfi = 0
40     vid = 32
41     tci = pcp << 15 | cfi << 12 | vid
42     ethertype = ether.ETH_TYPE_IP
43
44     buf = pack(vlan._PACK_STR, tci, ethertype)
45
46     v = vlan(pcp, cfi, vid, ethertype)
47
48     def setUp(self):
49         pass
50
51     def tearDown(self):
52         pass
53
54     def find_protocol(self, pkt, name):
55         for p in pkt.protocols:
56             if p.protocol_name == name:
57                 return p
58
59     def test_init(self):
60         eq_(self.pcp, self.v.pcp)
61         eq_(self.cfi, self.v.cfi)
62         eq_(self.vid, self.v.vid)
63         eq_(self.ethertype, self.v.ethertype)
64
65     def test_parser(self):
66         res, ptype, _ = self.v.parser(self.buf)
67
68         eq_(res.pcp, self.pcp)
69         eq_(res.cfi, self.cfi)
70         eq_(res.vid, self.vid)
71         eq_(res.ethertype, self.ethertype)
72         eq_(ptype, ipv4)
73
74     def test_serialize(self):
75         data = bytearray()
76         prev = None
77         buf = self.v.serialize(data, prev)
78
79         fmt = vlan._PACK_STR
80         res = struct.unpack(fmt, buf)
81
82         eq_(res[0], self.tci)
83         eq_(res[1], self.ethertype)
84
85     def _build_vlan(self):
86         src_mac = '00:07:0d:af:f4:54'
87         dst_mac = '00:00:00:00:00:00'
88         ethertype = ether.ETH_TYPE_8021Q
89         e = ethernet(dst_mac, src_mac, ethertype)
90
91         version = 4
92         header_length = 20
93         tos = 0
94         total_length = 24
95         identification = 0x8a5d
96         flags = 0
97         offset = 1480
98         ttl = 64
99         proto = inet.IPPROTO_ICMP
100         csum = 0xa7f2
101         src = '131.151.32.21'
102         dst = '131.151.32.129'
103         option = b'TEST'
104         ip = ipv4(version, header_length, tos, total_length, identification,
105                   flags, offset, ttl, proto, csum, src, dst, option)
106
107         p = Packet()
108
109         p.add_protocol(e)
110         p.add_protocol(self.v)
111         p.add_protocol(ip)
112         p.serialize()
113
114         return p
115
116     def test_build_vlan(self):
117         p = self._build_vlan()
118
119         e = self.find_protocol(p, "ethernet")
120         ok_(e)
121         eq_(e.ethertype, ether.ETH_TYPE_8021Q)
122
123         v = self.find_protocol(p, "vlan")
124         ok_(v)
125         eq_(v.ethertype, ether.ETH_TYPE_IP)
126
127         ip = self.find_protocol(p, "ipv4")
128         ok_(ip)
129
130         eq_(v.pcp, self.pcp)
131         eq_(v.cfi, self.cfi)
132         eq_(v.vid, self.vid)
133         eq_(v.ethertype, self.ethertype)
134
135     @raises(Exception)
136     def test_malformed_vlan(self):
137         m_short_buf = self.buf[1:vlan._MIN_LEN]
138         vlan.parser(m_short_buf)
139
140     def test_json(self):
141         jsondict = self.v.to_jsondict()
142         v = vlan.from_jsondict(jsondict['vlan'])
143         eq_(str(self.v), str(v))
144
145
146 class Test_svlan(unittest.TestCase):
147
148     pcp = 0
149     cfi = 0
150     vid = 32
151     tci = pcp << 15 | cfi << 12 | vid
152     ethertype = ether.ETH_TYPE_8021Q
153
154     buf = pack(svlan._PACK_STR, tci, ethertype)
155
156     sv = svlan(pcp, cfi, vid, ethertype)
157
158     def setUp(self):
159         pass
160
161     def tearDown(self):
162         pass
163
164     def find_protocol(self, pkt, name):
165         for p in pkt.protocols:
166             if p.protocol_name == name:
167                 return p
168
169     def test_init(self):
170         eq_(self.pcp, self.sv.pcp)
171         eq_(self.cfi, self.sv.cfi)
172         eq_(self.vid, self.sv.vid)
173         eq_(self.ethertype, self.sv.ethertype)
174
175     def test_parser(self):
176         res, ptype, _ = self.sv.parser(self.buf)
177
178         eq_(res.pcp, self.pcp)
179         eq_(res.cfi, self.cfi)
180         eq_(res.vid, self.vid)
181         eq_(res.ethertype, self.ethertype)
182         eq_(ptype, vlan)
183
184     def test_serialize(self):
185         data = bytearray()
186         prev = None
187         buf = self.sv.serialize(data, prev)
188
189         fmt = svlan._PACK_STR
190         res = struct.unpack(fmt, buf)
191
192         eq_(res[0], self.tci)
193         eq_(res[1], self.ethertype)
194
195     def _build_svlan(self):
196         src_mac = '00:07:0d:af:f4:54'
197         dst_mac = '00:00:00:00:00:00'
198         ethertype = ether.ETH_TYPE_8021AD
199         e = ethernet(dst_mac, src_mac, ethertype)
200
201         pcp = 0
202         cfi = 0
203         vid = 32
204         tci = pcp << 15 | cfi << 12 | vid
205         ethertype = ether.ETH_TYPE_IP
206         v = vlan(pcp, cfi, vid, ethertype)
207
208         version = 4
209         header_length = 20
210         tos = 0
211         total_length = 24
212         identification = 0x8a5d
213         flags = 0
214         offset = 1480
215         ttl = 64
216         proto = inet.IPPROTO_ICMP
217         csum = 0xa7f2
218         src = '131.151.32.21'
219         dst = '131.151.32.129'
220         option = b'TEST'
221         ip = ipv4(version, header_length, tos, total_length, identification,
222                   flags, offset, ttl, proto, csum, src, dst, option)
223
224         p = Packet()
225
226         p.add_protocol(e)
227         p.add_protocol(self.sv)
228         p.add_protocol(v)
229         p.add_protocol(ip)
230         p.serialize()
231
232         return p
233
234     def test_build_svlan(self):
235         p = self._build_svlan()
236
237         e = self.find_protocol(p, "ethernet")
238         ok_(e)
239         eq_(e.ethertype, ether.ETH_TYPE_8021AD)
240
241         sv = self.find_protocol(p, "svlan")
242         ok_(sv)
243         eq_(sv.ethertype, ether.ETH_TYPE_8021Q)
244
245         v = self.find_protocol(p, "vlan")
246         ok_(v)
247         eq_(v.ethertype, ether.ETH_TYPE_IP)
248
249         ip = self.find_protocol(p, "ipv4")
250         ok_(ip)
251
252         eq_(sv.pcp, self.pcp)
253         eq_(sv.cfi, self.cfi)
254         eq_(sv.vid, self.vid)
255         eq_(sv.ethertype, self.ethertype)
256
257     @raises(Exception)
258     def test_malformed_svlan(self):
259         m_short_buf = self.buf[1:svlan._MIN_LEN]
260         svlan.parser(m_short_buf)
261
262     def test_json(self):
263         jsondict = self.sv.to_jsondict()
264         sv = svlan.from_jsondict(jsondict['svlan'])
265         eq_(str(self.sv), str(sv))