backing up
[vsorcdistro/.git] / ryu / build / lib.linux-armv7l-2.7 / ryu / tests / unit / packet / test_icmp.py
1 # Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16
17 import inspect
18 import logging
19 import six
20 import struct
21 import unittest
22
23 from nose.tools import eq_
24 from ryu.lib.packet import icmp
25 from ryu.lib.packet import packet_utils
26
27
28 LOG = logging.getLogger(__name__)
29
30
31 class Test_icmp(unittest.TestCase):
32
33     echo_id = None
34     echo_seq = None
35     echo_data = None
36
37     unreach_mtu = None
38     unreach_data = None
39     unreach_data_len = None
40
41     te_data = None
42     te_data_len = None
43
44     def setUp(self):
45         self.type_ = icmp.ICMP_ECHO_REQUEST
46         self.code = 0
47         self.csum = 0
48         self.data = b''
49
50         self.ic = icmp.icmp(self.type_, self.code, self.csum, self.data)
51
52         self.buf = bytearray(struct.pack(
53             icmp.icmp._PACK_STR, self.type_, self.code, self.csum))
54         self.csum_calc = packet_utils.checksum(self.buf)
55         struct.pack_into('!H', self.buf, 2, self.csum_calc)
56
57     def setUp_with_echo(self):
58         self.echo_id = 13379
59         self.echo_seq = 1
60         self.echo_data = b'\x30\x0e\x09\x00\x00\x00\x00\x00' \
61             + b'\x10\x11\x12\x13\x14\x15\x16\x17' \
62             + b'\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f' \
63             + b'\x20\x21\x22\x23\x24\x25\x26\x27' \
64             + b'\x28\x29\x2a\x2b\x2c\x2d\x2e\x2f' \
65             + b'\x30\x31\x32\x33\x34\x35\x36\x37'
66         self.data = icmp.echo(
67             id_=self.echo_id, seq=self.echo_seq, data=self.echo_data)
68
69         self.type_ = icmp.ICMP_ECHO_REQUEST
70         self.code = 0
71         self.ic = icmp.icmp(self.type_, self.code, self.csum, self.data)
72
73         self.buf = bytearray(struct.pack(
74             icmp.icmp._PACK_STR, self.type_, self.code, self.csum))
75         self.buf += self.data.serialize()
76         self.csum_calc = packet_utils.checksum(self.buf)
77         struct.pack_into('!H', self.buf, 2, self.csum_calc)
78
79     def setUp_with_dest_unreach(self):
80         self.unreach_mtu = 10
81         self.unreach_data = b'abc'
82         self.unreach_data_len = len(self.unreach_data)
83         self.data = icmp.dest_unreach(
84             data_len=self.unreach_data_len, mtu=self.unreach_mtu,
85             data=self.unreach_data)
86
87         self.type_ = icmp.ICMP_DEST_UNREACH
88         self.code = icmp.ICMP_HOST_UNREACH_CODE
89         self.ic = icmp.icmp(self.type_, self.code, self.csum, self.data)
90
91         self.buf = bytearray(struct.pack(
92             icmp.icmp._PACK_STR, self.type_, self.code, self.csum))
93         self.buf += self.data.serialize()
94         self.csum_calc = packet_utils.checksum(self.buf)
95         struct.pack_into('!H', self.buf, 2, self.csum_calc)
96
97     def setUp_with_TimeExceeded(self):
98         self.te_data = b'abc'
99         self.te_data_len = len(self.te_data)
100         self.data = icmp.TimeExceeded(
101             data_len=self.te_data_len, data=self.te_data)
102
103         self.type_ = icmp.ICMP_TIME_EXCEEDED
104         self.code = 0
105         self.ic = icmp.icmp(self.type_, self.code, self.csum, self.data)
106
107         self.buf = bytearray(struct.pack(
108             icmp.icmp._PACK_STR, self.type_, self.code, self.csum))
109         self.buf += self.data.serialize()
110         self.csum_calc = packet_utils.checksum(self.buf)
111         struct.pack_into('!H', self.buf, 2, self.csum_calc)
112
113     def test_init(self):
114         eq_(self.type_, self.ic.type)
115         eq_(self.code, self.ic.code)
116         eq_(self.csum, self.ic.csum)
117         eq_(str(self.data), str(self.ic.data))
118
119     def test_init_with_echo(self):
120         self.setUp_with_echo()
121         self.test_init()
122
123     def test_init_with_dest_unreach(self):
124         self.setUp_with_dest_unreach()
125         self.test_init()
126
127     def test_init_with_TimeExceeded(self):
128         self.setUp_with_TimeExceeded()
129         self.test_init()
130
131     def test_parser(self):
132         _res = icmp.icmp.parser(six.binary_type(self.buf))
133         if type(_res) is tuple:
134             res = _res[0]
135         else:
136             res = _res
137
138         eq_(self.type_, res.type)
139         eq_(self.code, res.code)
140         eq_(self.csum_calc, res.csum)
141         eq_(str(self.data), str(res.data))
142
143     def test_parser_with_echo(self):
144         self.setUp_with_echo()
145         self.test_parser()
146
147     def test_parser_with_dest_unreach(self):
148         self.setUp_with_dest_unreach()
149         self.test_parser()
150
151     def test_parser_with_TimeExceeded(self):
152         self.setUp_with_TimeExceeded()
153         self.test_parser()
154
155     def test_serialize(self):
156         data = bytearray()
157         prev = None
158         buf = self.ic.serialize(data, prev)
159
160         res = struct.unpack_from(icmp.icmp._PACK_STR, six.binary_type(buf))
161
162         eq_(self.type_, res[0])
163         eq_(self.code, res[1])
164         eq_(self.csum_calc, res[2])
165
166     def test_serialize_with_echo(self):
167         self.setUp_with_echo()
168         self.test_serialize()
169
170         data = bytearray()
171         prev = None
172         buf = self.ic.serialize(data, prev)
173         echo = icmp.echo.parser(six.binary_type(buf), icmp.icmp._MIN_LEN)
174         eq_(repr(self.data), repr(echo))
175
176     def test_serialize_with_dest_unreach(self):
177         self.setUp_with_dest_unreach()
178         self.test_serialize()
179
180         data = bytearray()
181         prev = None
182         buf = self.ic.serialize(data, prev)
183         unreach = icmp.dest_unreach.parser(six.binary_type(buf), icmp.icmp._MIN_LEN)
184         eq_(repr(self.data), repr(unreach))
185
186     def test_serialize_with_TimeExceeded(self):
187         self.setUp_with_TimeExceeded()
188         self.test_serialize()
189
190         data = bytearray()
191         prev = None
192         buf = self.ic.serialize(data, prev)
193         te = icmp.TimeExceeded.parser(six.binary_type(buf), icmp.icmp._MIN_LEN)
194         eq_(repr(self.data), repr(te))
195
196     def test_to_string(self):
197         icmp_values = {'type': repr(self.type_),
198                        'code': repr(self.code),
199                        'csum': repr(self.csum),
200                        'data': repr(self.data)}
201         _ic_str = ','.join(['%s=%s' % (k, icmp_values[k])
202                             for k, v in inspect.getmembers(self.ic)
203                             if k in icmp_values])
204         ic_str = '%s(%s)' % (icmp.icmp.__name__, _ic_str)
205
206         eq_(str(self.ic), ic_str)
207         eq_(repr(self.ic), ic_str)
208
209     def test_to_string_with_echo(self):
210         self.setUp_with_echo()
211         self.test_to_string()
212
213     def test_to_string_with_dest_unreach(self):
214         self.setUp_with_dest_unreach()
215         self.test_to_string()
216
217     def test_to_string_with_TimeExceeded(self):
218         self.setUp_with_TimeExceeded()
219         self.test_to_string()
220
221     def test_default_args(self):
222         ic = icmp.icmp()
223         buf = ic.serialize(bytearray(), None)
224         res = struct.unpack(icmp.icmp._PACK_STR, six.binary_type(buf[:4]))
225
226         eq_(res[0], 8)
227         eq_(res[1], 0)
228         eq_(buf[4:], b'\x00\x00\x00\x00')
229
230         # with data
231         ic = icmp.icmp(type_=icmp.ICMP_DEST_UNREACH, data=icmp.dest_unreach())
232         buf = ic.serialize(bytearray(), None)
233         res = struct.unpack(icmp.icmp._PACK_STR, six.binary_type(buf[:4]))
234
235         eq_(res[0], 3)
236         eq_(res[1], 0)
237         eq_(buf[4:], b'\x00\x00\x00\x00')
238
239     def test_json(self):
240         jsondict = self.ic.to_jsondict()
241         ic = icmp.icmp.from_jsondict(jsondict['icmp'])
242         eq_(str(self.ic), str(ic))
243
244     def test_json_with_echo(self):
245         self.setUp_with_echo()
246         self.test_json()
247
248     def test_json_with_dest_unreach(self):
249         self.setUp_with_dest_unreach()
250         self.test_json()
251
252     def test_json_with_TimeExceeded(self):
253         self.setUp_with_TimeExceeded()
254         self.test_json()
255
256
257 class Test_echo(unittest.TestCase):
258
259     def setUp(self):
260         self.id_ = 13379
261         self.seq = 1
262         self.data = b'\x30\x0e\x09\x00\x00\x00\x00\x00' \
263             + b'\x10\x11\x12\x13\x14\x15\x16\x17' \
264             + b'\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f' \
265             + b'\x20\x21\x22\x23\x24\x25\x26\x27' \
266             + b'\x28\x29\x2a\x2b\x2c\x2d\x2e\x2f' \
267             + b'\x30\x31\x32\x33\x34\x35\x36\x37'
268         self.echo = icmp.echo(
269             self.id_, self.seq, self.data)
270         self.buf = struct.pack('!HH', self.id_, self.seq)
271         self.buf += self.data
272
273     def test_init(self):
274         eq_(self.id_, self.echo.id)
275         eq_(self.seq, self.echo.seq)
276         eq_(self.data, self.echo.data)
277
278     def test_parser(self):
279         _res = icmp.echo.parser(self.buf, 0)
280         if type(_res) is tuple:
281             res = _res[0]
282         else:
283             res = _res
284         eq_(self.id_, res.id)
285         eq_(self.seq, res.seq)
286         eq_(self.data, res.data)
287
288     def test_serialize(self):
289         buf = self.echo.serialize()
290         res = struct.unpack_from('!HH', six.binary_type(buf))
291         eq_(self.id_, res[0])
292         eq_(self.seq, res[1])
293         eq_(self.data, buf[struct.calcsize('!HH'):])
294
295     def test_default_args(self):
296         ec = icmp.echo()
297         buf = ec.serialize()
298         res = struct.unpack(icmp.echo._PACK_STR, six.binary_type(buf))
299
300         eq_(res[0], 0)
301         eq_(res[1], 0)
302
303
304 class Test_dest_unreach(unittest.TestCase):
305
306     def setUp(self):
307         self.mtu = 10
308         self.data = b'abc'
309         self.data_len = len(self.data)
310         self.dest_unreach = icmp.dest_unreach(
311             data_len=self.data_len, mtu=self.mtu, data=self.data)
312         self.buf = struct.pack('!xBH', self.data_len, self.mtu)
313         self.buf += self.data
314
315     def test_init(self):
316         eq_(self.data_len, self.dest_unreach.data_len)
317         eq_(self.mtu, self.dest_unreach.mtu)
318         eq_(self.data, self.dest_unreach.data)
319
320     def test_parser(self):
321         _res = icmp.dest_unreach.parser(self.buf, 0)
322         if type(_res) is tuple:
323             res = _res[0]
324         else:
325             res = _res
326         eq_(self.data_len, res.data_len)
327         eq_(self.mtu, res.mtu)
328         eq_(self.data, res.data)
329
330     def test_serialize(self):
331         buf = self.dest_unreach.serialize()
332         res = struct.unpack_from('!xBH', six.binary_type(buf))
333         eq_(self.data_len, res[0])
334         eq_(self.mtu, res[1])
335         eq_(self.data, buf[struct.calcsize('!xBH'):])
336
337     def test_default_args(self):
338         du = icmp.dest_unreach()
339         buf = du.serialize()
340         res = struct.unpack(icmp.dest_unreach._PACK_STR, six.binary_type(buf))
341
342         eq_(res[0], 0)
343         eq_(res[1], 0)
344
345
346 class Test_TimeExceeded(unittest.TestCase):
347
348     def setUp(self):
349         self.data = b'abc'
350         self.data_len = len(self.data)
351         self.te = icmp.TimeExceeded(
352             data_len=self.data_len, data=self.data)
353         self.buf = struct.pack('!xBxx', self.data_len)
354         self.buf += self.data
355
356     def test_init(self):
357         eq_(self.data_len, self.te.data_len)
358         eq_(self.data, self.te.data)
359
360     def test_parser(self):
361         _res = icmp.TimeExceeded.parser(self.buf, 0)
362         if type(_res) is tuple:
363             res = _res[0]
364         else:
365             res = _res
366         eq_(self.data_len, res.data_len)
367         eq_(self.data, res.data)
368
369     def test_serialize(self):
370         buf = self.te.serialize()
371         res = struct.unpack_from('!xBxx', six.binary_type(buf))
372         eq_(self.data_len, res[0])
373         eq_(self.data, buf[struct.calcsize('!xBxx'):])
374
375     def test_default_args(self):
376         te = icmp.TimeExceeded()
377         buf = te.serialize()
378         res = struct.unpack(icmp.TimeExceeded._PACK_STR, six.binary_type(buf))
379
380         eq_(res[0], 0)