backing up
[vsorcdistro/.git] / ryu / build / lib.linux-armv7l-2.7 / ryu / tests / integrated / test_add_flow_v10.py
diff --git a/ryu/build/lib.linux-armv7l-2.7/ryu/tests/integrated/test_add_flow_v10.py b/ryu/build/lib.linux-armv7l-2.7/ryu/tests/integrated/test_add_flow_v10.py
new file mode 100644 (file)
index 0000000..6a09646
--- /dev/null
@@ -0,0 +1,257 @@
+# Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+import logging
+
+from ryu.tests.integrated import tester
+from ryu.ofproto import ofproto_v1_0
+from ryu.ofproto import ether
+from ryu.ofproto import nx_match
+
+LOG = logging.getLogger(__name__)
+
+
+class RunTest(tester.TestFlowBase):
+    """ Test case for add flows of1.0
+    """
+    OFP_VERSIONS = [ofproto_v1_0.OFP_VERSION]
+
+    def __init__(self, *args, **kwargs):
+        super(RunTest, self).__init__(*args, **kwargs)
+        self._verify = []
+
+    def add_action(self, dp, action):
+        rule = nx_match.ClsRule()
+        self.send_flow_mod(
+            dp, rule, 0, dp.ofproto.OFPFC_ADD, 0, 0, None,
+            0xffffffff, None, dp.ofproto.OFPFF_SEND_FLOW_REM, action)
+
+    def add_rule(self, dp, rule):
+        self.send_flow_mod(
+            dp, rule, 0, dp.ofproto.OFPFC_ADD, 0, 0, None,
+            0xffffffff, None, dp.ofproto.OFPFF_SEND_FLOW_REM, [])
+
+    def send_flow_mod(self, dp, rule, cookie, command, idle_timeout,
+                      hard_timeout, priority=None, buffer_id=0xffffffff,
+                      out_port=None, flags=0, actions=None):
+
+        if priority is None:
+            priority = dp.ofproto.OFP_DEFAULT_PRIORITY
+        if out_port is None:
+            out_port = dp.ofproto.OFPP_NONE
+
+        match_tuple = rule.match_tuple()
+        match = dp.ofproto_parser.OFPMatch(*match_tuple)
+
+        m = dp.ofproto_parser.OFPFlowMod(
+            dp, match, cookie, command, idle_timeout, hard_timeout,
+            priority, buffer_id, out_port, flags, actions)
+
+        dp.send_msg(m)
+
+    def _verify_action(self, actions, type_, name, value):
+        try:
+            action = actions[0]
+            if action.cls_action_type != type_:
+                return "Action type error. send:%s, val:%s" \
+                    % (type_, action.cls_action_type)
+        except IndexError:
+            return "Action is not setting."
+
+        f_value = None
+        if name:
+            try:
+                if isinstance(name, list):
+                    f_value = [getattr(action, n) for n in name]
+                else:
+                    f_value = getattr(action, name)
+            except AttributeError:
+                pass
+
+        if f_value != value:
+            return "Value error. send:%s=%s val:%s" \
+                % (name, value, f_value)
+        return True
+
+    def _verify_rule(self, rule, name, value):
+        f_value = getattr(rule, name)
+        if f_value != value:
+            return "Value error. send:%s=%s val:%s" \
+                % (name, value, f_value)
+        return True
+
+    def verify_default(self, dp, stats):
+        verify = self._verify
+        self._verify = []
+        match = stats[0].match
+        actions = stats[0].actions
+
+        if len(verify) == 2:
+            return self._verify_rule(match, *verify)
+        elif len(verify) == 3:
+            return self._verify_action(actions, *verify)
+        else:
+            return "self._verify is invalid."
+
+    # Test of Actions
+    def test_action_output(self, dp):
+        out_port = 2
+        self._verify = [dp.ofproto.OFPAT_OUTPUT,
+                        'port', out_port]
+        action = dp.ofproto_parser.OFPActionOutput(out_port)
+        self.add_action(dp, [action, ])
+
+    def test_action_vlan_vid(self, dp):
+        vlan_vid = 2
+        self._verify = [dp.ofproto.OFPAT_SET_VLAN_VID,
+                        'vlan_vid', vlan_vid]
+        action = dp.ofproto_parser.OFPActionVlanVid(vlan_vid)
+        self.add_action(dp, [action, ])
+
+    def test_action_vlan_pcp(self, dp):
+        vlan_pcp = 4
+        self._verify = [dp.ofproto.OFPAT_SET_VLAN_PCP,
+                        'vlan_pcp', vlan_pcp]
+        action = dp.ofproto_parser.OFPActionVlanPcp(vlan_pcp)
+        self.add_action(dp, [action, ])
+
+    def test_action_strip_vlan(self, dp):
+        vlan_pcp = 4
+        self._verify = [dp.ofproto.OFPAT_STRIP_VLAN,
+                        None, None]
+        action = dp.ofproto_parser.OFPActionStripVlan()
+        self.add_action(dp, [action, ])
+
+    def test_action_set_dl_src(self, dp):
+        dl_src = '56:b3:42:04:b2:7a'
+        dl_src_bin = self.haddr_to_bin(dl_src)
+        self._verify = [dp.ofproto.OFPAT_SET_DL_SRC,
+                        'dl_addr', dl_src_bin]
+        action = dp.ofproto_parser.OFPActionSetDlSrc(dl_src_bin)
+        self.add_action(dp, [action, ])
+
+    def test_action_set_dl_dst(self, dp):
+        dl_dst = 'c2:93:a2:fb:d0:f4'
+        dl_dst_bin = self.haddr_to_bin(dl_dst)
+        self._verify = [dp.ofproto.OFPAT_SET_DL_DST,
+                        'dl_addr', dl_dst_bin]
+        action = dp.ofproto_parser.OFPActionSetDlDst(dl_dst_bin)
+        self.add_action(dp, [action, ])
+
+    def test_action_set_nw_src(self, dp):
+        nw_src = '216.132.81.105'
+        nw_src_int = self.ipv4_to_int(nw_src)
+        self._verify = [dp.ofproto.OFPAT_SET_NW_SRC,
+                        'nw_addr', nw_src_int]
+        action = dp.ofproto_parser.OFPActionSetNwSrc(nw_src_int)
+        self.add_action(dp, [action, ])
+
+    def test_action_set_nw_dst(self, dp):
+        nw_dst = '223.201.206.3'
+        nw_dst_int = self.ipv4_to_int(nw_dst)
+        self._verify = [dp.ofproto.OFPAT_SET_NW_DST,
+                        'nw_addr', nw_dst_int]
+        action = dp.ofproto_parser.OFPActionSetNwDst(nw_dst_int)
+        self.add_action(dp, [action, ])
+
+    def test_action_set_nw_tos(self, dp):
+        # lowest two bits must be zero
+        nw_tos = 1 << 2
+        self._verify = [dp.ofproto.OFPAT_SET_NW_TOS,
+                        'tos', nw_tos]
+        action = dp.ofproto_parser.OFPActionSetNwTos(nw_tos)
+        self.add_action(dp, [action, ])
+
+    def test_action_set_tp_src(self, dp):
+        tp_src = 55420
+        self._verify = [dp.ofproto.OFPAT_SET_TP_SRC,
+                        'tp', tp_src]
+        action = dp.ofproto_parser.OFPActionSetTpSrc(tp_src)
+        self.add_action(dp, [action, ])
+
+    def test_action_set_tp_dst(self, dp):
+        tp_dst = 15430
+        self._verify = [dp.ofproto.OFPAT_SET_TP_DST,
+                        'tp', tp_dst]
+        action = dp.ofproto_parser.OFPActionSetTpDst(tp_dst)
+        self.add_action(dp, [action, ])
+
+    def test_action_enqueue(self, dp):
+        port = 207
+        queue_id = 4287508753
+        self._verify = [dp.ofproto.OFPAT_ENQUEUE,
+                        ['port', 'queue_id'], [port, queue_id]]
+        action = dp.ofproto_parser.OFPActionEnqueue(port, queue_id)
+        self.add_action(dp, [action, ])
+
+    # Test of Rules
+    def test_rule_set_in_port(self, dp):
+        in_port = 32
+        self._verify = ['in_port', in_port]
+
+        rule = nx_match.ClsRule()
+        rule.set_in_port(in_port)
+        self.add_rule(dp, rule)
+
+    def test_rule_set_dl_src(self, dp):
+        dl_src = 'b8:a1:94:51:78:83'
+        dl_src_bin = self.haddr_to_bin(dl_src)
+        self._verify = ['dl_src', dl_src_bin]
+
+        rule = nx_match.ClsRule()
+        rule.set_dl_src(dl_src_bin)
+        self.add_rule(dp, rule)
+
+    def test_rule_set_dl_type_ip(self, dp):
+        dl_type = ether.ETH_TYPE_IP
+        self._verify = ['dl_type', dl_type]
+
+        rule = nx_match.ClsRule()
+        rule.set_dl_type(dl_type)
+        self.add_rule(dp, rule)
+
+    def test_rule_set_dl_type_arp(self, dp):
+        dl_type = ether.ETH_TYPE_ARP
+        self._verify = ['dl_type', dl_type]
+
+        rule = nx_match.ClsRule()
+        rule.set_dl_type(dl_type)
+        self.add_rule(dp, rule)
+
+    def test_rule_set_dl_type_vlan(self, dp):
+        dl_type = ether.ETH_TYPE_8021Q
+        self._verify = ['dl_type', dl_type]
+
+        rule = nx_match.ClsRule()
+        rule.set_dl_type(dl_type)
+        self.add_rule(dp, rule)
+
+    def test_rule_set_dl_type_ipv6(self, dp):
+        dl_type = ether.ETH_TYPE_IPV6
+        self._verify = ['dl_type', dl_type]
+
+        rule = nx_match.ClsRule()
+        rule.set_dl_type(dl_type)
+        self.add_rule(dp, rule)
+
+    def test_rule_set_dl_type_lacp(self, dp):
+        dl_type = ether.ETH_TYPE_SLOW
+        self._verify = ['dl_type', dl_type]
+
+        rule = nx_match.ClsRule()
+        rule.set_dl_type(dl_type)
+        self.add_rule(dp, rule)