backing up
[vsorcdistro/.git] / ryu / build / lib.linux-armv7l-2.7 / ryu / tests / unit / ofproto / test_parser_v13.py
1 # Copyright (C) 2014 Nippon Telegraph and Telephone Corporation.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #    http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 # vim: tabstop=4 shiftwidth=4 softtabstop=4
17
18 import unittest
19 import logging
20 import six
21 import socket
22 from struct import *
23 from nose.tools import *
24 from ryu.ofproto.ofproto_v1_3_parser import *
25 from ryu.ofproto import ofproto_v1_3_parser
26 from ryu.ofproto import ofproto_v1_3
27 from ryu.ofproto import ofproto_protocol
28 from ryu.ofproto import ether
29 from ryu.ofproto.ofproto_parser import MsgBase
30 from ryu import utils
31 from ryu.lib import addrconv
32
33 LOG = logging.getLogger('test_ofproto_v13')
34
35
36 _Datapath = ofproto_protocol.ProtocolDesc(version=ofproto_v1_3.OFP_VERSION)
37
38
39 class TestOFPMatch(unittest.TestCase):
40
41     """ Test case for ofproto_v1_3_parser.OFPMatch
42     """
43
44     def test_init(self):
45         res = OFPMatch()
46
47         # wc check
48         eq_(res._wc.vlan_vid_mask, 0)
49
50         # flow check
51         eq_(res._flow.vlan_vid, 0)
52
53     def _test_serialize_and_parser(self, match, header, value, mask=None):
54         cls_ = OFPMatchField._FIELDS_HEADERS.get(header)
55         pack_str = cls_.pack_str.replace('!', '')
56         fmt = '!HHI' + pack_str
57
58         # serialize
59         buf = bytearray()
60         length = match.serialize(buf, 0)
61         eq_(length, len(buf))
62         if mask and len(buf) > calcsize(fmt):
63             fmt += pack_str
64
65         res = list(unpack_from(fmt, six.binary_type(buf), 0)[3:])
66         if type(value) is list:
67             res_value = res[:calcsize(pack_str) // 2]
68             eq_(res_value, value)
69             if mask:
70                 res_mask = res[calcsize(pack_str) // 2:]
71                 eq_(res_mask, mask)
72         else:
73             res_value = res.pop(0)
74             if cls_.__name__ == 'MTVlanVid':
75                 eq_(res_value, value | ofproto.OFPVID_PRESENT)
76             else:
77                 eq_(res_value, value)
78             if mask and res and res[0]:
79                 res_mask = res[0]
80                 eq_(res_mask, mask)
81
82         # parser
83         res = match.parser(six.binary_type(buf), 0)
84         eq_(res.type, ofproto.OFPMT_OXM)
85         eq_(res.fields[0].header, header)
86         eq_(res.fields[0].value, value)
87         if mask and res.fields[0].mask is not None:
88             eq_(res.fields[0].mask, mask)
89
90         # to_jsondict
91         jsondict = match.to_jsondict()
92
93         # from_jsondict
94         match2 = match.from_jsondict(jsondict["OFPMatch"])
95         buf2 = bytearray()
96         match2.serialize(buf2, 0)
97         eq_(str(match), str(match2))
98         eq_(buf, buf2)
99
100     # set_vlan_vid
101     def _test_set_vlan_vid(self, vid, mask=None):
102         header = ofproto.OXM_OF_VLAN_VID
103         match = OFPMatch()
104         if mask is None:
105             match.set_vlan_vid(vid)
106         else:
107             header = ofproto.OXM_OF_VLAN_VID_W
108             match.set_vlan_vid_masked(vid, mask)
109         self._test_serialize_and_parser(match, header, vid, mask)
110
111     def _test_set_vlan_vid_none(self):
112         header = ofproto.OXM_OF_VLAN_VID
113         match = OFPMatch()
114         match.set_vlan_vid_none()
115         value = ofproto.OFPVID_NONE
116         cls_ = OFPMatchField._FIELDS_HEADERS.get(header)
117         pack_str = cls_.pack_str.replace('!', '')
118         fmt = '!HHI' + pack_str
119
120         # serialize
121         buf = bytearray()
122         length = match.serialize(buf, 0)
123         eq_(length, len(buf))
124
125         res = list(unpack_from(fmt, six.binary_type(buf), 0)[3:])
126         res_value = res.pop(0)
127         eq_(res_value, value)
128
129         # parser
130         res = match.parser(six.binary_type(buf), 0)
131         eq_(res.type, ofproto.OFPMT_OXM)
132         eq_(res.fields[0].header, header)
133         eq_(res.fields[0].value, value)
134
135         # to_jsondict
136         jsondict = match.to_jsondict()
137
138         # from_jsondict
139         match2 = match.from_jsondict(jsondict["OFPMatch"])
140         buf2 = bytearray()
141         match2.serialize(buf2, 0)
142         eq_(str(match), str(match2))
143         eq_(buf, buf2)
144
145     def test_set_vlan_vid_mid(self):
146         self._test_set_vlan_vid(2047)
147
148     def test_set_vlan_vid_max(self):
149         self._test_set_vlan_vid(0xfff)
150
151     def test_set_vlan_vid_min(self):
152         self._test_set_vlan_vid(0)
153
154     def test_set_vlan_vid_masked_mid(self):
155         self._test_set_vlan_vid(2047, 0xf0f)
156
157     def test_set_vlan_vid_masked_max(self):
158         self._test_set_vlan_vid(2047, 0xfff)
159
160     def test_set_vlan_vid_masked_min(self):
161         self._test_set_vlan_vid(2047, 0)
162
163     def test_set_vlan_vid_none(self):
164         self._test_set_vlan_vid_none()