backing up
[vsorcdistro/.git] / ryu / build / lib.linux-armv7l-2.7 / ryu / tests / switch / tester.py
1 # Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #    http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 import binascii
17 import inspect
18 import json
19 import logging
20 import math
21 import netaddr
22 import os
23 import signal
24 import six
25 import sys
26 import time
27 import traceback
28 from random import randint
29
30 from ryu import cfg
31 from ryu.base import app_manager
32 from ryu.controller import handler
33 from ryu.controller import ofp_event
34 from ryu.controller.handler import set_ev_cls
35 from ryu.exception import RyuException
36 from ryu.lib import dpid as dpid_lib
37 from ryu.lib import hub
38 from ryu.lib import stringify
39 from ryu.lib.packet import packet
40 from ryu.ofproto import ofproto_parser
41 from ryu.ofproto import ofproto_protocol
42 from ryu.ofproto import ofproto_v1_0
43 from ryu.ofproto import ofproto_v1_2
44 from ryu.ofproto import ofproto_v1_3
45 from ryu.ofproto import ofproto_v1_4
46 from ryu.ofproto import ofproto_v1_5
47
# Import all packet libraries.
#
# Every class found in an already-loaded ``ryu.lib.packet`` module is bound
# into this module's namespace.
PKT_LIB_PATH = 'ryu.lib.packet'
# (module name, class name) -> name to bind the class under.
# An empty string means "do not import this class at all".
CLSNAME_ALIASES = {
    ('ryu.lib.packet.ipv6', 'option'): 'ipv6option',
    ('ryu.lib.packet.icmpv6', 'echo'): 'icmpv6echo',
    ('ryu.lib.packet.bgp', 'StreamParser'): '',
    ('ryu.lib.packet.bgp', 'StringifyMixin'): '',
    ('ryu.lib.packet.dhcp', 'option'): 'dhcpoption',
    ('ryu.lib.packet.dhcp', 'options'): 'dhcpoptions',
    ('ryu.lib.packet.ospf', 'StringifyMixin'): ''
}

# Iterate over a snapshot: sys.modules may change size while we are looping
# (imports triggered as a side effect, or by other threads), which makes
# iterating the live dictionary view fail on Python 3.
for modname, moddef in list(sys.modules.items()):
    if not modname.startswith(PKT_LIB_PATH) or not moddef:
        continue
    for (clsname, clsdef, ) in inspect.getmembers(moddef, inspect.isclass):
        clsname_alias = CLSNAME_ALIASES.get((modname, clsname))
        if clsname_alias == '':
            # Explicitly excluded class.
            continue
        elif clsname_alias is not None:
            # Same effect as "from <modname> import <clsname> as <alias>",
            # without building and exec-ing source text.
            globals()[clsname_alias] = clsdef
        else:
            assert clsname not in globals(), (
                "%s.%s already defined" % (modname, clsname))
            globals()[clsname] = clsdef
75
76
77 """ Required test network:
78
79                       +-------------------+
80            +----------|     target sw     | The switch to be tested
81            |          +-------------------+
82     +------------+      (1)   (2)   (3)
83     | controller |       |     |     |
84     +------------+      (1)   (2)   (3)
85            |          +-------------------+
86            +----------|     tester sw     | OpenFlow Switch
87                       +-------------------+
88
89       (X) : port number
90
91     Tests send a packet from port 1 of the tester sw.
92     If the packet matched with a flow entry of the target sw,
93      the target sw resends the packet from port 2 (or the port which
94      connected with the controller), according to the flow entry.
95     Then the tester sw receives the packet and sends a PacketIn message.
96     If the packet did not match, the target sw drops the packet.
97
    If you want to use port numbers that differ from the chart above,
    you can specify them with options when this tool is started.
    For details of these options, please refer to the Help command.
    Also, if you write the name of an option argument
    (e.g. "target_send_port_1") in a test file,
    this tool substitutes that option's value for the port number.
104
105         e.g.)
106             "OFPActionOutput":{
107                 "port":"target_send_port_1"
108             }
109
110 """
111
112
# Global configuration object. OfTester.__init__ reads its 'test-switch'
# group and _set_logger reads CONF.log_file.
CONF = cfg.CONF


# Default settings.
INTERVAL = 1  # sec
WAIT_TIMER = 3  # sec
CONTINUOUS_THREAD_INTVL = float(0.01)  # sec
CONTINUOUS_PROGRESS_SPAN = 3  # sec
# One above the OpenFlow 1.3 default priority; the cookie reuses the value.
THROUGHPUT_PRIORITY = ofproto_v1_3.OFP_DEFAULT_PRIORITY + 1
THROUGHPUT_COOKIE = THROUGHPUT_PRIORITY
THROUGHPUT_THRESHOLD = float(0.10)  # expected throughput plus/minus 10 %

# Default settings for 'ingress: packets'
DEFAULT_DURATION_TIME = 30
DEFAULT_PKTPS = 1000

# Test file format.
# Key names looked up in the parsed test files (see the KEY_* checks in
# OfTester._test_execute).
KEY_DESC = 'description'
KEY_PREREQ = 'prerequisite'
KEY_FLOW = 'OFPFlowMod'
KEY_METER = 'OFPMeterMod'
KEY_GROUP = 'OFPGroupMod'
KEY_TESTS = 'tests'
KEY_INGRESS = 'ingress'
KEY_EGRESS = 'egress'
KEY_PKT_IN = 'PACKET_IN'
KEY_TBL_MISS = 'table-miss'
KEY_PACKETS = 'packets'
KEY_DATA = 'data'
KEY_KBPS = 'kbps'
KEY_PKTPS = 'pktps'
KEY_DURATION_TIME = 'duration_time'
KEY_THROUGHPUT = 'throughput'
KEY_MATCH = 'OFPMatch'

# Test state.
# These values are the keys of the dispatch table in OfTester._test and the
# first-level keys of the MSG table.
STATE_INIT_FLOW = 0
STATE_FLOW_INSTALL = 1
STATE_FLOW_EXIST_CHK = 2
STATE_TARGET_PKT_COUNT = 3
STATE_TESTER_PKT_COUNT = 4
STATE_FLOW_MATCH_CHK = 5
STATE_NO_PKTIN_REASON = 6
STATE_GET_MATCH_COUNT = 7
STATE_SEND_BARRIER = 8
STATE_FLOW_UNMATCH_CHK = 9
STATE_INIT_METER = 10
STATE_METER_INSTALL = 11
STATE_METER_EXIST_CHK = 12
STATE_INIT_THROUGHPUT_FLOW = 13
STATE_THROUGHPUT_FLOW_INSTALL = 14
STATE_THROUGHPUT_FLOW_EXIST_CHK = 15
STATE_GET_THROUGHPUT = 16
STATE_THROUGHPUT_CHK = 17
STATE_INIT_GROUP = 18
STATE_GROUP_INSTALL = 19
STATE_GROUP_EXIST_CHK = 20

STATE_DISCONNECTED = 99

# Test result.
TEST_OK = 'OK'
TEST_ERROR = 'ERROR'
RYU_INTERNAL_ERROR = '- (Ryu internal error.)'
TEST_FILE_ERROR = '%(file)s : Test file format error (%(detail)s)'
NO_TEST_FILE = 'Test file (*.json) is not found.'
INVALID_PATH = '%(path)s : No such file or directory.'

# Test result details.
# Second-level keys of the MSG table; the TestFailure / TestError /
# TestTimeout / TestReceiveError exceptions each select one of them.
FAILURE = 0
ERROR = 1
TIMEOUT = 2
RCV_ERR = 3
186
# Message templates, keyed first by test state (STATE_*) and then by result
# detail (FAILURE / ERROR / TIMEOUT / RCV_ERR). TestMessageBase looks up
# MSG[state][message_type] and %-formats it with its keyword arguments, so
# the %(name)s placeholders below are the keyword names the raisers must pass.
MSG = {STATE_INIT_FLOW:
       {TIMEOUT: 'Failed to initialize flow tables: barrier request timeout.',
        RCV_ERR: 'Failed to initialize flow tables: %(err_msg)s'},
       STATE_INIT_THROUGHPUT_FLOW:
       {TIMEOUT: 'Failed to initialize flow tables of tester_sw: '
                 'barrier request timeout.',
        RCV_ERR: 'Failed to initialize flow tables of tester_sw: '
                 '%(err_msg)s'},
       STATE_FLOW_INSTALL:
       {TIMEOUT: 'Failed to add flows: barrier request timeout.',
        RCV_ERR: 'Failed to add flows: %(err_msg)s'},
       STATE_THROUGHPUT_FLOW_INSTALL:
       {TIMEOUT: 'Failed to add flows to tester_sw: barrier request timeout.',
        RCV_ERR: 'Failed to add flows to tester_sw: %(err_msg)s'},
       STATE_METER_INSTALL:
       {TIMEOUT: 'Failed to add meters: barrier request timeout.',
        RCV_ERR: 'Failed to add meters: %(err_msg)s'},
       STATE_GROUP_INSTALL:
       {TIMEOUT: 'Failed to add groups: barrier request timeout.',
        RCV_ERR: 'Failed to add groups: %(err_msg)s'},
       STATE_FLOW_EXIST_CHK:
       {FAILURE: 'Added incorrect flows: %(flows)s',
        TIMEOUT: 'Failed to add flows: flow stats request timeout.',
        RCV_ERR: 'Failed to add flows: %(err_msg)s'},
       STATE_METER_EXIST_CHK:
       {FAILURE: 'Added incorrect meters: %(meters)s',
        TIMEOUT: 'Failed to add meters: meter config stats request timeout.',
        RCV_ERR: 'Failed to add meters: %(err_msg)s'},
       STATE_GROUP_EXIST_CHK:
       {FAILURE: 'Added incorrect groups: %(groups)s',
        TIMEOUT: 'Failed to add groups: group desc stats request timeout.',
        RCV_ERR: 'Failed to add groups: %(err_msg)s'},
       STATE_TARGET_PKT_COUNT:
       {TIMEOUT: 'Failed to request port stats from target: request timeout.',
        RCV_ERR: 'Failed to request port stats from target: %(err_msg)s'},
       STATE_TESTER_PKT_COUNT:
       {TIMEOUT: 'Failed to request port stats from tester: request timeout.',
        RCV_ERR: 'Failed to request port stats from tester: %(err_msg)s'},
       STATE_FLOW_MATCH_CHK:
       {FAILURE: 'Received incorrect %(pkt_type)s: %(detail)s',
        TIMEOUT: '',  # for check no packet-in reason.
        RCV_ERR: 'Failed to send packet: %(err_msg)s'},
       STATE_NO_PKTIN_REASON:
       {FAILURE: 'Receiving timeout: %(detail)s'},
       STATE_GET_MATCH_COUNT:
       {TIMEOUT: 'Failed to request table stats: request timeout.',
        RCV_ERR: 'Failed to request table stats: %(err_msg)s'},
       STATE_SEND_BARRIER:
       {TIMEOUT: 'Failed to send packet: barrier request timeout.',
        RCV_ERR: 'Failed to send packet: %(err_msg)s'},
       STATE_FLOW_UNMATCH_CHK:
       {FAILURE: 'Table-miss error: increment in matched_count.',
        ERROR: 'Table-miss error: no change in lookup_count.',
        TIMEOUT: 'Failed to request table stats: request timeout.',
        RCV_ERR: 'Failed to request table stats: %(err_msg)s'},
       STATE_THROUGHPUT_FLOW_EXIST_CHK:
       {FAILURE: 'Added incorrect flows to tester_sw: %(flows)s',
        TIMEOUT: 'Failed to add flows to tester_sw: '
                 'flow stats request timeout.',
        RCV_ERR: 'Failed to add flows to tester_sw: %(err_msg)s'},
       STATE_GET_THROUGHPUT:
       {TIMEOUT: 'Failed to request flow stats: request timeout.',
        RCV_ERR: 'Failed to request flow stats: %(err_msg)s'},
       STATE_THROUGHPUT_CHK:
       {FAILURE: 'Received unexpected throughput: %(detail)s'},
       STATE_DISCONNECTED:
       {ERROR: 'Disconnected from switch'}}

# Template for the %(err_msg)s placeholder; TestReceiveError fills it with
# the (type, code) pair of the received error message.
ERR_MSG = 'OFPErrorMsg[type=0x%02x, code=0x%02x]'
256
257
class TestMessageBase(RyuException):
    """Base exception whose text comes from the MSG table.

    The template stored at ``MSG[state][message_type]`` is %-formatted with
    the given keyword arguments and passed to RyuException as ``msg``.
    """

    def __init__(self, state, message_type, **argv):
        template = MSG[state][message_type]
        super(TestMessageBase, self).__init__(msg=template % argv)
262
263
class TestFailure(TestMessageBase):
    """Exception carrying the FAILURE message of a test state."""

    def __init__(self, state, **argv):
        super(TestFailure, self).__init__(
            state, message_type=FAILURE, **argv)
267
268
class TestTimeout(TestMessageBase):
    """Exception carrying the TIMEOUT message of a test state."""

    def __init__(self, state):
        super(TestTimeout, self).__init__(state, message_type=TIMEOUT)
272
273
class TestReceiveError(TestMessageBase):
    """Exception carrying the RCV_ERR message of a test state.

    ``err_msg`` must expose ``type`` and ``code`` attributes; both are
    rendered through the ERR_MSG template and substituted for the
    ``%(err_msg)s`` placeholder of the selected message.
    """

    def __init__(self, state, err_msg):
        formatted = ERR_MSG % (err_msg.type, err_msg.code)
        super(TestReceiveError, self).__init__(
            state, RCV_ERR, err_msg=formatted)
278
279
class TestError(TestMessageBase):
    """Exception carrying the ERROR message of a test state."""

    def __init__(self, state, **argv):
        super(TestError, self).__init__(
            state, message_type=ERROR, **argv)
283
284
285 class OfTester(app_manager.RyuApp):
286     """ OpenFlow Switch Tester. """
287
288     tester_ver = None
289     target_ver = None
290
291     def __init__(self):
292         super(OfTester, self).__init__()
293         self._set_logger()
294
295         self.interval = CONF['test-switch']['interval']
296         self.target_dpid = self._convert_dpid(CONF['test-switch']['target'])
297         self.target_send_port_1 = CONF['test-switch']['target_send_port_1']
298         self.target_send_port_2 = CONF['test-switch']['target_send_port_2']
299         self.target_recv_port = CONF['test-switch']['target_recv_port']
300         self.tester_dpid = self._convert_dpid(CONF['test-switch']['tester'])
301         self.tester_send_port = CONF['test-switch']['tester_send_port']
302         self.tester_recv_port_1 = CONF['test-switch']['tester_recv_port_1']
303         self.tester_recv_port_2 = CONF['test-switch']['tester_recv_port_2']
304         self.logger.info('target_dpid=%s',
305                          dpid_lib.dpid_to_str(self.target_dpid))
306         self.logger.info('tester_dpid=%s',
307                          dpid_lib.dpid_to_str(self.tester_dpid))
308
309         def __get_version(opt):
310             vers = {
311                 'openflow10': ofproto_v1_0.OFP_VERSION,
312                 'openflow13': ofproto_v1_3.OFP_VERSION,
313                 'openflow14': ofproto_v1_4.OFP_VERSION,
314                 'openflow15': ofproto_v1_5.OFP_VERSION
315             }
316             ver = vers.get(opt.lower())
317             if ver is None:
318                 self.logger.error(
319                     '%s is not supported. '
320                     'Supported versions are %s.',
321                     opt, list(vers.keys()))
322                 self._test_end()
323             return ver
324
325         target_opt = CONF['test-switch']['target_version']
326         self.logger.info('target ofp version=%s', target_opt)
327         OfTester.target_ver = __get_version(target_opt)
328         tester_opt = CONF['test-switch']['tester_version']
329         self.logger.info('tester ofp version=%s', tester_opt)
330         OfTester.tester_ver = __get_version(tester_opt)
331         # set app_supported_versions later.
332         ofproto_protocol.set_app_supported_versions(
333             [OfTester.target_ver, OfTester.tester_ver])
334
335         test_dir = CONF['test-switch']['dir']
336         self.logger.info('Test files directory = %s', test_dir)
337
338         self.target_sw = OpenFlowSw(DummyDatapath(), self.logger)
339         self.tester_sw = OpenFlowSw(DummyDatapath(), self.logger)
340         self.state = STATE_INIT_FLOW
341         self.sw_waiter = None
342         self.waiter = None
343         self.send_msg_xids = []
344         self.rcv_msgs = []
345         self.ingress_event = None
346         self.ingress_threads = []
347         self.thread_msg = None
348         self.test_thread = hub.spawn(
349             self._test_sequential_execute, test_dir)
350
351     def _set_logger(self):
352         self.logger.propagate = False
353         s_hdlr = logging.StreamHandler()
354         self.logger.addHandler(s_hdlr)
355         if CONF.log_file:
356             f_hdlr = logging.handlers.WatchedFileHandler(CONF.log_file)
357             self.logger.addHandler(f_hdlr)
358
359     def _convert_dpid(self, dpid_str):
360         try:
361             return int(dpid_str, 16)
362         except ValueError as err:
363             self.logger.error('Invarid dpid parameter. %s', err)
364             self._test_end()
365
366     def close(self):
367         if self.test_thread is not None:
368             hub.kill(self.test_thread)
369         if self.ingress_event:
370             self.ingress_event.set()
371         hub.joinall([self.test_thread])
372         self._test_end('--- Test terminated ---')
373
374     @set_ev_cls(ofp_event.EventOFPStateChange,
375                 [handler.MAIN_DISPATCHER, handler.DEAD_DISPATCHER])
376     def dispatcher_change(self, ev):
377         assert ev.datapath is not None
378         if ev.state == handler.MAIN_DISPATCHER:
379             self._register_sw(ev.datapath)
380         elif ev.state == handler.DEAD_DISPATCHER:
381             self._unregister_sw(ev.datapath)
382
383     def _register_sw(self, dp):
384         vers = {
385             ofproto_v1_0.OFP_VERSION: 'openflow10',
386             ofproto_v1_3.OFP_VERSION: 'openflow13',
387             ofproto_v1_4.OFP_VERSION: 'openflow14',
388             ofproto_v1_5.OFP_VERSION: 'openflow15'
389         }
390         if dp.id == self.target_dpid:
391             if dp.ofproto.OFP_VERSION != OfTester.target_ver:
392                 msg = 'Join target SW, but ofp version is not %s.' % \
393                     vers[OfTester.target_ver]
394             else:
395                 self.target_sw.dp = dp
396                 msg = 'Join target SW.'
397         elif dp.id == self.tester_dpid:
398             if dp.ofproto.OFP_VERSION != OfTester.tester_ver:
399                 msg = 'Join tester SW, but ofp version is not %s.' % \
400                     vers[OfTester.tester_ver]
401             else:
402                 self.tester_sw.dp = dp
403                 msg = 'Join tester SW.'
404         else:
405             msg = 'Connect unknown SW.'
406         if dp.id:
407             self.logger.info('dpid=%s : %s',
408                              dpid_lib.dpid_to_str(dp.id), msg)
409
410         if not (isinstance(self.target_sw.dp, DummyDatapath) or
411                 isinstance(self.tester_sw.dp, DummyDatapath)):
412             if self.sw_waiter is not None:
413                 self.sw_waiter.set()
414
415     def _unregister_sw(self, dp):
416         if dp.id == self.target_dpid:
417             self.target_sw.dp = DummyDatapath()
418             msg = 'Leave target SW.'
419         elif dp.id == self.tester_dpid:
420             self.tester_sw.dp = DummyDatapath()
421             msg = 'Leave tester SW.'
422         else:
423             msg = 'Disconnect unknown SW.'
424         if dp.id:
425             self.logger.info('dpid=%s : %s',
426                              dpid_lib.dpid_to_str(dp.id), msg)
427
428     def _test_sequential_execute(self, test_dir):
429         """ Execute OpenFlow Switch test. """
430         # Parse test pattern from test files.
431         tests = TestPatterns(test_dir, self.logger)
432         if not tests:
433             self.logger.warning(NO_TEST_FILE)
434             self._test_end()
435
436         test_report = {}
437         self.logger.info('--- Test start ---')
438         test_keys = list(tests.keys())
439         test_keys.sort()
440         for file_name in test_keys:
441             report = self._test_file_execute(tests[file_name])
442             for result, descriptions in report.items():
443                 test_report.setdefault(result, [])
444                 test_report[result].extend(descriptions)
445         self._test_end(msg='---  Test end  ---', report=test_report)
446
447     def _test_file_execute(self, testfile):
448         report = {}
449         for i, test in enumerate(testfile.tests):
450             desc = testfile.description if i == 0 else None
451             result = self._test_execute(test, desc)
452             report.setdefault(result, [])
453             report[result].append([testfile.description, test.description])
454             hub.sleep(self.interval)
455         return report
456
457     def _test_execute(self, test, description):
458         if isinstance(self.target_sw.dp, DummyDatapath) or \
459                 isinstance(self.tester_sw.dp, DummyDatapath):
460             self.logger.info('waiting for switches connection...')
461             self.sw_waiter = hub.Event()
462             self.sw_waiter.wait()
463             self.sw_waiter = None
464
465         if description:
466             self.logger.info('%s', description)
467         self.thread_msg = None
468
469         # Test execute.
470         try:
471             # Initialize.
472             self._test(STATE_INIT_METER)
473             self._test(STATE_INIT_GROUP)
474             self._test(STATE_INIT_FLOW, self.target_sw)
475             self._test(STATE_INIT_THROUGHPUT_FLOW, self.tester_sw)
476
477             # Install flows.
478             for flow in test.prerequisite:
479                 if isinstance(
480                         flow, self.target_sw.dp.ofproto_parser.OFPFlowMod):
481                     self._test(STATE_FLOW_INSTALL, self.target_sw, flow)
482                     self._test(STATE_FLOW_EXIST_CHK,
483                                self.target_sw.send_flow_stats, flow)
484                 elif isinstance(
485                         flow, self.target_sw.dp.ofproto_parser.OFPMeterMod):
486                     self._test(STATE_METER_INSTALL, self.target_sw, flow)
487                     self._test(STATE_METER_EXIST_CHK,
488                                self.target_sw.send_meter_config_stats, flow)
489                 elif isinstance(
490                         flow, self.target_sw.dp.ofproto_parser.OFPGroupMod):
491                     self._test(STATE_GROUP_INSTALL, self.target_sw, flow)
492                     self._test(STATE_GROUP_EXIST_CHK,
493                                self.target_sw.send_group_desc_stats, flow)
494             # Do tests.
495             for pkt in test.tests:
496
497                 # Get stats before sending packet(s).
498                 if KEY_EGRESS in pkt or KEY_PKT_IN in pkt:
499                     target_pkt_count = [self._test(STATE_TARGET_PKT_COUNT,
500                                                    True)]
501                     tester_pkt_count = [self._test(STATE_TESTER_PKT_COUNT,
502                                                    False)]
503                 elif KEY_THROUGHPUT in pkt:
504                     # install flows for throughput analysis
505                     for throughput in pkt[KEY_THROUGHPUT]:
506                         flow = throughput[KEY_FLOW]
507                         self._test(STATE_THROUGHPUT_FLOW_INSTALL,
508                                    self.tester_sw, flow)
509                         self._test(STATE_THROUGHPUT_FLOW_EXIST_CHK,
510                                    self.tester_sw.send_flow_stats, flow)
511                     start = self._test(STATE_GET_THROUGHPUT)
512                 elif KEY_TBL_MISS in pkt:
513                     before_stats = self._test(STATE_GET_MATCH_COUNT)
514
515                 # Send packet(s).
516                 if KEY_INGRESS in pkt:
517                     self._one_time_packet_send(pkt)
518                 elif KEY_PACKETS in pkt:
519                     self._continuous_packet_send(pkt)
520
521                 # Check a result.
522                 if KEY_EGRESS in pkt or KEY_PKT_IN in pkt:
523                     result = self._test(STATE_FLOW_MATCH_CHK, pkt)
524                     if result == TIMEOUT:
525                         target_pkt_count.append(self._test(
526                             STATE_TARGET_PKT_COUNT, True))
527                         tester_pkt_count.append(self._test(
528                             STATE_TESTER_PKT_COUNT, False))
529                         test_type = (KEY_EGRESS if KEY_EGRESS in pkt
530                                      else KEY_PKT_IN)
531                         self._test(STATE_NO_PKTIN_REASON, test_type,
532                                    target_pkt_count, tester_pkt_count)
533                 elif KEY_THROUGHPUT in pkt:
534                     end = self._test(STATE_GET_THROUGHPUT)
535                     self._test(STATE_THROUGHPUT_CHK, pkt[KEY_THROUGHPUT],
536                                start, end)
537                 elif KEY_TBL_MISS in pkt:
538                     self._test(STATE_SEND_BARRIER)
539                     hub.sleep(INTERVAL)
540                     self._test(STATE_FLOW_UNMATCH_CHK, before_stats, pkt)
541
542             result = [TEST_OK]
543             result_type = TEST_OK
544         except (TestFailure, TestError,
545                 TestTimeout, TestReceiveError) as err:
546             result = [TEST_ERROR, str(err)]
547             result_type = str(err).split(':', 1)[0]
548         finally:
549             self.ingress_event = None
550             for tid in self.ingress_threads:
551                 hub.kill(tid)
552             self.ingress_threads = []
553
554         # Output test result.
555         self.logger.info('    %-100s %s', test.description, result[0])
556         if 1 < len(result):
557             self.logger.info('        %s', result[1])
558             if result[1] == RYU_INTERNAL_ERROR\
559                     or result == 'An unknown exception':
560                 self.logger.error(traceback.format_exc())
561
562         hub.sleep(0)
563         return result_type
564
565     def _test_end(self, msg=None, report=None):
566         self.test_thread = None
567         if msg:
568             self.logger.info(msg)
569         if report:
570             self._output_test_report(report)
571         pid = os.getpid()
572         os.kill(pid, signal.SIGTERM)
573
574     def _output_test_report(self, report):
575         self.logger.info('%s--- Test report ---', os.linesep)
576         error_count = 0
577         for result_type in sorted(list(report.keys())):
578             test_descriptions = report[result_type]
579             if result_type == TEST_OK:
580                 continue
581             error_count += len(test_descriptions)
582             self.logger.info('%s(%d)', result_type, len(test_descriptions))
583             for file_desc, test_desc in test_descriptions:
584                 self.logger.info('    %-40s %s', file_desc, test_desc)
585         self.logger.info('%s%s(%d) / %s(%d)', os.linesep,
586                          TEST_OK, len(report.get(TEST_OK, [])),
587                          TEST_ERROR, error_count)
588
589     def _test(self, state, *args):
590         test = {STATE_INIT_FLOW: self._test_initialize_flow,
591                 STATE_INIT_THROUGHPUT_FLOW: self._test_initialize_flow,
592                 STATE_INIT_METER: self.target_sw.del_meters,
593                 STATE_INIT_GROUP: self.target_sw.del_groups,
594                 STATE_FLOW_INSTALL: self._test_msg_install,
595                 STATE_THROUGHPUT_FLOW_INSTALL: self._test_msg_install,
596                 STATE_METER_INSTALL: self._test_msg_install,
597                 STATE_GROUP_INSTALL: self._test_msg_install,
598                 STATE_FLOW_EXIST_CHK: self._test_exist_check,
599                 STATE_THROUGHPUT_FLOW_EXIST_CHK: self._test_exist_check,
600                 STATE_METER_EXIST_CHK: self._test_exist_check,
601                 STATE_GROUP_EXIST_CHK: self._test_exist_check,
602                 STATE_TARGET_PKT_COUNT: self._test_get_packet_count,
603                 STATE_TESTER_PKT_COUNT: self._test_get_packet_count,
604                 STATE_FLOW_MATCH_CHK: self._test_flow_matching_check,
605                 STATE_NO_PKTIN_REASON: self._test_no_pktin_reason_check,
606                 STATE_GET_MATCH_COUNT: self._test_get_match_count,
607                 STATE_SEND_BARRIER: self._test_send_barrier,
608                 STATE_FLOW_UNMATCH_CHK: self._test_flow_unmatching_check,
609                 STATE_GET_THROUGHPUT: self._test_get_throughput,
610                 STATE_THROUGHPUT_CHK: self._test_throughput_check}
611
612         self.send_msg_xids = []
613         self.rcv_msgs = []
614
615         self.state = state
616         return test[state](*args)
617
618     def _test_initialize_flow(self, datapath):
619         # Note: Because DELETE and DELETE_STRICT commands in OpenFlow 1.0
620         # can not be filtered by the cookie value, this tool deletes all
621         # flow entries of the tester switch temporarily and inserts default
622         # flow entry immediately.
623         xid = datapath.del_flows()
624         self.send_msg_xids.append(xid)
625
626         xid = datapath.add_flow(
627             in_port=self.tester_recv_port_1,
628             out_port=datapath.dp.ofproto.OFPP_CONTROLLER)
629         self.send_msg_xids.append(xid)
630
631         xid = datapath.send_barrier_request()
632         self.send_msg_xids.append(xid)
633
634         self._wait()
635         assert len(self.rcv_msgs) == 1
636         msg = self.rcv_msgs[0]
637         assert isinstance(msg, datapath.dp.ofproto_parser.OFPBarrierReply)
638
639     def _test_msg_install(self, datapath, message):
640         xid = datapath.send_msg(message)
641         self.send_msg_xids.append(xid)
642
643         xid = datapath.send_barrier_request()
644         self.send_msg_xids.append(xid)
645
646         self._wait()
647         assert len(self.rcv_msgs) == 1
648         msg = self.rcv_msgs[0]
649         assert isinstance(msg, datapath.dp.ofproto_parser.OFPBarrierReply)
650
651     def _test_exist_check(self, method, message):
652         ofp = method.__self__.dp.ofproto
653         parser = method.__self__.dp.ofproto_parser
654         method_dict = {
655             OpenFlowSw.send_flow_stats.__name__: {
656                 'reply': parser.OFPFlowStatsReply,
657                 'compare': self._compare_flow
658             }
659         }
660         if ofp.OFP_VERSION >= ofproto_v1_2.OFP_VERSION:
661             method_dict[OpenFlowSw.send_group_desc_stats.__name__] = {
662                 'reply': parser.OFPGroupDescStatsReply,
663                 'compare': self._compare_group
664             }
665         if ofp.OFP_VERSION >= ofproto_v1_3.OFP_VERSION:
666             method_dict[OpenFlowSw.send_meter_config_stats.__name__] = {
667                 'reply': parser.OFPMeterConfigStatsReply,
668                 'compare': self._compare_meter
669             }
670         xid = method()
671         self.send_msg_xids.append(xid)
672         self._wait()
673
674         ng_stats = []
675         for msg in self.rcv_msgs:
676             assert isinstance(msg, method_dict[method.__name__]['reply'])
677             for stats in msg.body:
678                 result, stats = method_dict[method.__name__]['compare'](
679                     stats, message)
680                 if result:
681                     return
682                 else:
683                     ng_stats.append(stats)
684
685         error_dict = {
686             OpenFlowSw.send_flow_stats.__name__: {
687                 'flows': ', '.join(ng_stats)
688             }
689         }
690         if ofp.OFP_VERSION >= ofproto_v1_2.OFP_VERSION:
691             error_dict[OpenFlowSw.send_group_desc_stats.__name__] = {
692                 'groups': ', '.join(ng_stats)
693             }
694         if ofp.OFP_VERSION >= ofproto_v1_3.OFP_VERSION:
695             error_dict[OpenFlowSw.send_meter_config_stats.__name__] = {
696                 'meters': ', '.join(ng_stats)
697             }
698         raise TestFailure(self.state, **error_dict[method.__name__])
699
700     def _test_get_packet_count(self, is_target):
701         sw = self.target_sw if is_target else self.tester_sw
702         xid = sw.send_port_stats()
703         self.send_msg_xids.append(xid)
704         self._wait()
705         result = {}
706         for msg in self.rcv_msgs:
707             for stats in msg.body:
708                 result[stats.port_no] = {'rx': stats.rx_packets,
709                                          'tx': stats.tx_packets}
710         return result
711
712     def _test_flow_matching_check(self, pkt):
713         self.logger.debug("egress:[%s]", packet.Packet(pkt.get(KEY_EGRESS)))
714         self.logger.debug("packet_in:[%s]",
715                           packet.Packet(pkt.get(KEY_PKT_IN)))
716
717         # receive a PacketIn message.
718         try:
719             self._wait()
720         except TestTimeout:
721             return TIMEOUT
722
723         assert len(self.rcv_msgs) == 1
724         msg = self.rcv_msgs[0]
725         # Compare a received message with OFPPacketIn
726         #
727         # We compare names of classes instead of classes themselves
728         # due to OVS bug. The code below should be as follows:
729         #
730         # assert isinstance(msg, msg.datapath.ofproto_parser.OFPPacketIn)
731         #
732         # At this moment, OVS sends Packet-In messages of of13 even if
733         # OVS is configured to use of14, so the above code causes an
734         # assertion.
735         assert msg.__class__.__name__ == 'OFPPacketIn'
736         self.logger.debug("dpid=%s : receive_packet[%s]",
737                           dpid_lib.dpid_to_str(msg.datapath.id),
738                           packet.Packet(msg.data))
739
740         # check the SW which sended PacketIn and output packet.
741         pkt_in_src_model = (self.tester_sw if KEY_EGRESS in pkt
742                             else self.target_sw)
743         model_pkt = (pkt[KEY_EGRESS] if KEY_EGRESS in pkt
744                      else pkt[KEY_PKT_IN])
745
746         if hasattr(msg.datapath.ofproto, "OFPR_NO_MATCH"):
747             invalid_packet_in_reason = [msg.datapath.ofproto.OFPR_NO_MATCH]
748         else:
749             invalid_packet_in_reason = [msg.datapath.ofproto.OFPR_TABLE_MISS]
750         if hasattr(msg.datapath.ofproto, "OFPR_INVALID_TTL"):
751             invalid_packet_in_reason.append(
752                 msg.datapath.ofproto.OFPR_INVALID_TTL)
753
754         if msg.datapath.id != pkt_in_src_model.dp.id:
755             pkt_type = 'packet-in'
756             err_msg = 'SW[dpid=%s]' % dpid_lib.dpid_to_str(msg.datapath.id)
757         elif msg.reason in invalid_packet_in_reason:
758             pkt_type = 'packet-in'
759             err_msg = 'OFPPacketIn[reason=%d]' % msg.reason
760         elif repr(msg.data) != repr(model_pkt):
761             pkt_type = 'packet'
762             err_msg = self._diff_packets(packet.Packet(model_pkt),
763                                          packet.Packet(msg.data))
764         else:
765             return TEST_OK
766
767         raise TestFailure(self.state, pkt_type=pkt_type,
768                           detail=err_msg)
769
770     def _test_no_pktin_reason_check(self, test_type,
771                                     target_pkt_count, tester_pkt_count):
772         before_target_receive = target_pkt_count[
773             0][self.target_recv_port]['rx']
774         before_target_send = target_pkt_count[0][self.target_send_port_1]['tx']
775         before_tester_receive = tester_pkt_count[
776             0][self.tester_recv_port_1]['rx']
777         before_tester_send = tester_pkt_count[0][self.tester_send_port]['tx']
778         after_target_receive = target_pkt_count[1][self.target_recv_port]['rx']
779         after_target_send = target_pkt_count[1][self.target_send_port_1]['tx']
780         after_tester_receive = tester_pkt_count[
781             1][self.tester_recv_port_1]['rx']
782         after_tester_send = tester_pkt_count[1][self.tester_send_port]['tx']
783
784         if after_tester_send == before_tester_send:
785             log_msg = 'no change in tx_packets on tester.'
786         elif after_target_receive == before_target_receive:
787             log_msg = 'no change in rx_packets on target.'
788         elif test_type == KEY_EGRESS:
789             if after_target_send == before_target_send:
790                 log_msg = 'no change in tx_packets on target.'
791             elif after_tester_receive == before_tester_receive:
792                 log_msg = 'no change in rx_packets on tester.'
793             else:
794                 log_msg = 'increment in rx_packets in tester.'
795         else:
796             assert test_type == KEY_PKT_IN
797             log_msg = 'no packet-in.'
798
799         raise TestFailure(self.state, detail=log_msg)
800
801     def _test_get_match_count(self):
802         xid = self.target_sw.send_table_stats()
803         self.send_msg_xids.append(xid)
804         self._wait()
805         result = {}
806         for msg in self.rcv_msgs:
807             for stats in msg.body:
808                 result[stats.table_id] = {'lookup': stats.lookup_count,
809                                           'matched': stats.matched_count}
810         return result
811
812     def _test_send_barrier(self):
813         # Wait OFPBarrierReply.
814         xid = self.tester_sw.send_barrier_request()
815         self.send_msg_xids.append(xid)
816         self._wait()
817         assert len(self.rcv_msgs) == 1
818         msg = self.rcv_msgs[0]
819         assert isinstance(
820             msg, self.tester_sw.dp.ofproto_parser.OFPBarrierReply)
821
822     def _test_flow_unmatching_check(self, before_stats, pkt):
823         # Check matched packet count.
824         rcv_msgs = self._test_get_match_count()
825
826         lookup = False
827         for target_tbl_id in pkt[KEY_TBL_MISS]:
828             before = before_stats[target_tbl_id]
829             after = rcv_msgs[target_tbl_id]
830             if before['lookup'] < after['lookup']:
831                 lookup = True
832                 if before['matched'] < after['matched']:
833                     raise TestFailure(self.state)
834         if not lookup:
835             raise TestError(self.state)
836
837     def _one_time_packet_send(self, pkt):
838         self.logger.debug("send_packet:[%s]", packet.Packet(pkt[KEY_INGRESS]))
839         xid = self.tester_sw.send_packet_out(pkt[KEY_INGRESS])
840         self.send_msg_xids.append(xid)
841
842     def _continuous_packet_send(self, pkt):
843         assert self.ingress_event is None
844
845         pkt_text = pkt[KEY_PACKETS]['packet_text']
846         pkt_bin = pkt[KEY_PACKETS]['packet_binary']
847         pktps = pkt[KEY_PACKETS][KEY_PKTPS]
848         duration_time = pkt[KEY_PACKETS][KEY_DURATION_TIME]
849         randomize = pkt[KEY_PACKETS]['randomize']
850
851         self.logger.debug("send_packet:[%s]", packet.Packet(pkt_bin))
852         self.logger.debug("pktps:[%d]", pktps)
853         self.logger.debug("duration_time:[%d]", duration_time)
854
855         arg = {'packet_text': pkt_text,
856                'packet_binary': pkt_bin,
857                'thread_counter': 0,
858                'dot_span': int(CONTINUOUS_PROGRESS_SPAN /
859                                CONTINUOUS_THREAD_INTVL),
860                'packet_counter': float(0),
861                'packet_counter_inc': pktps * CONTINUOUS_THREAD_INTVL,
862                'randomize': randomize}
863
864         try:
865             self.ingress_event = hub.Event()
866             tid = hub.spawn(self._send_packet_thread, arg)
867             self.ingress_threads.append(tid)
868             self.ingress_event.wait(duration_time)
869             if self.thread_msg is not None:
870                 raise self.thread_msg  # pylint: disable=E0702
871         finally:
872             sys.stdout.write("\r\n")
873             sys.stdout.flush()
874
    def _send_packet_thread(self, arg):
        """ Send several packets continuously.

        One link of a self-rescheduling chain: each invocation sleeps for
        CONTINUOUS_THREAD_INTVL, spawns the next invocation with the same
        *arg* dictionary and then sends its own share of packets through
        the tester switch.

        *arg* is the shared state built by ``_continuous_packet_send``
        ('thread_counter', 'dot_span', 'packet_counter',
        'packet_counter_inc', 'packet_text', 'packet_binary', 'randomize').
        A failing send stores the exception in ``thread_msg`` and sets
        ``ingress_event``.
        """
        # Stop the chain when no continuous send is active or the event is
        # flagged.  NOTE(review): `_cond` is a private attribute of
        # hub.Event, presumably its "set" flag -- confirm.
        if self.ingress_event is None or self.ingress_event._cond:
            return

        # display dots to express progress of sending packets
        if not arg['thread_counter'] % arg['dot_span']:
            sys.stdout.write(".")
            sys.stdout.flush()

        arg['thread_counter'] += 1

        # pile up float values and
        # use integer portion as the number of packets this thread sends
        arg['packet_counter'] += arg['packet_counter_inc']
        count = int(arg['packet_counter'])
        arg['packet_counter'] -= count

        hub.sleep(CONTINUOUS_THREAD_INTVL)

        # Schedule the next link before sending, then yield once so that it
        # gets a chance to start.
        tid = hub.spawn(self._send_packet_thread, arg)
        self.ingress_threads.append(tid)
        hub.sleep(0)
        for _ in range(count):
            if arg['randomize']:
                # Rebuild the packet for every send so that the expression
                # is re-evaluated each time.
                # NOTE(review): eval() runs text taken from the test file
                # in this module's namespace; test files must be trusted.
                msg = eval('/'.join(arg['packet_text']))
                msg.serialize()
                data = msg.data
            else:
                data = arg['packet_binary']
            try:
                self.tester_sw.send_packet_out(data)
            except Exception as err:
                # Hand the error to the waiting _continuous_packet_send.
                self.thread_msg = err
                self.ingress_event.set()
                break
911
912     def _compare_flow(self, stats1, stats2):
913
914         def __reasm_match(match):
915             """ reassemble match_fields. """
916             match_fields = match.to_jsondict()
917             # For only OpenFlow1.0
918             match_fields['OFPMatch'].pop('wildcards', None)
919             return match_fields
920
921         attr_list = ['cookie', 'priority', 'hard_timeout', 'idle_timeout',
922                      'match']
923         if self.target_sw.dp.ofproto.OFP_VERSION == ofproto_v1_0.OFP_VERSION:
924             attr_list += ['actions']
925         else:
926             attr_list += ['table_id', 'instructions']
927         for attr in attr_list:
928             value1 = getattr(stats1, attr)
929             value2 = getattr(stats2, attr)
930             if attr in ['actions', 'instructions']:
931                 value1 = sorted(value1, key=lambda x: x.type)
932                 value2 = sorted(value2, key=lambda x: x.type)
933             elif attr == 'match':
934                 value1 = __reasm_match(value1)
935                 value2 = __reasm_match(value2)
936             if str(value1) != str(value2):
937                 return False, 'flow_stats(%s != %s)' % (value1, value2)
938         return True, None
939
940     @classmethod
941     def _compare_meter(cls, stats1, stats2):
942         """compare the message used to install and the message got from
943            the switch."""
944         attr_list = ['flags', 'meter_id', 'bands']
945         for attr in attr_list:
946             value1 = getattr(stats1, attr)
947             value2 = getattr(stats2, attr)
948             if str(value1) != str(value2):
949                 return False, 'meter_stats(%s != %s)' % (value1, value2)
950         return True, None
951
952     @classmethod
953     def _compare_group(cls, stats1, stats2):
954         attr_list = ['type', 'group_id', 'buckets']
955         for attr in attr_list:
956             value1 = getattr(stats1, attr)
957             value2 = getattr(stats2, attr)
958             if str(value1) != str(value2):
959                 return False, 'group_stats(%s != %s)' % (value1, value2)
960             return True, None
961
962     @classmethod
963     def _diff_packets(cls, model_pkt, rcv_pkt):
964         msg = []
965         for rcv_p in rcv_pkt.protocols:
966             if not isinstance(rcv_p, six.binary_type):
967                 model_protocols = model_pkt.get_protocols(type(rcv_p))
968                 if len(model_protocols) == 1:
969                     model_p = model_protocols[0]
970                     diff = []
971                     for attr in rcv_p.__dict__:
972                         if attr.startswith('_'):
973                             continue
974                         if callable(attr):
975                             continue
976                         if hasattr(rcv_p.__class__, attr):
977                             continue
978                         rcv_attr = repr(getattr(rcv_p, attr))
979                         model_attr = repr(getattr(model_p, attr))
980                         if rcv_attr != model_attr:
981                             diff.append('%s=%s' % (attr, rcv_attr))
982                     if diff:
983                         msg.append('%s(%s)' %
984                                    (rcv_p.__class__.__name__,
985                                     ','.join(diff)))
986                 else:
987                     if (not model_protocols or
988                             not str(rcv_p) in str(model_protocols)):
989                         msg.append(str(rcv_p))
990             else:
991                 model_p = ''
992                 for p in model_pkt.protocols:
993                     if isinstance(p, six.binary_type):
994                         model_p = p
995                         break
996                 if model_p != rcv_p:
997                     msg.append('str(%s)' % repr(rcv_p))
998         if msg:
999             return '/'.join(msg)
1000         else:
1001             return ('Encounter an error during packet comparison.'
1002                     ' it is malformed.')
1003
1004     def _test_get_throughput(self):
1005         xid = self.tester_sw.send_flow_stats()
1006         self.send_msg_xids.append(xid)
1007         self._wait()
1008
1009         assert len(self.rcv_msgs) == 1
1010         flow_stats = self.rcv_msgs[0].body
1011         self.logger.debug(flow_stats)
1012         result = {}
1013         for stat in flow_stats:
1014             if stat.cookie != THROUGHPUT_COOKIE:
1015                 continue
1016             result[str(stat.match)] = (stat.byte_count, stat.packet_count)
1017         return time.time(), result
1018
1019     def _test_throughput_check(self, throughputs, start, end):
1020         msgs = []
1021         elapsed_sec = end[0] - start[0]
1022
1023         for throughput in throughputs:
1024             match = str(throughput[KEY_FLOW].match)
1025             # get oxm_fields of OFPMatch
1026             fields = dict(throughput[KEY_FLOW].match._fields2)
1027
1028             if match not in start[1] or match not in end[1]:
1029                 raise TestError(self.state, match=match)
1030             increased_bytes = end[1][match][0] - start[1][match][0]
1031             increased_packets = end[1][match][1] - start[1][match][1]
1032
1033             if throughput[KEY_PKTPS]:
1034                 key = KEY_PKTPS
1035                 conv = 1
1036                 measured_value = increased_packets
1037                 unit = 'pktps'
1038             elif throughput[KEY_KBPS]:
1039                 key = KEY_KBPS
1040                 conv = 1024 / 8  # Kilobits -> bytes
1041                 measured_value = increased_bytes
1042                 unit = 'kbps'
1043             else:
1044                 raise RyuException(
1045                     'An invalid key exists that is neither "%s" nor "%s".'
1046                     % (KEY_KBPS, KEY_PKTPS))
1047
1048             expected_value = throughput[key] * elapsed_sec * conv
1049             margin = expected_value * THROUGHPUT_THRESHOLD
1050             self.logger.debug("measured_value:[%s]", measured_value)
1051             self.logger.debug("expected_value:[%s]", expected_value)
1052             self.logger.debug("margin:[%s]", margin)
1053             if math.fabs(measured_value - expected_value) > margin:
1054                 msgs.append('{0} {1:.2f}{2}'.format(fields,
1055                                                     measured_value / elapsed_sec / conv, unit))
1056
1057         if msgs:
1058             raise TestFailure(self.state, detail=', '.join(msgs))
1059
1060     def _wait(self):
1061         """ Wait until specific OFP message received
1062              or timer is exceeded. """
1063         assert self.waiter is None
1064
1065         self.waiter = hub.Event()
1066         self.rcv_msgs = []
1067         timeout = False
1068
1069         timer = hub.Timeout(WAIT_TIMER)
1070         try:
1071             self.waiter.wait()
1072         except hub.Timeout as t:
1073             if t is not timer:
1074                 raise RyuException('Internal error. Not my timeout.')
1075             timeout = True
1076         finally:
1077             timer.cancel()
1078
1079         self.waiter = None
1080
1081         if timeout:
1082             raise TestTimeout(self.state)
1083         if (self.rcv_msgs and isinstance(
1084                 self.rcv_msgs[0],
1085                 self.rcv_msgs[0].datapath.ofproto_parser.OFPErrorMsg)):
1086             raise TestReceiveError(self.state, self.rcv_msgs[0])
1087
1088     @set_ev_cls([ofp_event.EventOFPFlowStatsReply,
1089                  ofp_event.EventOFPMeterConfigStatsReply,
1090                  ofp_event.EventOFPTableStatsReply,
1091                  ofp_event.EventOFPPortStatsReply,
1092                  ofp_event.EventOFPGroupDescStatsReply],
1093                 handler.MAIN_DISPATCHER)
1094     def stats_reply_handler(self, ev):
1095         # keys: stats reply event classes
1096         # values: states in which the events should be processed
1097         ofp = ev.msg.datapath.ofproto
1098         event_states = {
1099             ofp_event.EventOFPFlowStatsReply:
1100                 [STATE_FLOW_EXIST_CHK,
1101                  STATE_THROUGHPUT_FLOW_EXIST_CHK,
1102                  STATE_GET_THROUGHPUT],
1103             ofp_event.EventOFPTableStatsReply:
1104                 [STATE_GET_MATCH_COUNT,
1105                  STATE_FLOW_UNMATCH_CHK],
1106             ofp_event.EventOFPPortStatsReply:
1107                 [STATE_TARGET_PKT_COUNT,
1108                  STATE_TESTER_PKT_COUNT],
1109         }
1110         if ofp.OFP_VERSION >= ofproto_v1_2.OFP_VERSION:
1111             event_states[ofp_event.EventOFPGroupDescStatsReply] = [
1112                 STATE_GROUP_EXIST_CHK
1113             ]
1114         if ofp.OFP_VERSION >= ofproto_v1_3.OFP_VERSION:
1115             event_states[ofp_event.EventOFPMeterConfigStatsReply] = [
1116                 STATE_METER_EXIST_CHK
1117             ]
1118         if self.state in event_states[ev.__class__]:
1119             if self.waiter and ev.msg.xid in self.send_msg_xids:
1120                 self.rcv_msgs.append(ev.msg)
1121                 if not ev.msg.flags:
1122                     self.waiter.set()
1123                     hub.sleep(0)
1124
1125     @set_ev_cls(ofp_event.EventOFPBarrierReply, handler.MAIN_DISPATCHER)
1126     def barrier_reply_handler(self, ev):
1127         state_list = [STATE_INIT_FLOW,
1128                       STATE_INIT_THROUGHPUT_FLOW,
1129                       STATE_INIT_METER,
1130                       STATE_INIT_GROUP,
1131                       STATE_FLOW_INSTALL,
1132                       STATE_THROUGHPUT_FLOW_INSTALL,
1133                       STATE_METER_INSTALL,
1134                       STATE_GROUP_INSTALL,
1135                       STATE_SEND_BARRIER]
1136         if self.state in state_list:
1137             if self.waiter and ev.msg.xid in self.send_msg_xids:
1138                 self.rcv_msgs.append(ev.msg)
1139                 self.waiter.set()
1140                 hub.sleep(0)
1141
1142     @set_ev_cls(ofp_event.EventOFPPacketIn, handler.MAIN_DISPATCHER)
1143     def packet_in_handler(self, ev):
1144         state_list = [STATE_FLOW_MATCH_CHK]
1145         if self.state in state_list:
1146             if self.waiter:
1147                 self.rcv_msgs.append(ev.msg)
1148                 self.waiter.set()
1149                 hub.sleep(0)
1150
1151     @set_ev_cls(ofp_event.EventOFPErrorMsg, [handler.HANDSHAKE_DISPATCHER,
1152                                              handler.CONFIG_DISPATCHER,
1153                                              handler.MAIN_DISPATCHER])
1154     def error_msg_handler(self, ev):
1155         if ev.msg.xid in self.send_msg_xids:
1156             self.rcv_msgs.append(ev.msg)
1157             if self.waiter:
1158                 self.waiter.set()
1159                 hub.sleep(0)
1160
1161
class OpenFlowSw(object):
    """Builds OpenFlow messages for one datapath and sends them.

    Every helper returns the value of ``send_msg`` (the xid of the message
    that was sent).  Helpers for features that the datapath's OpenFlow
    version lacks return ``None`` without sending anything.
    """

    def __init__(self, dp, logger):
        super(OpenFlowSw, self).__init__()
        self.dp = dp
        self.logger = logger
        self.tester_send_port = CONF['test-switch']['tester_send_port']

    def send_msg(self, msg):
        """Send *msg* with a newly assigned xid and return that xid.

        Raises TestError(STATE_DISCONNECTED) when the datapath is a
        DummyDatapath placeholder.
        """
        datapath = self.dp
        if isinstance(datapath, DummyDatapath):
            raise TestError(STATE_DISCONNECTED)
        # Clear any previous xid so that the datapath assigns a new one.
        msg.xid = None
        datapath.set_xid(msg)
        datapath.send_msg(msg)
        return msg.xid

    def add_flow(self, in_port=None, out_port=None):
        """ Add a flow that outputs packets from in_port to out_port. """
        ofp = self.dp.ofproto
        parser = self.dp.ofproto_parser
        match = parser.OFPMatch(in_port=in_port)
        actions = [parser.OFPActionOutput(out_port)]
        if ofp.OFP_VERSION == ofproto_v1_0.OFP_VERSION:
            # OpenFlow 1.0 flow mods carry the actions directly.
            how = {'actions': actions}
        else:
            how = {'instructions': [parser.OFPInstructionActions(
                ofp.OFPIT_APPLY_ACTIONS, actions)]}
        mod = parser.OFPFlowMod(
            self.dp, match=match, cookie=0, command=ofp.OFPFC_ADD, **how)
        return self.send_msg(mod)

    def del_flows(self, cookie=0):
        """
        Delete all flow except default flow by using the cookie value.

        Note: In OpenFlow 1.0, DELETE and DELETE_STRICT commands can
        not be filtered by the cookie value and this value is ignored.
        """
        ofp = self.dp.ofproto
        parser = self.dp.ofproto_parser
        # Filter on the whole cookie only when a cookie was given.
        cookie_mask = 0xffffffffffffffff if cookie else 0
        if ofp.OFP_VERSION == ofproto_v1_0.OFP_VERSION:
            mod = parser.OFPFlowMod(
                self.dp, parser.OFPMatch(), cookie, ofp.OFPFC_DELETE)
        else:
            mod = parser.OFPFlowMod(
                self.dp, cookie=cookie, cookie_mask=cookie_mask,
                table_id=ofp.OFPTT_ALL, command=ofp.OFPFC_DELETE,
                out_port=ofp.OFPP_ANY, out_group=ofp.OFPG_ANY)
        return self.send_msg(mod)

    def del_meters(self):
        """ Delete all meter entries (no-op before OpenFlow 1.3). """
        ofp = self.dp.ofproto
        parser = self.dp.ofproto_parser
        if ofp.OFP_VERSION < ofproto_v1_3.OFP_VERSION:
            return None
        return self.send_msg(parser.OFPMeterMod(
            self.dp, command=ofp.OFPMC_DELETE, flags=0,
            meter_id=ofp.OFPM_ALL))

    def del_groups(self):
        """ Delete all group entries (no-op before OpenFlow 1.2). """
        ofp = self.dp.ofproto
        parser = self.dp.ofproto_parser
        if ofp.OFP_VERSION < ofproto_v1_2.OFP_VERSION:
            return None
        return self.send_msg(parser.OFPGroupMod(
            self.dp, command=ofp.OFPGC_DELETE, type_=0,
            group_id=ofp.OFPG_ALL))

    def send_barrier_request(self):
        """ send a BARRIER_REQUEST message."""
        request = self.dp.ofproto_parser.OFPBarrierRequest(self.dp)
        return self.send_msg(request)

    def send_port_stats(self):
        """ Get port stats of all ports."""
        ofp = self.dp.ofproto
        parser = self.dp.ofproto_parser
        # The "every port" wildcard is named differently in OpenFlow 1.0.
        if ofp.OFP_VERSION == ofproto_v1_0.OFP_VERSION:
            every_port = ofp.OFPP_NONE
        else:
            every_port = ofp.OFPP_ANY
        request = parser.OFPPortStatsRequest(self.dp, 0, every_port)
        return self.send_msg(request)

    def send_flow_stats(self):
        """ Get all flow. """
        ofp = self.dp.ofproto
        parser = self.dp.ofproto_parser
        if ofp.OFP_VERSION == ofproto_v1_0.OFP_VERSION:
            request = parser.OFPFlowStatsRequest(
                self.dp, 0, parser.OFPMatch(), 0xff, ofp.OFPP_NONE)
        else:
            request = parser.OFPFlowStatsRequest(
                self.dp, 0, ofp.OFPTT_ALL, ofp.OFPP_ANY, ofp.OFPG_ANY,
                0, 0, parser.OFPMatch())
        return self.send_msg(request)

    def send_meter_config_stats(self):
        """ Get all meter (no-op before OpenFlow 1.3). """
        ofp = self.dp.ofproto
        parser = self.dp.ofproto_parser
        if ofp.OFP_VERSION < ofproto_v1_3.OFP_VERSION:
            return None
        return self.send_msg(parser.OFPMeterConfigStatsRequest(self.dp))

    def send_group_desc_stats(self):
        """ Get all group (no-op before OpenFlow 1.2). """
        ofp = self.dp.ofproto
        parser = self.dp.ofproto_parser
        if ofp.OFP_VERSION < ofproto_v1_2.OFP_VERSION:
            return None
        return self.send_msg(parser.OFPGroupDescStatsRequest(self.dp))

    def send_table_stats(self):
        """ Get table stats. """
        request = self.dp.ofproto_parser.OFPTableStatsRequest(self.dp, 0)
        return self.send_msg(request)

    def send_packet_out(self, data):
        """ send a PacketOut message that outputs data on tester_send_port."""
        ofp = self.dp.ofproto
        parser = self.dp.ofproto_parser
        output = parser.OFPActionOutput(self.tester_send_port)
        packet_out = parser.OFPPacketOut(
            datapath=self.dp, buffer_id=ofp.OFP_NO_BUFFER,
            data=data, in_port=ofp.OFPP_CONTROLLER, actions=[output])
        return self.send_msg(packet_out)
1306
1307
class TestPatterns(dict):
    """Dictionary of TestFile objects keyed by their description.

    The dictionary is filled at construction time from every ``.json``
    file found at or below *test_dir*.
    """

    def __init__(self, test_dir, logger):
        super(TestPatterns, self).__init__()
        self.logger = logger
        # Parse test pattern from test files.
        self._get_tests(test_dir)

    def _get_tests(self, path):
        """Register the test file at *path*, recursing into directories.

        A path that does not exist is only logged as a warning.
        """
        if not os.path.exists(path):
            self.logger.warning(INVALID_PATH % {'path': path})
            return

        if os.path.isdir(path):  # Directory
            # Avoid a doubled separator when path already ends with one.
            separator = '' if path[-1:] == '/' else '/'
            for entry in os.listdir(path):
                self._get_tests(path + separator + entry)
        elif os.path.isfile(path):  # File
            extension = os.path.splitext(path)[1]
            if extension == '.json':
                test = TestFile(path, self.logger)
                self[test.description] = test
1334
1335
class TestFile(stringify.StringifyMixin):
    """Test File object include Test objects.

    ``description`` holds the string element found in the file (if any)
    and ``tests`` the Test objects built from the other elements.
    """

    def __init__(self, path, logger):
        super(TestFile, self).__init__()
        self.logger = logger
        self.description = None
        self.tests = []
        self._get_tests(path)

    def _normalize_test_json(self, val):
        """Replace symbolic port names in *val* with configured values.

        Walks nested dicts and lists in place.  The 'port' of an
        "OFPActionOutput" and the 'value' of an "OXMTlv" whose field is
        "in_port" are replaced by the matching ``CONF['test-switch']``
        entry when they equal one of the known port names.
        """
        port_names = ('target_recv_port', 'target_send_port_1',
                      'target_send_port_2', 'tester_send_port',
                      'tester_recv_port_1', 'tester_recv_port_2')

        def substitute_port(key, container):
            for port_name in port_names:
                if container[key] == port_name:
                    container[key] = CONF['test-switch'][port_name]

        if isinstance(val, list):
            for item in val:
                self._normalize_test_json(item)
            return
        if not isinstance(val, dict):
            return
        for key, child in val.items():
            if key == "OFPActionOutput":
                if 'port' in child:
                    substitute_port("port", child)
            elif key == "OXMTlv" and child.get("field", "") == "in_port":
                substitute_port("value", child)
            self._normalize_test_json(child)

    def _get_tests(self, path):
        """Load the JSON list at *path* into description and tests.

        A ValueError or TypeError raised while parsing is logged as a
        warning instead of being propagated.
        """
        with open(path, 'r') as fhandle:
            buf = fhandle.read()
        try:
            for test_json in json.loads(buf):
                if isinstance(test_json, six.text_type):
                    self.description = test_json
                    continue
                self._normalize_test_json(test_json)
                self.tests.append(Test(test_json))
        except (ValueError, TypeError) as e:
            self.logger.warning(
                TEST_FILE_ERROR % {'file': path, 'detail': str(e)})
1382
1383
class Test(stringify.StringifyMixin):
    """One test case: description, prerequisite messages and test packets."""

    def __init__(self, test_json):
        super(Test, self).__init__()
        parsed = self._parse_test(test_json)
        (self.description, self.prerequisite, self.tests) = parsed

    @classmethod
    def _parse_test(cls, buf):
        """Parse one test JSON object.

        Returns ``(description, prerequisite, tests)`` where *prerequisite*
        is the list of serialized OpenFlow messages built from the
        KEY_PREREQ block and *tests* holds one dictionary per entry of the
        KEY_TESTS block.  Raises ValueError for a missing block or a
        malformed entry.
        """
        def serialize_packet(test):
            # NOTE(review): eval() runs text taken from the test file in
            # this module's namespace; test files must be trusted.
            built = eval('/'.join(test))
            built.serialize()
            return six.binary_type(built.data)

        def parse_continuous_ingress(ingress):
            # Ingress given as a dict: packets to send continuously.
            packets = ingress[KEY_PACKETS]
            lines = packets[KEY_DATA]
            return {
                'packet_text': lines,
                'packet_binary': serialize_packet(lines),
                KEY_DURATION_TIME: packets.get(
                    KEY_DURATION_TIME, DEFAULT_DURATION_TIME),
                KEY_PKTPS: packets.get(KEY_PKTPS, DEFAULT_PKTPS),
                # The packet is rebuilt per send when it uses randint.
                'randomize': any('randint' in line for line in lines),
            }

        def parse_throughputs(egress):
            # Egress given as a dict: expected throughput per match.
            parsed = []
            for throughput in egress[KEY_THROUGHPUT]:
                flow_mod = {
                    "OFPFlowMod": {
                        'cookie': THROUGHPUT_COOKIE,
                        'priority': THROUGHPUT_PRIORITY,
                        'match': {
                            'OFPMatch': throughput[KEY_MATCH]
                        }
                    }
                }
                one = {}
                one[KEY_FLOW] = ofproto_parser.ofp_msg_from_jsondict(
                    tester_dp, flow_mod)
                one[KEY_KBPS] = throughput.get(KEY_KBPS)
                one[KEY_PKTPS] = throughput.get(KEY_PKTPS)
                # Exactly one of the two rates must be given.
                if bool(one[KEY_KBPS]) == bool(one[KEY_PKTPS]):
                    raise ValueError(
                        '"%s" requires either "%s" or "%s".' % (
                            KEY_THROUGHPUT, KEY_KBPS, KEY_PKTPS))
                parsed.append(one)
            return parsed

        # create Datapath instance using user-specified versions
        target_dp = DummyDatapath(OfTester.target_ver)
        tester_dp = DummyDatapath(OfTester.tester_ver)

        # parse 'description'
        description = buf.get(KEY_DESC)

        # parse 'prerequisite'
        if KEY_PREREQ not in buf:
            raise ValueError('a test requires a "%s" block' % KEY_PREREQ)
        prerequisite = []
        for flow in buf[KEY_PREREQ]:
            msg = ofproto_parser.ofp_msg_from_jsondict(target_dp, flow)
            msg.serialize()
            prerequisite.append(msg)

        # parse 'tests'
        if KEY_TESTS not in buf:
            raise ValueError('a test requires a "%s" block.' % KEY_TESTS)

        tests = []
        for test in buf[KEY_TESTS]:
            if len(test) != 2:
                raise ValueError(
                    '"%s" block requires "%s" field and one of "%s" or "%s"'
                    ' or "%s" field.' % (KEY_TESTS, KEY_INGRESS, KEY_EGRESS,
                                         KEY_PKT_IN, KEY_TBL_MISS))
            # parse 'ingress'
            if KEY_INGRESS not in test:
                raise ValueError('a test requires "%s" field.' % KEY_INGRESS)
            ingress = test[KEY_INGRESS]
            test_pkt = {}
            if isinstance(ingress, list):
                test_pkt[KEY_INGRESS] = serialize_packet(ingress)
            elif isinstance(ingress, dict):
                test_pkt[KEY_PACKETS] = parse_continuous_ingress(ingress)
            else:
                raise ValueError('invalid format: "%s" field' % KEY_INGRESS)

            # parse 'egress' or 'PACKET_IN' or 'table-miss'
            if KEY_EGRESS in test:
                egress = test[KEY_EGRESS]
                if isinstance(egress, list):
                    test_pkt[KEY_EGRESS] = serialize_packet(egress)
                elif isinstance(egress, dict):
                    test_pkt[KEY_THROUGHPUT] = parse_throughputs(egress)
                else:
                    raise ValueError('invalid format: "%s" field' % KEY_EGRESS)
            elif KEY_PKT_IN in test:
                test_pkt[KEY_PKT_IN] = serialize_packet(test[KEY_PKT_IN])
            elif KEY_TBL_MISS in test:
                test_pkt[KEY_TBL_MISS] = test[KEY_TBL_MISS]

            tests.append(test_pkt)

        return description, prerequisite, tests
1485
1486
class DummyDatapath(ofproto_protocol.ProtocolDesc):
    """Datapath stand-in whose ``set_xid`` and ``send_msg`` do nothing.

    Used as the datapath argument when test files are parsed into OpenFlow
    messages; ``OpenFlowSw.send_msg`` treats a switch whose datapath is a
    DummyDatapath as disconnected.
    """

    def __init__(self, version=None):
        super(DummyDatapath, self).__init__(version)

    def set_xid(self, _):
        # Deliberately a no-op: there is no connection to number messages for.
        pass

    def send_msg(self, _):
        # Deliberately a no-op: the message is silently discarded.
        pass