2 # Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
17 from __future__ import print_function
20 from nose.tools import ok_, eq_, timed, nottest
22 from subprocess import Popen, PIPE, STDOUT
25 from mininet.net import Mininet
26 from mininet.node import RemoteController, OVSKernelSwitch
# Address of the Ryu controller; passed as ``ip`` when the RemoteController
# is added to the Mininet network.
RYU_HOST = '127.0.0.1'
# Interpreter and ryu-manager script used to build the command line that
# launches each test application. Both are relative paths.
PYTHON_BIN = '.venv/bin/python'
RYU_MGR = './bin/ryu-manager'
class OVS12KernelSwitch(OVSKernelSwitch):
    """OVSKernelSwitch that sets the bridge ``protocols`` parameter.

    Intended for OVS version 1.10: once the regular start-up has run, the
    bridge is configured for both OpenFlow10 and OpenFlow12.
    """

    def start(self, controllers):
        """Start the switch, then set ``protocols`` on its bridge.

        :param controllers: forwarded unchanged to the parent ``start()``.
        """
        super(OVS12KernelSwitch, self).start(controllers)

        # The switch object itself is passed as the bridge argument of the
        # ovs-vsctl command.
        protocols = "protocols='[OpenFlow10, OpenFlow12]'"
        self.cmd('ovs-vsctl set Bridge', self, protocols)
class TestWithOVS12(unittest.TestCase):
    """Run Ryu integrated test applications against a Mininet OVS switch.

    One switch (``s1``) and one host (``h1``) are created once for the whole
    class. Each test launches ``ryu-manager`` with a single test application
    and checks the result line that the application prints.
    """

    @classmethod
    def setUpClass(cls):
        """Build the one-switch / one-host network shared by all tests."""
        cls.mn = Mininet()
        controller = cls.mn.addController(controller=RemoteController,
                                          ip=RYU_HOST, port=RYU_PORT)
        # Nodes are started one by one because Mininet.start() is never
        # called on this network.
        controller.start()

        s1 = cls.mn.addSwitch('s1', cls=OVS12KernelSwitch)
        s1.start(cls.mn.controllers)

        h1 = cls.mn.addHost('h1', ip='0.0.0.0/0')

        link = cls.mn.addLink(h1, s1)
        # The switch is already running when the link is created, so its end
        # of the link (intf2, i.e. the second node given to addLink) has to
        # be attached to the bridge explicitly.
        s1.attach(link.intf2)

    @classmethod
    def tearDownClass(cls):
        """Tear down the Mininet network created in ``setUpClass``."""
        cls.mn.stop()

    def test_add_flow_v10(self):
        """Run the OpenFlow 1.0 add-flow test application."""
        app = 'ryu/tests/integrated/test_add_flow_v10.py'
        self._run_ryu_manager_and_check_output(app)

    def test_request_reply_v12(self):
        """Run the OpenFlow 1.2 request/reply test application."""
        app = 'ryu/tests/integrated/test_request_reply_v12.py'
        self._run_ryu_manager_and_check_output(app)

    def test_add_flow_v12_actions(self):
        """Run the OpenFlow 1.2 add-flow (actions) test application."""
        app = 'ryu/tests/integrated/test_add_flow_v12_actions.py'
        self._run_ryu_manager_and_check_output(app)

    def test_add_flow_v12_matches(self):
        """Run the OpenFlow 1.2 add-flow (matches) test application."""
        app = 'ryu/tests/integrated/test_add_flow_v12_matches.py'
        self._run_ryu_manager_and_check_output(app)

    @nottest
    def test_of_config(self):
        """Placeholder only; intentionally not collected as a test."""
        # OVS 1.10 does not support of_config
        pass

    def _run_ryu_manager_and_check_output(self, app):
        """Launch ``ryu-manager`` with *app* and verify the reported result.

        Output lines are echoed with a ``ryu-manager:`` prefix. Reading stops
        at the first line containing ``TEST_FINISHED``; that line must also
        contain ``Completed=[True]`` or the test fails.

        :param app: path of the Ryu test application to run.
        :raises Exception: if the output ends before ``TEST_FINISHED`` is
            seen, i.e. ryu-manager exited without reporting a result.
        """
        cmd = [PYTHON_BIN, RYU_MGR, app]
        # universal_newlines=True makes the pipe yield text on Python 3 as
        # well, so the substring checks below work on both Python 2 and 3.
        p = Popen(cmd, stdout=PIPE, stderr=STDOUT, universal_newlines=True)
        try:
            # readline() returns '' only at EOF; blank lines still carry
            # their newline and are filtered out after strip().
            for raw in iter(p.stdout.readline, ''):
                line = raw.strip()
                if not line:
                    continue

                print("ryu-manager: %s" % line)
                if 'TEST_FINISHED' in line:
                    ok_('Completed=[True]' in line)
                    return

            # All output was consumed without a result line.
            raise Exception('Another ryu-manager already running?')
        finally:
            # Never leave ryu-manager behind, whatever the outcome: stop it
            # if it is still alive, then wait until it has terminated.
            if p.poll() is None:
                p.terminate()
            p.communicate()
110 if __name__ == '__main__':