backing up
[vsorcdistro/.git] / ryu / build / lib.linux-armv7l-2.7 / ryu / tests / integrated / test_add_flow_v10.py
1 # Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #    http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 # vim: tabstop=4 shiftwidth=4 softtabstop=4
17
18 import logging
19
20 from ryu.tests.integrated import tester
21 from ryu.ofproto import ofproto_v1_0
22 from ryu.ofproto import ether
23 from ryu.ofproto import nx_match
24
25 LOG = logging.getLogger(__name__)
26
27
28 class RunTest(tester.TestFlowBase):
29     """ Test case for add flows of1.0
30     """
31     OFP_VERSIONS = [ofproto_v1_0.OFP_VERSION]
32
    def __init__(self, *args, **kwargs):
        """Initialise the base class and the pending-expectation slot."""
        super(RunTest, self).__init__(*args, **kwargs)
        # Expectation stored by the running test_* method; verify_default()
        # takes it and resets the attribute to an empty list.
        self._verify = []
36
37     def add_action(self, dp, action):
38         rule = nx_match.ClsRule()
39         self.send_flow_mod(
40             dp, rule, 0, dp.ofproto.OFPFC_ADD, 0, 0, None,
41             0xffffffff, None, dp.ofproto.OFPFF_SEND_FLOW_REM, action)
42
43     def add_rule(self, dp, rule):
44         self.send_flow_mod(
45             dp, rule, 0, dp.ofproto.OFPFC_ADD, 0, 0, None,
46             0xffffffff, None, dp.ofproto.OFPFF_SEND_FLOW_REM, [])
47
48     def send_flow_mod(self, dp, rule, cookie, command, idle_timeout,
49                       hard_timeout, priority=None, buffer_id=0xffffffff,
50                       out_port=None, flags=0, actions=None):
51
52         if priority is None:
53             priority = dp.ofproto.OFP_DEFAULT_PRIORITY
54         if out_port is None:
55             out_port = dp.ofproto.OFPP_NONE
56
57         match_tuple = rule.match_tuple()
58         match = dp.ofproto_parser.OFPMatch(*match_tuple)
59
60         m = dp.ofproto_parser.OFPFlowMod(
61             dp, match, cookie, command, idle_timeout, hard_timeout,
62             priority, buffer_id, out_port, flags, actions)
63
64         dp.send_msg(m)
65
66     def _verify_action(self, actions, type_, name, value):
67         try:
68             action = actions[0]
69             if action.cls_action_type != type_:
70                 return "Action type error. send:%s, val:%s" \
71                     % (type_, action.cls_action_type)
72         except IndexError:
73             return "Action is not setting."
74
75         f_value = None
76         if name:
77             try:
78                 if isinstance(name, list):
79                     f_value = [getattr(action, n) for n in name]
80                 else:
81                     f_value = getattr(action, name)
82             except AttributeError:
83                 pass
84
85         if f_value != value:
86             return "Value error. send:%s=%s val:%s" \
87                 % (name, value, f_value)
88         return True
89
90     def _verify_rule(self, rule, name, value):
91         f_value = getattr(rule, name)
92         if f_value != value:
93             return "Value error. send:%s=%s val:%s" \
94                 % (name, value, f_value)
95         return True
96
97     def verify_default(self, dp, stats):
98         verify = self._verify
99         self._verify = []
100         match = stats[0].match
101         actions = stats[0].actions
102
103         if len(verify) == 2:
104             return self._verify_rule(match, *verify)
105         elif len(verify) == 3:
106             return self._verify_action(actions, *verify)
107         else:
108             return "self._verify is invalid."
109
110     # Test of Actions
111     def test_action_output(self, dp):
112         out_port = 2
113         self._verify = [dp.ofproto.OFPAT_OUTPUT,
114                         'port', out_port]
115         action = dp.ofproto_parser.OFPActionOutput(out_port)
116         self.add_action(dp, [action, ])
117
118     def test_action_vlan_vid(self, dp):
119         vlan_vid = 2
120         self._verify = [dp.ofproto.OFPAT_SET_VLAN_VID,
121                         'vlan_vid', vlan_vid]
122         action = dp.ofproto_parser.OFPActionVlanVid(vlan_vid)
123         self.add_action(dp, [action, ])
124
125     def test_action_vlan_pcp(self, dp):
126         vlan_pcp = 4
127         self._verify = [dp.ofproto.OFPAT_SET_VLAN_PCP,
128                         'vlan_pcp', vlan_pcp]
129         action = dp.ofproto_parser.OFPActionVlanPcp(vlan_pcp)
130         self.add_action(dp, [action, ])
131
132     def test_action_strip_vlan(self, dp):
133         vlan_pcp = 4
134         self._verify = [dp.ofproto.OFPAT_STRIP_VLAN,
135                         None, None]
136         action = dp.ofproto_parser.OFPActionStripVlan()
137         self.add_action(dp, [action, ])
138
139     def test_action_set_dl_src(self, dp):
140         dl_src = '56:b3:42:04:b2:7a'
141         dl_src_bin = self.haddr_to_bin(dl_src)
142         self._verify = [dp.ofproto.OFPAT_SET_DL_SRC,
143                         'dl_addr', dl_src_bin]
144         action = dp.ofproto_parser.OFPActionSetDlSrc(dl_src_bin)
145         self.add_action(dp, [action, ])
146
147     def test_action_set_dl_dst(self, dp):
148         dl_dst = 'c2:93:a2:fb:d0:f4'
149         dl_dst_bin = self.haddr_to_bin(dl_dst)
150         self._verify = [dp.ofproto.OFPAT_SET_DL_DST,
151                         'dl_addr', dl_dst_bin]
152         action = dp.ofproto_parser.OFPActionSetDlDst(dl_dst_bin)
153         self.add_action(dp, [action, ])
154
155     def test_action_set_nw_src(self, dp):
156         nw_src = '216.132.81.105'
157         nw_src_int = self.ipv4_to_int(nw_src)
158         self._verify = [dp.ofproto.OFPAT_SET_NW_SRC,
159                         'nw_addr', nw_src_int]
160         action = dp.ofproto_parser.OFPActionSetNwSrc(nw_src_int)
161         self.add_action(dp, [action, ])
162
163     def test_action_set_nw_dst(self, dp):
164         nw_dst = '223.201.206.3'
165         nw_dst_int = self.ipv4_to_int(nw_dst)
166         self._verify = [dp.ofproto.OFPAT_SET_NW_DST,
167                         'nw_addr', nw_dst_int]
168         action = dp.ofproto_parser.OFPActionSetNwDst(nw_dst_int)
169         self.add_action(dp, [action, ])
170
171     def test_action_set_nw_tos(self, dp):
172         # lowest two bits must be zero
173         nw_tos = 1 << 2
174         self._verify = [dp.ofproto.OFPAT_SET_NW_TOS,
175                         'tos', nw_tos]
176         action = dp.ofproto_parser.OFPActionSetNwTos(nw_tos)
177         self.add_action(dp, [action, ])
178
179     def test_action_set_tp_src(self, dp):
180         tp_src = 55420
181         self._verify = [dp.ofproto.OFPAT_SET_TP_SRC,
182                         'tp', tp_src]
183         action = dp.ofproto_parser.OFPActionSetTpSrc(tp_src)
184         self.add_action(dp, [action, ])
185
186     def test_action_set_tp_dst(self, dp):
187         tp_dst = 15430
188         self._verify = [dp.ofproto.OFPAT_SET_TP_DST,
189                         'tp', tp_dst]
190         action = dp.ofproto_parser.OFPActionSetTpDst(tp_dst)
191         self.add_action(dp, [action, ])
192
193     def test_action_enqueue(self, dp):
194         port = 207
195         queue_id = 4287508753
196         self._verify = [dp.ofproto.OFPAT_ENQUEUE,
197                         ['port', 'queue_id'], [port, queue_id]]
198         action = dp.ofproto_parser.OFPActionEnqueue(port, queue_id)
199         self.add_action(dp, [action, ])
200
201     # Test of Rules
202     def test_rule_set_in_port(self, dp):
203         in_port = 32
204         self._verify = ['in_port', in_port]
205
206         rule = nx_match.ClsRule()
207         rule.set_in_port(in_port)
208         self.add_rule(dp, rule)
209
210     def test_rule_set_dl_src(self, dp):
211         dl_src = 'b8:a1:94:51:78:83'
212         dl_src_bin = self.haddr_to_bin(dl_src)
213         self._verify = ['dl_src', dl_src_bin]
214
215         rule = nx_match.ClsRule()
216         rule.set_dl_src(dl_src_bin)
217         self.add_rule(dp, rule)
218
219     def test_rule_set_dl_type_ip(self, dp):
220         dl_type = ether.ETH_TYPE_IP
221         self._verify = ['dl_type', dl_type]
222
223         rule = nx_match.ClsRule()
224         rule.set_dl_type(dl_type)
225         self.add_rule(dp, rule)
226
227     def test_rule_set_dl_type_arp(self, dp):
228         dl_type = ether.ETH_TYPE_ARP
229         self._verify = ['dl_type', dl_type]
230
231         rule = nx_match.ClsRule()
232         rule.set_dl_type(dl_type)
233         self.add_rule(dp, rule)
234
235     def test_rule_set_dl_type_vlan(self, dp):
236         dl_type = ether.ETH_TYPE_8021Q
237         self._verify = ['dl_type', dl_type]
238
239         rule = nx_match.ClsRule()
240         rule.set_dl_type(dl_type)
241         self.add_rule(dp, rule)
242
243     def test_rule_set_dl_type_ipv6(self, dp):
244         dl_type = ether.ETH_TYPE_IPV6
245         self._verify = ['dl_type', dl_type]
246
247         rule = nx_match.ClsRule()
248         rule.set_dl_type(dl_type)
249         self.add_rule(dp, rule)
250
251     def test_rule_set_dl_type_lacp(self, dp):
252         dl_type = ether.ETH_TYPE_SLOW
253         self._verify = ['dl_type', dl_type]
254
255         rule = nx_match.ClsRule()
256         rule.set_dl_type(dl_type)
257         self.add_rule(dp, rule)