1 # Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
23 from nose.tools import eq_
24 from ryu.lib.packet import icmp
25 from ryu.lib.packet import packet_utils
28 LOG = logging.getLogger(__name__)
31 class Test_icmp(unittest.TestCase):
39 unreach_data_len = None
45 self.type_ = icmp.ICMP_ECHO_REQUEST
50 self.ic = icmp.icmp(self.type_, self.code, self.csum, self.data)
52 self.buf = bytearray(struct.pack(
53 icmp.icmp._PACK_STR, self.type_, self.code, self.csum))
54 self.csum_calc = packet_utils.checksum(self.buf)
55 struct.pack_into('!H', self.buf, 2, self.csum_calc)
57 def setUp_with_echo(self):
60 self.echo_data = b'\x30\x0e\x09\x00\x00\x00\x00\x00' \
61 + b'\x10\x11\x12\x13\x14\x15\x16\x17' \
62 + b'\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f' \
63 + b'\x20\x21\x22\x23\x24\x25\x26\x27' \
64 + b'\x28\x29\x2a\x2b\x2c\x2d\x2e\x2f' \
65 + b'\x30\x31\x32\x33\x34\x35\x36\x37'
66 self.data = icmp.echo(
67 id_=self.echo_id, seq=self.echo_seq, data=self.echo_data)
69 self.type_ = icmp.ICMP_ECHO_REQUEST
71 self.ic = icmp.icmp(self.type_, self.code, self.csum, self.data)
73 self.buf = bytearray(struct.pack(
74 icmp.icmp._PACK_STR, self.type_, self.code, self.csum))
75 self.buf += self.data.serialize()
76 self.csum_calc = packet_utils.checksum(self.buf)
77 struct.pack_into('!H', self.buf, 2, self.csum_calc)
79 def setUp_with_dest_unreach(self):
81 self.unreach_data = b'abc'
82 self.unreach_data_len = len(self.unreach_data)
83 self.data = icmp.dest_unreach(
84 data_len=self.unreach_data_len, mtu=self.unreach_mtu,
85 data=self.unreach_data)
87 self.type_ = icmp.ICMP_DEST_UNREACH
88 self.code = icmp.ICMP_HOST_UNREACH_CODE
89 self.ic = icmp.icmp(self.type_, self.code, self.csum, self.data)
91 self.buf = bytearray(struct.pack(
92 icmp.icmp._PACK_STR, self.type_, self.code, self.csum))
93 self.buf += self.data.serialize()
94 self.csum_calc = packet_utils.checksum(self.buf)
95 struct.pack_into('!H', self.buf, 2, self.csum_calc)
97 def setUp_with_TimeExceeded(self):
99 self.te_data_len = len(self.te_data)
100 self.data = icmp.TimeExceeded(
101 data_len=self.te_data_len, data=self.te_data)
103 self.type_ = icmp.ICMP_TIME_EXCEEDED
105 self.ic = icmp.icmp(self.type_, self.code, self.csum, self.data)
107 self.buf = bytearray(struct.pack(
108 icmp.icmp._PACK_STR, self.type_, self.code, self.csum))
109 self.buf += self.data.serialize()
110 self.csum_calc = packet_utils.checksum(self.buf)
111 struct.pack_into('!H', self.buf, 2, self.csum_calc)
114 eq_(self.type_, self.ic.type)
115 eq_(self.code, self.ic.code)
116 eq_(self.csum, self.ic.csum)
117 eq_(str(self.data), str(self.ic.data))
119 def test_init_with_echo(self):
120 self.setUp_with_echo()
123 def test_init_with_dest_unreach(self):
124 self.setUp_with_dest_unreach()
127 def test_init_with_TimeExceeded(self):
128 self.setUp_with_TimeExceeded()
131 def test_parser(self):
132 _res = icmp.icmp.parser(six.binary_type(self.buf))
133 if type(_res) is tuple:
138 eq_(self.type_, res.type)
139 eq_(self.code, res.code)
140 eq_(self.csum_calc, res.csum)
141 eq_(str(self.data), str(res.data))
143 def test_parser_with_echo(self):
144 self.setUp_with_echo()
147 def test_parser_with_dest_unreach(self):
148 self.setUp_with_dest_unreach()
151 def test_parser_with_TimeExceeded(self):
152 self.setUp_with_TimeExceeded()
155 def test_serialize(self):
158 buf = self.ic.serialize(data, prev)
160 res = struct.unpack_from(icmp.icmp._PACK_STR, six.binary_type(buf))
162 eq_(self.type_, res[0])
163 eq_(self.code, res[1])
164 eq_(self.csum_calc, res[2])
166 def test_serialize_with_echo(self):
167 self.setUp_with_echo()
168 self.test_serialize()
172 buf = self.ic.serialize(data, prev)
173 echo = icmp.echo.parser(six.binary_type(buf), icmp.icmp._MIN_LEN)
174 eq_(repr(self.data), repr(echo))
176 def test_serialize_with_dest_unreach(self):
177 self.setUp_with_dest_unreach()
178 self.test_serialize()
182 buf = self.ic.serialize(data, prev)
183 unreach = icmp.dest_unreach.parser(six.binary_type(buf), icmp.icmp._MIN_LEN)
184 eq_(repr(self.data), repr(unreach))
186 def test_serialize_with_TimeExceeded(self):
187 self.setUp_with_TimeExceeded()
188 self.test_serialize()
192 buf = self.ic.serialize(data, prev)
193 te = icmp.TimeExceeded.parser(six.binary_type(buf), icmp.icmp._MIN_LEN)
194 eq_(repr(self.data), repr(te))
196 def test_to_string(self):
197 icmp_values = {'type': repr(self.type_),
198 'code': repr(self.code),
199 'csum': repr(self.csum),
200 'data': repr(self.data)}
201 _ic_str = ','.join(['%s=%s' % (k, icmp_values[k])
202 for k, v in inspect.getmembers(self.ic)
203 if k in icmp_values])
204 ic_str = '%s(%s)' % (icmp.icmp.__name__, _ic_str)
206 eq_(str(self.ic), ic_str)
207 eq_(repr(self.ic), ic_str)
209 def test_to_string_with_echo(self):
210 self.setUp_with_echo()
211 self.test_to_string()
213 def test_to_string_with_dest_unreach(self):
214 self.setUp_with_dest_unreach()
215 self.test_to_string()
217 def test_to_string_with_TimeExceeded(self):
218 self.setUp_with_TimeExceeded()
219 self.test_to_string()
221 def test_default_args(self):
223 buf = ic.serialize(bytearray(), None)
224 res = struct.unpack(icmp.icmp._PACK_STR, six.binary_type(buf[:4]))
228 eq_(buf[4:], b'\x00\x00\x00\x00')
231 ic = icmp.icmp(type_=icmp.ICMP_DEST_UNREACH, data=icmp.dest_unreach())
232 buf = ic.serialize(bytearray(), None)
233 res = struct.unpack(icmp.icmp._PACK_STR, six.binary_type(buf[:4]))
237 eq_(buf[4:], b'\x00\x00\x00\x00')
240 jsondict = self.ic.to_jsondict()
241 ic = icmp.icmp.from_jsondict(jsondict['icmp'])
242 eq_(str(self.ic), str(ic))
244 def test_json_with_echo(self):
245 self.setUp_with_echo()
248 def test_json_with_dest_unreach(self):
249 self.setUp_with_dest_unreach()
252 def test_json_with_TimeExceeded(self):
253 self.setUp_with_TimeExceeded()
257 class Test_echo(unittest.TestCase):
262 self.data = b'\x30\x0e\x09\x00\x00\x00\x00\x00' \
263 + b'\x10\x11\x12\x13\x14\x15\x16\x17' \
264 + b'\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f' \
265 + b'\x20\x21\x22\x23\x24\x25\x26\x27' \
266 + b'\x28\x29\x2a\x2b\x2c\x2d\x2e\x2f' \
267 + b'\x30\x31\x32\x33\x34\x35\x36\x37'
268 self.echo = icmp.echo(
269 self.id_, self.seq, self.data)
270 self.buf = struct.pack('!HH', self.id_, self.seq)
271 self.buf += self.data
274 eq_(self.id_, self.echo.id)
275 eq_(self.seq, self.echo.seq)
276 eq_(self.data, self.echo.data)
278 def test_parser(self):
279 _res = icmp.echo.parser(self.buf, 0)
280 if type(_res) is tuple:
284 eq_(self.id_, res.id)
285 eq_(self.seq, res.seq)
286 eq_(self.data, res.data)
288 def test_serialize(self):
289 buf = self.echo.serialize()
290 res = struct.unpack_from('!HH', six.binary_type(buf))
291 eq_(self.id_, res[0])
292 eq_(self.seq, res[1])
293 eq_(self.data, buf[struct.calcsize('!HH'):])
295 def test_default_args(self):
298 res = struct.unpack(icmp.echo._PACK_STR, six.binary_type(buf))
304 class Test_dest_unreach(unittest.TestCase):
309 self.data_len = len(self.data)
310 self.dest_unreach = icmp.dest_unreach(
311 data_len=self.data_len, mtu=self.mtu, data=self.data)
312 self.buf = struct.pack('!xBH', self.data_len, self.mtu)
313 self.buf += self.data
316 eq_(self.data_len, self.dest_unreach.data_len)
317 eq_(self.mtu, self.dest_unreach.mtu)
318 eq_(self.data, self.dest_unreach.data)
320 def test_parser(self):
321 _res = icmp.dest_unreach.parser(self.buf, 0)
322 if type(_res) is tuple:
326 eq_(self.data_len, res.data_len)
327 eq_(self.mtu, res.mtu)
328 eq_(self.data, res.data)
330 def test_serialize(self):
331 buf = self.dest_unreach.serialize()
332 res = struct.unpack_from('!xBH', six.binary_type(buf))
333 eq_(self.data_len, res[0])
334 eq_(self.mtu, res[1])
335 eq_(self.data, buf[struct.calcsize('!xBH'):])
337 def test_default_args(self):
338 du = icmp.dest_unreach()
340 res = struct.unpack(icmp.dest_unreach._PACK_STR, six.binary_type(buf))
346 class Test_TimeExceeded(unittest.TestCase):
350 self.data_len = len(self.data)
351 self.te = icmp.TimeExceeded(
352 data_len=self.data_len, data=self.data)
353 self.buf = struct.pack('!xBxx', self.data_len)
354 self.buf += self.data
357 eq_(self.data_len, self.te.data_len)
358 eq_(self.data, self.te.data)
360 def test_parser(self):
361 _res = icmp.TimeExceeded.parser(self.buf, 0)
362 if type(_res) is tuple:
366 eq_(self.data_len, res.data_len)
367 eq_(self.data, res.data)
369 def test_serialize(self):
370 buf = self.te.serialize()
371 res = struct.unpack_from('!xBxx', six.binary_type(buf))
372 eq_(self.data_len, res[0])
373 eq_(self.data, buf[struct.calcsize('!xBxx'):])
375 def test_default_args(self):
376 te = icmp.TimeExceeded()
378 res = struct.unpack(icmp.TimeExceeded._PACK_STR, six.binary_type(buf))