1 # Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
16 # vim: tabstop=4 shiftwidth=4 softtabstop=4
import logging

from ryu.tests.integrated import tester
from ryu.ofproto import ofproto_v1_0
from ryu.ofproto import ether
from ryu.ofproto import nx_match

# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
class RunTest(tester.TestFlowBase):
    """Test case for adding flows with OpenFlow 1.0.

    Every ``test_*`` method records in ``self._verify`` what it expects to
    read back and then installs one flow entry.  ``verify_default`` compares
    the first entry of the flow stats it is handed against that expectation.

    ``self._verify`` takes one of two shapes:

    * ``[field_name, value]`` -- compared with the match of the entry;
    * ``[action_type, attr_name(s), value(s)]`` -- compared with the first
      action of the entry.
    """

    OFP_VERSIONS = [ofproto_v1_0.OFP_VERSION]

    def __init__(self, *args, **kwargs):
        super(RunTest, self).__init__(*args, **kwargs)
        # Expectation of the test currently in flight (see class docstring).
        # Initialised here so verify_default never hits a missing attribute.
        self._verify = []

    def add_action(self, dp, action):
        """Install a flow carrying ``action`` under a fresh ``ClsRule``.

        :param dp: datapath the flow mod is sent to.
        :param action: list of action objects for the flow entry.
        """
        # No field is set on the rule, so only the actions are of interest.
        self._add_flow(dp, nx_match.ClsRule(), action)

    def add_rule(self, dp, rule):
        """Install a flow for ``rule`` with an empty action list.

        :param dp: datapath the flow mod is sent to.
        :param rule: ``nx_match.ClsRule`` describing the match.
        """
        self._add_flow(dp, rule, [])

    def _add_flow(self, dp, rule, actions):
        """Send the OFPFC_ADD flow mod shared by add_action and add_rule."""
        # Positional order follows send_flow_mod: cookie=0, command=ADD,
        # idle_timeout=0, hard_timeout=0, priority=None (default),
        # buffer_id=0xffffffff, out_port=None (default), flags, actions.
        self.send_flow_mod(
            dp, rule, 0, dp.ofproto.OFPFC_ADD, 0, 0, None,
            0xffffffff, None, dp.ofproto.OFPFF_SEND_FLOW_REM, actions)

    def send_flow_mod(self, dp, rule, cookie, command, idle_timeout,
                      hard_timeout, priority=None, buffer_id=0xffffffff,
                      out_port=None, flags=0, actions=None):
        """Build an ``OFPFlowMod`` from ``rule`` and send it to ``dp``.

        :param dp: datapath that provides ``ofproto``/``ofproto_parser``
            and receives the message.
        :param rule: object whose ``match_tuple()`` feeds ``OFPMatch``.
        :param cookie: cookie value of the flow mod.
        :param command: flow mod command, e.g. ``dp.ofproto.OFPFC_ADD``.
        :param idle_timeout: idle timeout of the flow mod.
        :param hard_timeout: hard timeout of the flow mod.
        :param priority: priority; ``None`` selects
            ``dp.ofproto.OFP_DEFAULT_PRIORITY``.
        :param buffer_id: buffer id of the flow mod.
        :param out_port: output port filter; ``None`` selects
            ``dp.ofproto.OFPP_NONE``.
        :param flags: flow mod flags.
        :param actions: list of actions, passed through unchanged.
        """
        # Only fall back to the protocol defaults when the caller did not
        # supply a value; an explicit 0 must be kept, hence ``is None``.
        if priority is None:
            priority = dp.ofproto.OFP_DEFAULT_PRIORITY
        if out_port is None:
            out_port = dp.ofproto.OFPP_NONE

        match_tuple = rule.match_tuple()
        match = dp.ofproto_parser.OFPMatch(*match_tuple)

        m = dp.ofproto_parser.OFPFlowMod(
            dp, match, cookie, command, idle_timeout, hard_timeout,
            priority, buffer_id, out_port, flags, actions)

        dp.send_msg(m)

    def _verify_action(self, actions, type_, name, value):
        """Compare the first action in ``actions`` with the expectation.

        :param actions: list of actions read back from the switch.
        :param type_: expected ``cls_action_type``.
        :param name: attribute name, list of attribute names, or a falsy
            value when the action has nothing to compare.
        :param value: expected value (a list when ``name`` is a list).
        :returns: ``True`` on success, otherwise an error message string.
        """
        try:
            action = actions[0]
        except IndexError:
            return "Action is not setting."

        if action.cls_action_type != type_:
            return "Action type error. send:%s, val:%s" \
                % (type_, action.cls_action_type)

        f_value = None
        if name:
            try:
                if isinstance(name, list):
                    f_value = [getattr(action, n) for n in name]
                else:
                    f_value = getattr(action, name)
            except AttributeError:
                # Deliberately ignored: f_value stays None, so a missing
                # attribute is reported below as a value mismatch.
                pass

        if f_value != value:
            return "Value error. send:%s=%s val:%s" \
                % (name, value, f_value)
        return True

    def _verify_rule(self, rule, name, value):
        """Compare attribute ``name`` of ``rule`` with ``value``.

        :returns: ``True`` on success, otherwise an error message string.
        """
        f_value = getattr(rule, name)
        if f_value != value:
            return "Value error. send:%s=%s val:%s" \
                % (name, value, f_value)
        return True

    def verify_default(self, dp, stats):
        """Check the first entry of ``stats`` against ``self._verify``.

        :param dp: datapath (unused here).
        :param stats: sequence of flow stats entries; only ``stats[0]`` is
            inspected.
        :returns: ``True`` on success, otherwise an error message string.
        """
        verify = self._verify
        # Reset so a stale expectation cannot leak into the next test.
        # NOTE(review): assumes this is called once per test -- confirm
        # against the base tester.
        self._verify = []
        match = stats[0].match
        actions = stats[0].actions

        # Two items describe a rule check, three an action check.
        if len(verify) == 2:
            return self._verify_rule(match, *verify)
        elif len(verify) == 3:
            return self._verify_action(actions, *verify)
        else:
            return "self._verify is invalid."

    # Test of Actions
    def test_action_output(self, dp):
        """Add a flow with an output action and expect its port back."""
        out_port = 2  # representative port number
        self._verify = [dp.ofproto.OFPAT_OUTPUT,
                        'port', out_port]
        action = dp.ofproto_parser.OFPActionOutput(out_port)
        self.add_action(dp, [action, ])

    def test_action_vlan_vid(self, dp):
        """Add a flow with a set-VLAN-id action."""
        vlan_vid = 2  # representative VLAN id
        self._verify = [dp.ofproto.OFPAT_SET_VLAN_VID,
                        'vlan_vid', vlan_vid]
        action = dp.ofproto_parser.OFPActionVlanVid(vlan_vid)
        self.add_action(dp, [action, ])

    def test_action_vlan_pcp(self, dp):
        """Add a flow with a set-VLAN-priority action."""
        vlan_pcp = 4  # representative VLAN priority
        self._verify = [dp.ofproto.OFPAT_SET_VLAN_PCP,
                        'vlan_pcp', vlan_pcp]
        action = dp.ofproto_parser.OFPActionVlanPcp(vlan_pcp)
        self.add_action(dp, [action, ])

    def test_action_strip_vlan(self, dp):
        """Add a flow with a strip-VLAN action."""
        # The action is built without arguments, so there is no attribute
        # to compare: only the action type is checked.
        self._verify = [dp.ofproto.OFPAT_STRIP_VLAN,
                        None, None]
        action = dp.ofproto_parser.OFPActionStripVlan()
        self.add_action(dp, [action, ])

    def test_action_set_dl_src(self, dp):
        """Add a flow that rewrites the Ethernet source address."""
        dl_src = '56:b3:42:04:b2:7a'
        dl_src_bin = self.haddr_to_bin(dl_src)
        self._verify = [dp.ofproto.OFPAT_SET_DL_SRC,
                        'dl_addr', dl_src_bin]
        action = dp.ofproto_parser.OFPActionSetDlSrc(dl_src_bin)
        self.add_action(dp, [action, ])

    def test_action_set_dl_dst(self, dp):
        """Add a flow that rewrites the Ethernet destination address."""
        dl_dst = 'c2:93:a2:fb:d0:f4'
        dl_dst_bin = self.haddr_to_bin(dl_dst)
        self._verify = [dp.ofproto.OFPAT_SET_DL_DST,
                        'dl_addr', dl_dst_bin]
        action = dp.ofproto_parser.OFPActionSetDlDst(dl_dst_bin)
        self.add_action(dp, [action, ])

    def test_action_set_nw_src(self, dp):
        """Add a flow that rewrites the IPv4 source address."""
        nw_src = '216.132.81.105'
        nw_src_int = self.ipv4_to_int(nw_src)
        self._verify = [dp.ofproto.OFPAT_SET_NW_SRC,
                        'nw_addr', nw_src_int]
        action = dp.ofproto_parser.OFPActionSetNwSrc(nw_src_int)
        self.add_action(dp, [action, ])

    def test_action_set_nw_dst(self, dp):
        """Add a flow that rewrites the IPv4 destination address."""
        nw_dst = '223.201.206.3'
        nw_dst_int = self.ipv4_to_int(nw_dst)
        self._verify = [dp.ofproto.OFPAT_SET_NW_DST,
                        'nw_addr', nw_dst_int]
        action = dp.ofproto_parser.OFPActionSetNwDst(nw_dst_int)
        self.add_action(dp, [action, ])

    def test_action_set_nw_tos(self, dp):
        """Add a flow that rewrites the IP ToS field."""
        # lowest two bits must be zero
        nw_tos = 1 << 2
        self._verify = [dp.ofproto.OFPAT_SET_NW_TOS,
                        'tos', nw_tos]
        action = dp.ofproto_parser.OFPActionSetNwTos(nw_tos)
        self.add_action(dp, [action, ])

    def test_action_set_tp_src(self, dp):
        """Add a flow that rewrites the transport source port."""
        tp_src = 55420  # representative transport port
        self._verify = [dp.ofproto.OFPAT_SET_TP_SRC,
                        'tp', tp_src]
        action = dp.ofproto_parser.OFPActionSetTpSrc(tp_src)
        self.add_action(dp, [action, ])

    def test_action_set_tp_dst(self, dp):
        """Add a flow that rewrites the transport destination port."""
        tp_dst = 15430  # representative transport port
        self._verify = [dp.ofproto.OFPAT_SET_TP_DST,
                        'tp', tp_dst]
        action = dp.ofproto_parser.OFPActionSetTpDst(tp_dst)
        self.add_action(dp, [action, ])

    def test_action_enqueue(self, dp):
        """Add a flow with an enqueue action; check port and queue id."""
        port = 207  # representative port number
        queue_id = 4287508753
        self._verify = [dp.ofproto.OFPAT_ENQUEUE,
                        ['port', 'queue_id'], [port, queue_id]]
        action = dp.ofproto_parser.OFPActionEnqueue(port, queue_id)
        self.add_action(dp, [action, ])

    # Test of Rules
    def test_rule_set_in_port(self, dp):
        """Add a flow matching on the ingress port."""
        in_port = 32  # representative port number
        self._verify = ['in_port', in_port]

        rule = nx_match.ClsRule()
        rule.set_in_port(in_port)
        self.add_rule(dp, rule)

    def test_rule_set_dl_src(self, dp):
        """Add a flow matching on the Ethernet source address."""
        dl_src = 'b8:a1:94:51:78:83'
        dl_src_bin = self.haddr_to_bin(dl_src)
        self._verify = ['dl_src', dl_src_bin]

        rule = nx_match.ClsRule()
        rule.set_dl_src(dl_src_bin)
        self.add_rule(dp, rule)

    def test_rule_set_dl_type_ip(self, dp):
        """Add a flow matching on ``ether.ETH_TYPE_IP``."""
        dl_type = ether.ETH_TYPE_IP
        self._verify = ['dl_type', dl_type]

        rule = nx_match.ClsRule()
        rule.set_dl_type(dl_type)
        self.add_rule(dp, rule)

    def test_rule_set_dl_type_arp(self, dp):
        """Add a flow matching on ``ether.ETH_TYPE_ARP``."""
        dl_type = ether.ETH_TYPE_ARP
        self._verify = ['dl_type', dl_type]

        rule = nx_match.ClsRule()
        rule.set_dl_type(dl_type)
        self.add_rule(dp, rule)

    def test_rule_set_dl_type_vlan(self, dp):
        """Add a flow matching on ``ether.ETH_TYPE_8021Q``."""
        dl_type = ether.ETH_TYPE_8021Q
        self._verify = ['dl_type', dl_type]

        rule = nx_match.ClsRule()
        rule.set_dl_type(dl_type)
        self.add_rule(dp, rule)

    def test_rule_set_dl_type_ipv6(self, dp):
        """Add a flow matching on ``ether.ETH_TYPE_IPV6``."""
        dl_type = ether.ETH_TYPE_IPV6
        self._verify = ['dl_type', dl_type]

        rule = nx_match.ClsRule()
        rule.set_dl_type(dl_type)
        self.add_rule(dp, rule)

    def test_rule_set_dl_type_lacp(self, dp):
        """Add a flow matching on ``ether.ETH_TYPE_SLOW``."""
        dl_type = ether.ETH_TYPE_SLOW
        self._verify = ['dl_type', dl_type]

        rule = nx_match.ClsRule()
        rule.set_dl_type(dl_type)
        self.add_rule(dp, rule)