backing up
[vsorcdistro/.git] / ryu / build / lib.linux-armv7l-2.7 / ryu / tests / unit / controller / test_controller.py
1 # Copyright (C) 2015 Nippon Telegraph and Telephone Corporation.
2 # Copyright (C) 2015 Stratosphere Inc.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #    http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 # vim: tabstop=4 shiftwidth=4 softtabstop=4
17
18 try:
19     import mock  # Python 2
20 except ImportError:
21     from unittest import mock  # Python 3
22
23 import json
24 import os
25 import ssl
26 import sys
27 import warnings
28 import logging
29 import random
30 import unittest
31
32 from nose.tools import eq_, raises
33
34 from ryu.base import app_manager  # To suppress cyclic import
35 from ryu.controller import controller
36 from ryu.controller import handler
37 from ryu.lib import hub
38 from ryu.ofproto import ofproto_v1_3_parser
39 from ryu.ofproto import ofproto_v1_2_parser
40 from ryu.ofproto import ofproto_v1_0_parser
41 hub.patch()
42
43
44 LOG = logging.getLogger('test_controller')
45
46
47 class TestUtils(unittest.TestCase):
48     """
49     Test cases for utilities defined in controller module.
50     """
51
52     def test_split_addr_with_ipv4(self):
53         addr, port = controller._split_addr('127.0.0.1:6653')
54         eq_('127.0.0.1', addr)
55         eq_(6653, port)
56
57     def test_split_addr_with_ipv6(self):
58         addr, port = controller._split_addr('[::1]:6653')
59         eq_('::1', addr)
60         eq_(6653, port)
61
62     @raises(ValueError)
63     def test_split_addr_with_invalid_addr(self):
64         controller._split_addr('127.0.0.1')
65
66     @raises(ValueError)
67     def test_split_addr_with_invalid_ipv4_addr(self):
68         controller._split_addr('xxx.xxx.xxx.xxx:6653')
69
70     @raises(ValueError)
71     def test_split_addr_with_invalid_ipv6_addr(self):
72         controller._split_addr('[::xxxx]:6653')
73
74     @raises(ValueError)
75     def test_split_addr_with_non_bracketed_ipv6_addr(self):
76         controller._split_addr('::1:6653')
77
78
class Test_Datapath(unittest.TestCase):
    """
    Test cases for controller.Datapath
    """

    def _test_ports_accessibility(self, ofproto_parser, msgs_len):
        """
        Create, update and delete entries of ``Datapath.ports`` and check
        how many warnings that produces.

        :param ofproto_parser: parser module assigned to
            ``dp.ofproto_parser`` before ``ports`` is touched.
        :param msgs_len: exact number of warnings expected to be recorded
            while ``ports`` is manipulated.
        """
        # set_state is patched out so that constructing the Datapath does
        # not run the real state transition.
        with mock.patch('ryu.controller.controller.Datapath.set_state'):

            # Record (rather than print or drop) every warning raised
            # inside this block so that the count can be asserted below.
            with warnings.catch_warnings(record=True) as msgs:
                warnings.simplefilter('always')

                # Test target
                sock_mock = mock.Mock()
                addr_mock = mock.Mock()
                dp = controller.Datapath(sock_mock, addr_mock)
                dp.ofproto_parser = ofproto_parser

                # Create
                dp.ports = {}

                # Update
                port_mock = mock.Mock()
                dp.ports[0] = port_mock

                # Read & Delete
                del dp.ports[0]

                self.assertEqual(len(msgs), msgs_len)
                # Any warning that was recorded must be a UserWarning.
                for msg in msgs:
                    self.assertTrue(issubclass(msg.category, UserWarning))

    def test_ports_accessibility_v13(self):
        """No warning is expected with the OpenFlow 1.3 parser."""
        self._test_ports_accessibility(ofproto_v1_3_parser, 0)

    def test_ports_accessibility_v12(self):
        """No warning is expected with the OpenFlow 1.2 parser."""
        self._test_ports_accessibility(ofproto_v1_2_parser, 0)

    def test_ports_accessibility_v10(self):
        """No warning is expected with the OpenFlow 1.0 parser."""
        self._test_ports_accessibility(ofproto_v1_0_parser, 0)

    @mock.patch("ryu.base.app_manager", spec=app_manager)
    def test_recv_loop(self, app_manager_mock):
        """
        Feed several concatenated OpenFlow 1.3 messages to
        ``Datapath._recv_loop`` in randomly sized chunks and check that
        the events sent to observers carry exactly the expected messages,
        in order.
        """
        # Prepare test data
        test_messages = [
            "4-6-ofp_features_reply.packet",
            "4-14-ofp_echo_reply.packet",
            "4-14-ofp_echo_reply.packet",
            "4-4-ofp_packet_in.packet",
            "4-14-ofp_echo_reply.packet",
            "4-14-ofp_echo_reply.packet",
        ]
        this_dir = os.path.dirname(sys.modules[__name__].__file__)
        packet_data_dir = os.path.join(this_dir, '../../packet_data/of13')
        json_dir = os.path.join(this_dir, '../ofproto/json/of13')

        packet_buf = bytearray()
        expected_json = []
        for msg in test_messages:
            # Construct the received packet buffer as one packet data in order
            # to test the case of the OpenFlow messages composed in one packet.
            packet_data_file = os.path.join(packet_data_dir, msg)
            # Context managers close the fixture files deterministically
            # instead of leaving the handles to the garbage collector.
            with open(packet_data_file, 'rb') as packet_file:
                packet_buf += packet_file.read()
            json_data_file = os.path.join(json_dir, msg + '.json')
            with open(json_data_file) as json_file:
                expected_json.append(json.load(json_file))

        # Prepare mock for socket
        class SocketMock(mock.MagicMock):
            # Bytes not yet handed out by recv(); replaced per instance.
            buf = bytearray()
            # Random number generator used to pick the chunk sizes.
            random = None

            def recv(self, bufsize):
                # Hand out between 1 and bufsize bytes per call so that
                # message boundaries do not line up with recv() calls.
                size = self.random.randint(1, bufsize)
                out = self.buf[:size]
                self.buf = self.buf[size:]
                return out

        # Prepare mock
        ofp_brick_mock = mock.MagicMock(spec=app_manager.RyuApp)
        app_manager_mock.lookup_service_brick.return_value = ofp_brick_mock
        sock_mock = SocketMock()
        sock_mock.buf = packet_buf
        # A fixed seed keeps the sequence of chunk sizes reproducible.
        sock_mock.random = random.Random('Ryu SDN Framework')
        addr_mock = mock.MagicMock()

        # Prepare test target
        dp = controller.Datapath(sock_mock, addr_mock)
        dp.set_state(handler.MAIN_DISPATCHER)
        # Forget the calls made while setting up so that only calls made
        # by _recv_loop() are inspected below.
        ofp_brick_mock.reset_mock()

        # Test
        # NOTE(review): presumably the loop returns once recv() yields an
        # empty buffer -- confirm against Datapath._recv_loop.
        dp._recv_loop()

        # Assert calls
        output_json = []
        for call in ofp_brick_mock.send_event_to_observers.call_args_list:
            args, kwargs = call
            ev, state = args
            # Events that do not wrap an OpenFlow message are skipped.
            if not hasattr(ev, 'msg'):
                continue
            output_json.append(ev.msg.to_jsondict())
            self.assertEqual(state, handler.MAIN_DISPATCHER)
            self.assertEqual(kwargs, {})
        self.assertEqual(expected_json, output_json)
183
184
class TestOpenFlowController(unittest.TestCase):
    """
    Test cases for OpenFlowController
    """
    @mock.patch("ryu.controller.controller.CONF")
    def _test_ssl(self, this_dir, port, conf_mock):
        """
        Configure the patched ``CONF`` for an SSL listener on 127.0.0.1
        and invoke an ``OpenFlowController``.

        :param this_dir: directory containing ``cert.crt`` and ``cert.key``.
        :param port: port number assigned to ``ofp_ssl_listen_port``.
        :param conf_mock: mock injected by ``mock.patch`` in place of
            ``ryu.controller.controller.CONF``.
        """
        conf_mock.ofp_ssl_listen_port = port
        conf_mock.ofp_listen_host = "127.0.0.1"
        conf_mock.ca_certs = None
        conf_mock.ctl_cert = os.path.join(this_dir, 'cert.crt')
        conf_mock.ctl_privkey = os.path.join(this_dir, 'cert.key')
        c = controller.OpenFlowController()
        # NOTE(review): presumably this call serves connections until the
        # thread is killed by the caller -- confirm.
        c()

    def test_ssl(self):
        """Tests SSL server functionality.

        Spawns the controller on a random port and tries to connect to it
        with an SSL client. Up to three attempts are made; the test fails
        if none of them yields a connection.
        """
        # TODO: TLS version enforcement is necessary to avoid
        # vulnerable versions. Currently, this only tests TLS
        # connectivity.
        this_dir = os.path.dirname(sys.modules[__name__].__file__)
        saved_exception = None
        try:
            ssl_version = ssl.PROTOCOL_TLS
        except AttributeError:
            # For compatibility with older pythons.
            ssl_version = ssl.PROTOCOL_TLSv1
        for _ in range(3):
            # Reset per attempt so that the cleanup below never touches an
            # unbound name or a server left over from an earlier attempt.
            server = None
            try:
                # Try a few times as this can fail with EADDRINUSE
                port = random.randint(5000, 10000)
                server = hub.spawn(self._test_ssl, this_dir, port)
                # Give the spawned server time to start listening.
                hub.sleep(1)
                client = hub.StreamClient(("127.0.0.1", port),
                                          timeout=5,
                                          ssl_version=ssl_version)
                if client.connect() is not None:
                    break
            except Exception as e:
                # Remember the failure for the final message and retry.
                saved_exception = e
            finally:
                if server is not None:
                    try:
                        hub.kill(server)
                    except Exception:
                        # Cleanup is best effort; a failure to kill the
                        # server must not mask the test outcome.
                        LOG.debug("Failed to kill SSL test server",
                                  exc_info=True)
        else:
            # Reached only when no attempt ended with ``break``.
            self.fail("Failed to connect: " + str(saved_exception))