1 # Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
2 # Copyright (C) 2013 Isaku Yamahata <yamahata at private email ne jp>
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
17 # vim: tabstop=4 shiftwidth=4 softtabstop=4
19 from __future__ import print_function
27 from nose.tools import eq_, ok_
28 from nose.tools import raises
30 from ryu.ofproto import inet
31 from ryu.lib.packet import ipv4
32 from ryu.lib.packet import ipv6
33 from ryu.lib.packet import packet
34 from ryu.lib.packet import packet_utils
35 from ryu.lib.packet import vrrp
36 from ryu.lib import addrconv
39 LOG = logging.getLogger(__name__)
42 class Test_vrrpv2(unittest.TestCase):
43 """ Test case for vrrp v2
45 version = vrrp.VRRP_VERSION_V2
46 type_ = vrrp.VRRP_TYPE_ADVERTISEMENT
50 auth_type = vrrp.VRRP_AUTH_NO_AUTH
53 ip_address = '192.168.0.1'
55 vrrpv2 = vrrp.vrrpv2.create(type_, vrid, priority, max_adver_int,
57 buf = struct.pack(vrrp.vrrpv2._PACK_STR + '4sII',
58 vrrp.vrrp_to_version_type(vrrp.VRRP_VERSION_V2, type_),
59 vrid, priority, count_ip,
60 auth_type, max_adver_int, checksum,
61 addrconv.ipv4.text_to_bin(ip_address),
62 auth_data[0], auth_data[1])
71 eq_(self.type_, self.vrrpv2.type)
72 eq_(self.vrid, self.vrrpv2.vrid)
73 eq_(self.priority, self.vrrpv2.priority)
74 eq_(self.count_ip, self.vrrpv2.count_ip)
75 eq_(self.auth_type, self.vrrpv2.auth_type)
76 eq_(1, len(self.vrrpv2.ip_addresses))
77 eq_(self.ip_address, self.vrrpv2.ip_addresses[0])
78 eq_(self.auth_data, self.vrrpv2.auth_data)
def test_parser(self):
    """Parse ``self.buf`` and compare every decoded field with the fixture.

    Scalar header fields are checked in wire order first, then the single
    decoded address (count, Python type and text value), then the
    authentication data.
    """
    parsed, _next_cls, _rest = self.vrrpv2.parser(self.buf)

    # (attribute on this test case, attribute on the parsed packet)
    scalar_fields = (
        ('version', 'version'),
        ('type_', 'type'),
        ('vrid', 'vrid'),
        ('priority', 'priority'),
        ('count_ip', 'count_ip'),
        ('auth_type', 'auth_type'),
        ('max_adver_int', 'max_adver_int'),
        ('checksum', 'checksum'),
    )
    for expected_name, parsed_name in scalar_fields:
        eq_(getattr(self, expected_name), getattr(parsed, parsed_name))

    addresses = parsed.ip_addresses
    eq_(1, len(addresses))
    first_address = addresses[0]
    eq_(str, type(first_address))
    eq_(self.ip_address, first_address)
    eq_(self.auth_data, parsed.auth_data)
96 def test_serialize(self):
97 src_ip = '192.168.0.1'
98 dst_ip = vrrp.VRRP_IPV4_DST_ADDRESS
99 prev = ipv4.ipv4(4, 5, 0, 0, 0, 0, 0, vrrp.VRRP_IPV4_TTL,
100 inet.IPPROTO_VRRP, 0, src_ip, dst_ip)
102 type_ = vrrp.VRRP_TYPE_ADVERTISEMENT
106 ip_address = '192.168.0.2'
107 ip_addresses = [ip_address]
109 vrrp_ = vrrp.vrrpv2.create(
110 type_, vrid, priority, max_adver_int, ip_addresses)
112 buf = vrrp_.serialize(bytearray(), prev)
113 pack_str = vrrp.vrrpv2._PACK_STR + '4sII'
114 pack_len = struct.calcsize(pack_str)
115 res = struct.unpack(pack_str, six.binary_type(buf))
116 eq_(res[0], vrrp.vrrp_to_version_type(vrrp.VRRP_VERSION_V2, type_))
118 eq_(res[2], priority)
119 eq_(res[3], len(ip_addresses))
120 eq_(res[4], vrrp.VRRP_AUTH_NO_AUTH)
121 eq_(res[5], max_adver_int)
123 eq_(res[7], addrconv.ipv4.text_to_bin(ip_address))
126 eq_(len(buf), pack_len)
129 s = packet_utils.checksum(buf)
def test_malformed_vrrpv2(self):
    """Hand ``vrrp.vrrp.parser`` a buffer shorter than ``vrrpv2._MIN_LEN``.

    The input is ``self.buf`` with its first byte dropped and cut off at
    ``vrrp.vrrpv2._MIN_LEN``.  Nothing is asserted inside this body.
    NOTE(review): presumably the parser is expected to raise and that
    expectation is declared outside this method body -- confirm.
    """
    truncated = self.buf[1:vrrp.vrrpv2._MIN_LEN]
    vrrp.vrrp.parser(truncated)
def test_create_packet(self):
    """Rebuild a created packet from a bytes copy of its data and compare.

    The packet returned by ``create_packet`` and the ``packet.Packet``
    constructed from ``six.binary_type(p0.data)`` must expose equal
    ``data`` attributes.
    """
    created = self.vrrpv2.create_packet('192.168.0.2')
    raw_copy = six.binary_type(created.data)
    rebuilt = packet.Packet(raw_copy)
    eq_(created.data, rebuilt.data)
145 def _test_is_valid(self, type_=None, vrid=None, priority=None,
152 priority = self.priority
153 if max_adver_int is None:
154 max_adver_int = self.max_adver_int
156 vrrp_ = vrrp.vrrpv2.create(type_, vrid, priority, max_adver_int,
158 return vrrp_.is_valid()
def test_is_valid_ok(self):
    """``_test_is_valid`` called without overrides must report validity."""
    verdict = self._test_is_valid()
    ok_(verdict)
def test_is_valid_ng_type(self):
    """A packet created with ``type_=15`` must be reported as invalid."""
    verdict = self._test_is_valid(type_=15)
    ok_(not verdict)
def test_is_valid_ng_vrid_min(self):
    """A VRID of ``VRRP_VRID_MIN - 1`` must be reported as invalid."""
    below_minimum = vrrp.VRRP_VRID_MIN - 1
    verdict = self._test_is_valid(vrid=below_minimum)
    ok_(not verdict)
def test_is_valid_ng_vrid_max(self):
    """A VRID of ``VRRP_VRID_MAX + 1`` must be reported as invalid."""
    above_maximum = vrrp.VRRP_VRID_MAX + 1
    verdict = self._test_is_valid(vrid=above_maximum)
    ok_(not verdict)
def test_is_valid_ng_priority_min(self):
    """A priority of ``VRRP_PRIORITY_MIN - 1`` must be reported as invalid."""
    below_minimum = vrrp.VRRP_PRIORITY_MIN - 1
    verdict = self._test_is_valid(priority=below_minimum)
    ok_(not verdict)
def test_is_valid_ng_priority_max(self):
    """A priority of ``VRRP_PRIORITY_MAX + 1`` must be reported as invalid."""
    above_maximum = vrrp.VRRP_PRIORITY_MAX + 1
    verdict = self._test_is_valid(priority=above_maximum)
    ok_(not verdict)
def test_is_valid_ng_adver_min(self):
    """An interval of ``VRRP_V2_MAX_ADVER_INT_MIN - 1`` must be invalid."""
    below_minimum = vrrp.VRRP_V2_MAX_ADVER_INT_MIN - 1
    verdict = self._test_is_valid(max_adver_int=below_minimum)
    ok_(not verdict)
def test_is_valid_ng_adver_max(self):
    """An interval of ``VRRP_V2_MAX_ADVER_INT_MAX + 1`` must be invalid."""
    above_maximum = vrrp.VRRP_V2_MAX_ADVER_INT_MAX + 1
    verdict = self._test_is_valid(max_adver_int=above_maximum)
    ok_(not verdict)
def test_to_string(self):
    """Check ``str()`` and ``repr()`` of the packet against a built string.

    The expected text has the form ``<class name>(key=repr(value),...)``.
    Keys appear in the order ``inspect.getmembers`` yields them and are
    limited to the entries of the local expectation table.
    """
    expected_fields = {
        'version': self.version,
        'priority': self.priority,
        'count_ip': self.count_ip,
        'max_adver_int': self.max_adver_int,
        'checksum': self.vrrpv2.checksum,
        'ip_addresses': [self.ip_address],
        'auth_type': self.auth_type,
        'auth_data': self.auth_data,
        'identification': self.vrrpv2.identification,
    }

    rendered = []
    for name, _member in inspect.getmembers(self.vrrpv2):
        if name in expected_fields:
            rendered.append('%s=%s' % (name, repr(expected_fields[name])))
    expected = '%s(%s)' % (vrrp.vrrpv2.__name__, ','.join(rendered))

    eq_(str(self.vrrpv2), expected)
    eq_(repr(self.vrrpv2), expected)
211 class Test_vrrpv3_ipv4(unittest.TestCase):
212 """ Test case for vrrp v3 IPv4
214 version = vrrp.VRRP_VERSION_V3
215 type_ = vrrp.VRRP_TYPE_ADVERTISEMENT
221 ip_address = '192.168.0.1'
222 vrrpv3 = vrrp.vrrpv3.create(type_, vrid, priority, max_adver_int,
224 buf = struct.pack(vrrp.vrrpv3._PACK_STR + '4s',
225 vrrp.vrrp_to_version_type(vrrp.VRRP_VERSION_V3, type_),
226 vrid, priority, count_ip,
227 max_adver_int, checksum,
228 addrconv.ipv4.text_to_bin(ip_address))
237 eq_(self.type_, self.vrrpv3.type)
238 eq_(self.vrid, self.vrrpv3.vrid)
239 eq_(self.priority, self.vrrpv3.priority)
240 eq_(self.count_ip, self.vrrpv3.count_ip)
241 eq_(1, len(self.vrrpv3.ip_addresses))
242 eq_(self.ip_address, self.vrrpv3.ip_addresses[0])
def test_parser(self):
    """Parse ``self.buf`` and compare every decoded field with the fixture.

    Scalar header fields are checked first, in the same order as the
    original assertions, followed by the single decoded address (count,
    Python type and text value).
    """
    parsed, _next_cls, _rest = self.vrrpv3.parser(self.buf)

    # (attribute on this test case, attribute on the parsed packet)
    scalar_fields = (
        ('version', 'version'),
        ('type_', 'type'),
        ('vrid', 'vrid'),
        ('priority', 'priority'),
        ('count_ip', 'count_ip'),
        ('max_adver_int', 'max_adver_int'),
        ('checksum', 'checksum'),
    )
    for expected_name, parsed_name in scalar_fields:
        eq_(getattr(self, expected_name), getattr(parsed, parsed_name))

    addresses = parsed.ip_addresses
    eq_(1, len(addresses))
    first_address = addresses[0]
    eq_(str, type(first_address))
    eq_(self.ip_address, first_address)
258 def test_serialize(self):
259 src_ip = '192.168.0.1'
260 dst_ip = vrrp.VRRP_IPV4_DST_ADDRESS
261 prev = ipv4.ipv4(4, 5, 0, 0, 0, 0, 0, vrrp.VRRP_IPV4_TTL,
262 inet.IPPROTO_VRRP, 0, src_ip, dst_ip)
264 type_ = vrrp.VRRP_TYPE_ADVERTISEMENT
268 ip_address = '192.168.0.2'
269 ip_addresses = [ip_address]
271 vrrp_ = vrrp.vrrpv3.create(
272 type_, vrid, priority, max_adver_int, ip_addresses)
274 buf = vrrp_.serialize(bytearray(), prev)
275 print(len(buf), type(buf), buf)
276 pack_str = vrrp.vrrpv3._PACK_STR + '4s'
277 pack_len = struct.calcsize(pack_str)
278 res = struct.unpack(pack_str, six.binary_type(buf))
279 eq_(res[0], vrrp.vrrp_to_version_type(vrrp.VRRP_VERSION_V3, type_))
281 eq_(res[2], priority)
282 eq_(res[3], len(ip_addresses))
283 eq_(res[4], max_adver_int)
285 eq_(res[6], addrconv.ipv4.text_to_bin(ip_address))
286 eq_(len(buf), pack_len)
290 ph = struct.pack('!4s4sxBH',
291 addrconv.ipv4.text_to_bin(src_ip),
292 addrconv.ipv4.text_to_bin(dst_ip),
293 inet.IPPROTO_VRRP, pack_len)
294 s = packet_utils.checksum(ph + buf)
def test_malformed_vrrpv3(self):
    """Hand ``vrrp.vrrp.parser`` a buffer shorter than ``vrrpv3._MIN_LEN``.

    The input is ``self.buf`` with its first byte dropped and cut off at
    ``vrrp.vrrpv3._MIN_LEN``.  Nothing is asserted inside this body.
    NOTE(review): presumably the parser is expected to raise and that
    expectation is declared outside this method body -- confirm.
    """
    truncated = self.buf[1:vrrp.vrrpv3._MIN_LEN]
    vrrp.vrrp.parser(truncated)
def test_create_packet(self):
    """Rebuild a created packet from a bytes copy of its data and compare.

    The packet returned by ``create_packet`` and the ``packet.Packet``
    constructed from ``six.binary_type(p0.data)`` must expose equal
    ``data`` attributes.
    """
    created = self.vrrpv3.create_packet('192.168.0.2')
    raw_copy = six.binary_type(created.data)
    rebuilt = packet.Packet(raw_copy)
    eq_(created.data, rebuilt.data)
310 def _test_is_valid(self, type_=None, vrid=None, priority=None,
317 priority = self.priority
318 if max_adver_int is None:
319 max_adver_int = self.max_adver_int
321 vrrp_ = vrrp.vrrpv3.create(type_, vrid, priority, max_adver_int,
323 return vrrp_.is_valid()
def test_is_valid_ok(self):
    """``_test_is_valid`` called without overrides must report validity."""
    verdict = self._test_is_valid()
    ok_(verdict)
def test_is_valid_ng_type(self):
    """A packet created with ``type_=15`` must be reported as invalid."""
    verdict = self._test_is_valid(type_=15)
    ok_(not verdict)
def test_is_valid_ng_vrid_min(self):
    """A VRID of ``VRRP_VRID_MIN - 1`` must be reported as invalid."""
    below_minimum = vrrp.VRRP_VRID_MIN - 1
    verdict = self._test_is_valid(vrid=below_minimum)
    ok_(not verdict)
def test_is_valid_ng_vrid_max(self):
    """A VRID of ``VRRP_VRID_MAX + 1`` must be reported as invalid."""
    above_maximum = vrrp.VRRP_VRID_MAX + 1
    verdict = self._test_is_valid(vrid=above_maximum)
    ok_(not verdict)
def test_is_valid_ng_priority_min(self):
    """A priority of ``VRRP_PRIORITY_MIN - 1`` must be reported as invalid."""
    below_minimum = vrrp.VRRP_PRIORITY_MIN - 1
    verdict = self._test_is_valid(priority=below_minimum)
    ok_(not verdict)
def test_is_valid_ng_priority_max(self):
    """A priority of ``VRRP_PRIORITY_MAX + 1`` must be reported as invalid."""
    above_maximum = vrrp.VRRP_PRIORITY_MAX + 1
    verdict = self._test_is_valid(priority=above_maximum)
    ok_(not verdict)
def test_is_valid_ng_adver_min(self):
    """An interval of ``VRRP_V3_MAX_ADVER_INT_MIN - 1`` must be invalid."""
    below_minimum = vrrp.VRRP_V3_MAX_ADVER_INT_MIN - 1
    verdict = self._test_is_valid(max_adver_int=below_minimum)
    ok_(not verdict)
def test_is_valid_ng_adver_max(self):
    """An interval of ``VRRP_V3_MAX_ADVER_INT_MAX + 1`` must be invalid."""
    above_maximum = vrrp.VRRP_V3_MAX_ADVER_INT_MAX + 1
    verdict = self._test_is_valid(max_adver_int=above_maximum)
    ok_(not verdict)
def test_to_string(self):
    """Check ``str()`` and ``repr()`` of the packet against a built string.

    The expected text has the form ``<class name>(key=repr(value),...)``.
    Keys appear in the order ``inspect.getmembers`` yields them and are
    limited to the entries of the local expectation table.
    """
    expected_fields = {
        'version': self.version,
        'priority': self.priority,
        'count_ip': self.count_ip,
        'max_adver_int': self.max_adver_int,
        'checksum': self.vrrpv3.checksum,
        'ip_addresses': [self.ip_address],
        'identification': self.vrrpv3.identification,
    }

    rendered = []
    for name, _member in inspect.getmembers(self.vrrpv3):
        if name in expected_fields:
            rendered.append('%s=%s' % (name, repr(expected_fields[name])))
    expected = '%s(%s)' % (vrrp.vrrpv3.__name__, ','.join(rendered))

    eq_(str(self.vrrpv3), expected)
    eq_(repr(self.vrrpv3), expected)
376 class Test_vrrpv3_ipv6(unittest.TestCase):
377 """ Test case for vrrp v3 IPv6
379 version = vrrp.VRRP_VERSION_V3
380 type_ = vrrp.VRRP_TYPE_ADVERTISEMENT
386 ip_address = '2001:db8:2000::1'
387 vrrpv3 = vrrp.vrrpv3.create(type_, vrid, priority, max_adver_int,
389 buf = struct.pack(vrrp.vrrpv3._PACK_STR + '16s',
390 vrrp.vrrp_to_version_type(vrrp.VRRP_VERSION_V3, type_),
391 vrid, priority, count_ip,
392 max_adver_int, checksum,
393 addrconv.ipv6.text_to_bin(ip_address))
402 eq_(self.type_, self.vrrpv3.type)
403 eq_(self.vrid, self.vrrpv3.vrid)
404 eq_(self.priority, self.vrrpv3.priority)
405 eq_(self.count_ip, self.vrrpv3.count_ip)
406 eq_(1, len(self.vrrpv3.ip_addresses))
407 eq_(self.ip_address, self.vrrpv3.ip_addresses[0])
def test_parser(self):
    """Parse ``self.buf`` and compare every decoded field with the fixture.

    Scalar header fields are checked first, in the same order as the
    original assertions, followed by the single decoded address (count,
    Python type and text value).
    """
    parsed, _next_cls, _rest = self.vrrpv3.parser(self.buf)

    # (attribute on this test case, attribute on the parsed packet)
    scalar_fields = (
        ('version', 'version'),
        ('type_', 'type'),
        ('vrid', 'vrid'),
        ('priority', 'priority'),
        ('count_ip', 'count_ip'),
        ('max_adver_int', 'max_adver_int'),
        ('checksum', 'checksum'),
    )
    for expected_name, parsed_name in scalar_fields:
        eq_(getattr(self, expected_name), getattr(parsed, parsed_name))

    addresses = parsed.ip_addresses
    eq_(1, len(addresses))
    first_address = addresses[0]
    eq_(str, type(first_address))
    eq_(self.ip_address, first_address)
423 def test_serialize(self):
424 src_ip = '2001:db8:2000::1'
425 dst_ip = vrrp.VRRP_IPV6_DST_ADDRESS
426 prev = ipv6.ipv6(6, 0, 0, 0, inet.IPPROTO_VRRP,
427 vrrp.VRRP_IPV6_HOP_LIMIT, src_ip, dst_ip)
429 type_ = vrrp.VRRP_TYPE_ADVERTISEMENT
433 ip_address = '2001:db8:2000::2'
434 ip_addresses = [ip_address]
436 vrrp_ = vrrp.vrrpv3.create(
437 type_, vrid, priority, max_adver_int, ip_addresses)
439 buf = vrrp_.serialize(bytearray(), prev)
440 print(len(buf), type(buf), buf)
441 pack_str = vrrp.vrrpv3._PACK_STR + '16s'
442 pack_len = struct.calcsize(pack_str)
443 res = struct.unpack(pack_str, six.binary_type(buf))
444 eq_(res[0], vrrp.vrrp_to_version_type(vrrp.VRRP_VERSION_V3, type_))
446 eq_(res[2], priority)
447 eq_(res[3], len(ip_addresses))
448 eq_(res[4], max_adver_int)
450 eq_(res[6], addrconv.ipv6.text_to_bin(ip_address))
451 eq_(len(buf), pack_len)
455 ph = struct.pack('!16s16sI3xB',
456 addrconv.ipv6.text_to_bin(src_ip),
457 addrconv.ipv6.text_to_bin(dst_ip),
458 pack_len, inet.IPPROTO_VRRP)
459 s = packet_utils.checksum(ph + buf)
def test_malformed_vrrpv3(self):
    """Hand ``vrrp.vrrp.parser`` a buffer shorter than ``vrrpv3._MIN_LEN``.

    The input is ``self.buf`` with its first byte dropped and cut off at
    ``vrrp.vrrpv3._MIN_LEN``.  Nothing is asserted inside this body.
    NOTE(review): presumably the parser is expected to raise and that
    expectation is declared outside this method body -- confirm.
    """
    truncated = self.buf[1:vrrp.vrrpv3._MIN_LEN]
    vrrp.vrrp.parser(truncated)
def test_create_packet(self):
    """Rebuild a created packet from a bytes copy of its data and compare.

    The packet returned by ``self.vrrpv3.create_packet`` and the
    ``packet.Packet`` constructed from ``six.binary_type(p0.data)`` must
    expose equal ``data`` attributes.

    The buffers are reported through the module logger at DEBUG level
    instead of being printed, so ordinary runs do not write to stdout;
    enable DEBUG logging to see them.
    """
    primary_ip = '2001:db8:2000::3'
    p0 = self.vrrpv3.create_packet(primary_ip)
    # len() is still evaluated eagerly, exactly as the former print() did.
    LOG.debug('p0 data: %d %r', len(p0.data), p0.data)

    p1 = packet.Packet(six.binary_type(p0.data))
    LOG.debug('p0 data: %d %r', len(p0.data), p0.data)
    LOG.debug('p1 data: %d %r', len(p1.data), p1.data)
    eq_(p0.data, p1.data)
def test_to_string(self):
    """Check ``str()`` and ``repr()`` of the packet against a built string.

    The expected text has the form ``<class name>(key=repr(value),...)``.
    Keys appear in the order ``inspect.getmembers`` yields them and are
    limited to the entries of the local expectation table.
    """
    expected_fields = {
        'version': self.version,
        'priority': self.priority,
        'count_ip': self.count_ip,
        'max_adver_int': self.max_adver_int,
        'checksum': self.vrrpv3.checksum,
        'ip_addresses': [self.ip_address],
        'identification': self.vrrpv3.identification,
    }

    rendered = []
    for name, _member in inspect.getmembers(self.vrrpv3):
        if name in expected_fields:
            rendered.append('%s=%s' % (name, repr(expected_fields[name])))
    expected = '%s(%s)' % (vrrp.vrrpv3.__name__, ','.join(rendered))

    eq_(str(self.vrrpv3), expected)
    eq_(repr(self.vrrpv3), expected)