backing up
[vsorcdistro/.git] / ryu / build / lib.linux-armv7l-2.7 / ryu / tests / integrated / run_tests_with_ovs12.py
1 #!/usr/bin/env python
2 # Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #    http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16
17 from __future__ import print_function
18
19 import unittest
20 from nose.tools import ok_, eq_, timed, nottest
21
22 from subprocess import Popen, PIPE, STDOUT
23 import time
24
25 from mininet.net import Mininet
26 from mininet.node import RemoteController, OVSKernelSwitch
27
28 TIMEOUT = 60
29 RYU_HOST = '127.0.0.1'
30 RYU_PORT = 6633
31 PYTHON_BIN = '.venv/bin/python'
32 RYU_MGR = './bin/ryu-manager'
33
34
35 class OVS12KernelSwitch(OVSKernelSwitch):
36     """Set protocols parameter for OVS version 1.10"""
37
38     def start(self, controllers):
39         super(OVS12KernelSwitch, self).start(controllers)
40         self.cmd('ovs-vsctl set Bridge', self,
41                  "protocols='[OpenFlow10, OpenFlow12]'")
42
43
44 class TestWithOVS12(unittest.TestCase):
45     @classmethod
46     def setUpClass(cls):
47         cls.mn = Mininet()
48         c = cls.mn.addController(controller=RemoteController,
49                                  ip=RYU_HOST, port=RYU_PORT)
50         c.start()
51
52         s1 = cls.mn.addSwitch('s1', cls=OVS12KernelSwitch)
53         s1.start(cls.mn.controllers)
54
55         h1 = cls.mn.addHost('h1', ip='0.0.0.0/0')
56
57         link = cls.mn.addLink(h1, s1)
58         s1.attach(link.intf2)
59
60     @classmethod
61     def tearDownClass(cls):
62         cls.mn.stop()
63
64     @timed(TIMEOUT)
65     def test_add_flow_v10(self):
66         app = 'ryu/tests/integrated/test_add_flow_v10.py'
67         self._run_ryu_manager_and_check_output(app)
68
69     @timed(TIMEOUT)
70     def test_request_reply_v12(self):
71         app = 'ryu/tests/integrated/test_request_reply_v12.py'
72         self._run_ryu_manager_and_check_output(app)
73
74     @timed(TIMEOUT)
75     def test_add_flow_v12_actions(self):
76         app = 'ryu/tests/integrated/test_add_flow_v12_actions.py'
77         self._run_ryu_manager_and_check_output(app)
78
79     @timed(TIMEOUT)
80     def test_add_flow_v12_matches(self):
81         app = 'ryu/tests/integrated/test_add_flow_v12_matches.py'
82         self._run_ryu_manager_and_check_output(app)
83
84     @nottest
85     def test_of_config(self):
86         # OVS 1.10 does not support of_config
87         pass
88
89     def _run_ryu_manager_and_check_output(self, app):
90         cmd = [PYTHON_BIN, RYU_MGR, app]
91         p = Popen(cmd, stdout=PIPE, stderr=STDOUT)
92
93         while True:
94             if p.poll() is not None:
95                 raise Exception('Another ryu-manager already running?')
96
97             line = p.stdout.readline().strip()
98             if line == '':
99                 time.sleep(1)
100                 continue
101
102             print("ryu-manager: %s" % line)
103             if line.find('TEST_FINISHED') != -1:
104                 ok_(line.find('Completed=[True]') != -1)
105                 p.terminate()
106                 p.communicate()  # wait for subprocess is terminated
107                 break
108
109
110 if __name__ == '__main__':
111     unittest.main()