# Copyright (C) 2016 Nippon Telegraph and Telephone Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function

import logging
import os
import struct
import sys
import unittest

try:
    import mock  # Python 2
except ImportError:
    from unittest import mock  # Python 3

from nose.tools import eq_
from nose.tools import raises

from ryu.utils import binary_str
from ryu.lib import pcaplib


# Module-level logger named after this test module.
LOG = logging.getLogger(__name__)

# Directory containing the pcap fixture files, expressed relative to the
# directory of this module's own source file.
PCAP_PACKET_DATA_DIR = os.path.join(
    os.path.dirname(sys.modules[__name__].__file__),
    '../../packet_data/pcap/')


class Test_PcapFileHdr(unittest.TestCase):
    """
    Test case for pcaplib.PcapFileHdr class
    """
    # Expected header object shared by all tests.  The field values
    # mirror the bytes encoded in buf_big / buf_little below; "magic" is
    # filled in per test by _assert().
    hdr = pcaplib.PcapFileHdr(
        magic=None,  # temporary default
        version_major=2,
        version_minor=4,
        thiszone=0x11223344,
        sigfigs=0x22334455,
        snaplen=0x33445566,
        network=0x44556677,
    )

    # The same header encoded in big endian byte order.
    buf_big = (
        b'\xa1\xb2\xc3\xd4'  # magic (Big Endian)
        b'\x00\x02\x00\x04'  # version_major, version_minor
        b'\x11\x22\x33\x44'  # thiszone
        b'\x22\x33\x44\x55'  # sigfigs
        b'\x33\x44\x55\x66'  # snaplen
        b'\x44\x55\x66\x77'  # network
    )

    # The same header encoded in little endian byte order.
    buf_little = (
        b'\xd4\xc3\xb2\xa1'  # magic (Little Endian)
        b'\x02\x00\x04\x00'  # version_major, version_minor
        b'\x44\x33\x22\x11'  # thiszone
        b'\x55\x44\x33\x22'  # sigfigs
        b'\x66\x55\x44\x33'  # snaplen
        b'\x77\x66\x55\x44'  # network
    )

    # A header whose magic number matches neither byte order.
    buf_invalid = (
        b'\xff\xff\xff\xff'  # magic (Invalid)
        b'\x02\x00\x04\x00'  # version_major, version_minor
        b'\x44\x33\x22\x11'  # thiszone
        b'\x55\x44\x33\x22'  # sigfigs
        b'\x66\x55\x44\x33'  # snaplen
        b'\x77\x66\x55\x44'  # network
    )

    def _assert(self, magic, ret):
        # Store the expected magic on the shared class-level header
        # object, then compare every attribute with the parsed header.
        self.hdr.magic = magic
        eq_(self.hdr.__dict__, ret.__dict__)

    def test_parser_with_big_endian(self):
        ret, byteorder = pcaplib.PcapFileHdr.parser(self.buf_big)
        self._assert(pcaplib.PcapFileHdr.MAGIC_NUMBER_IDENTICAL, ret)
        # Check the reported byte order as well, in line with the
        # little endian counterpart below.
        eq_('big', byteorder)

    def test_parser_with_little_endian(self):
        ret, byteorder = pcaplib.PcapFileHdr.parser(self.buf_little)
        self._assert(pcaplib.PcapFileHdr.MAGIC_NUMBER_SWAPPED, ret)
        eq_('little', byteorder)

    @mock.patch('sys.byteorder', 'big')
    def test_serialize_with_big_endian(self):
        # sys.byteorder is patched so the expected output does not
        # depend on the host running the tests.
        buf = self.hdr.serialize()
        eq_(binary_str(self.buf_big), binary_str(buf))

    @mock.patch('sys.byteorder', 'little')
    def test_serialize_with_little_endian(self):
        buf = self.hdr.serialize()
        eq_(binary_str(self.buf_little), binary_str(buf))

    @raises(struct.error)
    def test_parser_with_invalid_magic_number(self):
        # The test passes only if parsing raises struct.error.
        pcaplib.PcapFileHdr.parser(self.buf_invalid)


class Test_PcapPktHdr(unittest.TestCase):
    """
    Test case for pcaplib.PcapPktHdr class
    """
    # Payload appended after the packet header in the parser tests.
    expected_buf = b'test_data'

    # Expected header object; the field values mirror the bytes encoded
    # in buf_big / buf_little below.
    hdr = pcaplib.PcapPktHdr(
        ts_sec=0x11223344,
        ts_usec=0x22334455,
        incl_len=len(expected_buf),
        orig_len=0x44556677,
    )

    # The same packet header encoded in big endian byte order.
    buf_big = (
        b'\x11\x22\x33\x44'  # ts_sec
        b'\x22\x33\x44\x55'  # ts_usec
        b'\x00\x00\x00\x09'  # incl_len = len(expected_buf)
        b'\x44\x55\x66\x77'  # orig_len
    )

    # The same packet header encoded in little endian byte order.
    buf_little = (
        b'\x44\x33\x22\x11'  # ts_sec
        b'\x55\x44\x33\x22'  # ts_usec
        b'\x09\x00\x00\x00'  # incl_len = len(expected_buf)
        b'\x77\x66\x55\x44'  # orig_len
    )

    def test_parser_with_big_endian(self):
        # The parser is given header + payload and is expected to hand
        # back the parsed header together with the payload bytes.
        ret, buf = pcaplib.PcapPktHdr.parser(
            self.buf_big + self.expected_buf, 'big')
        eq_(self.hdr.__dict__, ret.__dict__)
        eq_(self.expected_buf, buf)

    def test_parser_with_little_endian(self):
        ret, buf = pcaplib.PcapPktHdr.parser(
            self.buf_little + self.expected_buf, 'little')
        eq_(self.hdr.__dict__, ret.__dict__)
        eq_(self.expected_buf, buf)

    @mock.patch('sys.byteorder', 'big')
    def test_serialize_with_big_endian(self):
        # sys.byteorder is patched so the expected output does not
        # depend on the host running the tests.
        buf = self.hdr.serialize()
        eq_(binary_str(self.buf_big), binary_str(buf))

    @mock.patch('sys.byteorder', 'little')
    def test_serialize_with_little_endian(self):
        buf = self.hdr.serialize()
        eq_(binary_str(self.buf_little), binary_str(buf))


class Test_pcaplib_Reader(unittest.TestCase):
    """
    Test case for pcaplib.Reader class
    """

    # (timestamp, payload) pairs expected from both fixture files.
    expected_outputs = [
        (0x1234 + (0x5678 / 1e6), b'test_data_1'),  # sec=0x1234, usec=0x5678
        (0x2345 + (0x6789 / 1e6), b'test_data_2'),  # sec=0x2345, usec=0x6789
    ]

    def _test(self, file_name):
        # Read every (timestamp, payload) pair from the given pcap file
        # and compare with expected_outputs.  The file is opened in a
        # "with" block so the handle is closed even if iteration fails.
        with open(file_name, 'rb') as f:
            outputs = [(ts, buf) for ts, buf in pcaplib.Reader(f)]

        eq_(self.expected_outputs, outputs)

    def test_with_big_endian(self):
        self._test(os.path.join(PCAP_PACKET_DATA_DIR, 'big_endian.pcap'))

    def test_with_little_endian(self):
        self._test(os.path.join(PCAP_PACKET_DATA_DIR, 'little_endian.pcap'))


class DummyFile(object):
    """
    In-memory stand-in for a writable binary file.

    Everything passed to write() is appended to the "buf" attribute so
    that tests can compare the accumulated bytes afterwards.
    """

    def __init__(self):
        self.buf = b''

    def write(self, buf):
        self.buf += buf

    def close(self):
        # NOTE(review): no-op provided on the assumption that the code
        # under test may close the file object it was given -- confirm
        # against pcaplib.Writer.
        pass


class Test_pcaplib_Writer(unittest.TestCase):
    """
    Test case for pcaplib.Writer class
    """

    @staticmethod
    def _test(file_name):
        # Write the two reference packets into a DummyFile and compare
        # the accumulated bytes with the content of the fixture file.
        # The fixture is read inside a "with" block so the handle is
        # always closed.
        with open(file_name, 'rb') as fixture:
            expected_buf = fixture.read()
        f = DummyFile()
        w = pcaplib.Writer(f)
        w.write_pkt(b'test_data_1', ts=(0x1234 + (0x5678 / 1e6)))
        w.write_pkt(b'test_data_2', ts=(0x2345 + (0x6789 / 1e6)))
        eq_(expected_buf, f.buf)

    @mock.patch('sys.byteorder', 'big')
    def test_with_big_endian(self):
        # sys.byteorder is patched so the output can be compared with
        # the big endian fixture regardless of the host.
        self._test(os.path.join(PCAP_PACKET_DATA_DIR, 'big_endian.pcap'))

    @mock.patch('sys.byteorder', 'little')
    def test_with_little_endian(self):
        self._test(os.path.join(PCAP_PACKET_DATA_DIR, 'little_endian.pcap'))

    @staticmethod
    @mock.patch.object(pcaplib.Writer, '_write_pcap_file_hdr', mock.MagicMock)
    @mock.patch.object(pcaplib.Writer, '_write_pkt_hdr', mock.MagicMock)
    def test_with_longer_buf():
        # Both header-writing methods are replaced with MagicMock, so
        # the assertions below see only the packet payload, which is
        # expected to be cut down to "snaplen" bytes.
        f = DummyFile()
        snaplen = 4
        w = pcaplib.Writer(f, snaplen=snaplen)
        w.write_pkt(b'hogehoge', ts=0)
        expected_buf = b'hoge'  # b'hogehoge'[:snaplen]
        eq_(expected_buf, f.buf)
        eq_(snaplen, len(f.buf))