backing up
[vsorcdistro/.git] / ryu / build / lib.linux-armv7l-2.7 / ryu / tests / unit / packet / test_vrrp.py
1 # Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
2 # Copyright (C) 2013 Isaku Yamahata <yamahata at private email ne jp>
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #    http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16
17 # vim: tabstop=4 shiftwidth=4 softtabstop=4
18
19 from __future__ import print_function
20
21 import unittest
22 import logging
23 import six
24 import struct
25 import inspect
26
27 from nose.tools import eq_, ok_
28 from nose.tools import raises
29
30 from ryu.ofproto import inet
31 from ryu.lib.packet import ipv4
32 from ryu.lib.packet import ipv6
33 from ryu.lib.packet import packet
34 from ryu.lib.packet import packet_utils
35 from ryu.lib.packet import vrrp
36 from ryu.lib import addrconv
37
38
39 LOG = logging.getLogger(__name__)
40
41
42 class Test_vrrpv2(unittest.TestCase):
43     """ Test case for vrrp v2
44     """
45     version = vrrp.VRRP_VERSION_V2
46     type_ = vrrp.VRRP_TYPE_ADVERTISEMENT
47     vrid = 128
48     priority = 100
49     count_ip = 1
50     auth_type = vrrp.VRRP_AUTH_NO_AUTH
51     max_adver_int = 100
52     checksum = 0
53     ip_address = '192.168.0.1'
54     auth_data = (0, 0)
55     vrrpv2 = vrrp.vrrpv2.create(type_, vrid, priority, max_adver_int,
56                                 [ip_address])
57     buf = struct.pack(vrrp.vrrpv2._PACK_STR + '4sII',
58                       vrrp.vrrp_to_version_type(vrrp.VRRP_VERSION_V2, type_),
59                       vrid, priority, count_ip,
60                       auth_type, max_adver_int, checksum,
61                       addrconv.ipv4.text_to_bin(ip_address),
62                       auth_data[0], auth_data[1])
63
64     def setUp(self):
65         pass
66
67     def tearDown(self):
68         pass
69
70     def test_init(self):
71         eq_(self.type_, self.vrrpv2.type)
72         eq_(self.vrid, self.vrrpv2.vrid)
73         eq_(self.priority, self.vrrpv2.priority)
74         eq_(self.count_ip, self.vrrpv2.count_ip)
75         eq_(self.auth_type, self.vrrpv2.auth_type)
76         eq_(1, len(self.vrrpv2.ip_addresses))
77         eq_(self.ip_address, self.vrrpv2.ip_addresses[0])
78         eq_(self.auth_data, self.vrrpv2.auth_data)
79
80     def test_parser(self):
81         vrrpv2, _cls, _ = self.vrrpv2.parser(self.buf)
82
83         eq_(self.version, vrrpv2.version)
84         eq_(self.type_, vrrpv2.type)
85         eq_(self.vrid, vrrpv2.vrid)
86         eq_(self.priority, vrrpv2.priority)
87         eq_(self.count_ip, vrrpv2.count_ip)
88         eq_(self.auth_type, vrrpv2.auth_type)
89         eq_(self.max_adver_int, vrrpv2.max_adver_int)
90         eq_(self.checksum, vrrpv2.checksum)
91         eq_(1, len(vrrpv2.ip_addresses))
92         eq_(str, type(vrrpv2.ip_addresses[0]))
93         eq_(self.ip_address, vrrpv2.ip_addresses[0])
94         eq_(self.auth_data, vrrpv2.auth_data)
95
96     def test_serialize(self):
97         src_ip = '192.168.0.1'
98         dst_ip = vrrp.VRRP_IPV4_DST_ADDRESS
99         prev = ipv4.ipv4(4, 5, 0, 0, 0, 0, 0, vrrp.VRRP_IPV4_TTL,
100                          inet.IPPROTO_VRRP, 0, src_ip, dst_ip)
101
102         type_ = vrrp.VRRP_TYPE_ADVERTISEMENT
103         vrid = 5
104         priority = 10
105         max_adver_int = 30
106         ip_address = '192.168.0.2'
107         ip_addresses = [ip_address]
108
109         vrrp_ = vrrp.vrrpv2.create(
110             type_, vrid, priority, max_adver_int, ip_addresses)
111
112         buf = vrrp_.serialize(bytearray(), prev)
113         pack_str = vrrp.vrrpv2._PACK_STR + '4sII'
114         pack_len = struct.calcsize(pack_str)
115         res = struct.unpack(pack_str, six.binary_type(buf))
116         eq_(res[0], vrrp.vrrp_to_version_type(vrrp.VRRP_VERSION_V2, type_))
117         eq_(res[1], vrid)
118         eq_(res[2], priority)
119         eq_(res[3], len(ip_addresses))
120         eq_(res[4], vrrp.VRRP_AUTH_NO_AUTH)
121         eq_(res[5], max_adver_int)
122         # res[6] is checksum
123         eq_(res[7], addrconv.ipv4.text_to_bin(ip_address))
124         eq_(res[8], 0)
125         eq_(res[9], 0)
126         eq_(len(buf), pack_len)
127
128         # checksum
129         s = packet_utils.checksum(buf)
130         eq_(0, s)
131
132     @raises(Exception)
133     def test_malformed_vrrpv2(self):
134         m_short_buf = self.buf[1:vrrp.vrrpv2._MIN_LEN]
135         vrrp.vrrp.parser(m_short_buf)
136
137     def test_create_packet(self):
138         primary_ip = '192.168.0.2'
139         p0 = self.vrrpv2.create_packet(primary_ip)
140         p0.serialize()
141         p1 = packet.Packet(six.binary_type(p0.data))
142         p1.serialize()
143         eq_(p0.data, p1.data)
144
145     def _test_is_valid(self, type_=None, vrid=None, priority=None,
146                        max_adver_int=None):
147         if type_ is None:
148             type_ = self.type_
149         if vrid is None:
150             vrid = self.vrid
151         if priority is None:
152             priority = self.priority
153         if max_adver_int is None:
154             max_adver_int = self.max_adver_int
155
156         vrrp_ = vrrp.vrrpv2.create(type_, vrid, priority, max_adver_int,
157                                    [self.ip_address])
158         return vrrp_.is_valid()
159
160     def test_is_valid_ok(self):
161         ok_(self._test_is_valid())
162
163     def test_is_valid_ng_type(self):
164         ok_(not self._test_is_valid(type_=15))
165
166     def test_is_valid_ng_vrid_min(self):
167         vrid = vrrp.VRRP_VRID_MIN - 1
168         ok_(not self._test_is_valid(vrid=vrid))
169
170     def test_is_valid_ng_vrid_max(self):
171         vrid = vrrp.VRRP_VRID_MAX + 1
172         ok_(not self._test_is_valid(vrid=vrid))
173
174     def test_is_valid_ng_priority_min(self):
175         priority = vrrp.VRRP_PRIORITY_MIN - 1
176         ok_(not self._test_is_valid(priority=priority))
177
178     def test_is_valid_ng_priority_max(self):
179         priority = vrrp.VRRP_PRIORITY_MAX + 1
180         ok_(not self._test_is_valid(priority=priority))
181
182     def test_is_valid_ng_adver_min(self):
183         max_adver_int = vrrp.VRRP_V2_MAX_ADVER_INT_MIN - 1
184         ok_(not self._test_is_valid(max_adver_int=max_adver_int))
185
186     def test_is_valid_ng_adver_max(self):
187         max_adver_int = vrrp.VRRP_V2_MAX_ADVER_INT_MAX + 1
188         ok_(not self._test_is_valid(max_adver_int=max_adver_int))
189
190     def test_to_string(self):
191         vrrpv2_values = {'version': self.version,
192                          'type': self.type_,
193                          'vrid': self.vrid,
194                          'priority': self.priority,
195                          'count_ip': self.count_ip,
196                          'max_adver_int': self.max_adver_int,
197                          'checksum': self.vrrpv2.checksum,
198                          'ip_addresses': [self.ip_address],
199                          'auth_type': self.auth_type,
200                          'auth_data': self.auth_data,
201                          'identification': self.vrrpv2.identification}
202         _vrrpv2_str = ','.join(['%s=%s' % (k, repr(vrrpv2_values[k]))
203                                 for k, v in inspect.getmembers(self.vrrpv2)
204                                 if k in vrrpv2_values])
205         vrrpv2_str = '%s(%s)' % (vrrp.vrrpv2.__name__, _vrrpv2_str)
206
207         eq_(str(self.vrrpv2), vrrpv2_str)
208         eq_(repr(self.vrrpv2), vrrpv2_str)
209
210
211 class Test_vrrpv3_ipv4(unittest.TestCase):
212     """ Test case for vrrp v3 IPv4
213     """
214     version = vrrp.VRRP_VERSION_V3
215     type_ = vrrp.VRRP_TYPE_ADVERTISEMENT
216     vrid = 128
217     priority = 99
218     count_ip = 1
219     max_adver_int = 111
220     checksum = 0
221     ip_address = '192.168.0.1'
222     vrrpv3 = vrrp.vrrpv3.create(type_, vrid, priority, max_adver_int,
223                                 [ip_address])
224     buf = struct.pack(vrrp.vrrpv3._PACK_STR + '4s',
225                       vrrp.vrrp_to_version_type(vrrp.VRRP_VERSION_V3, type_),
226                       vrid, priority, count_ip,
227                       max_adver_int, checksum,
228                       addrconv.ipv4.text_to_bin(ip_address))
229
230     def setUp(self):
231         pass
232
233     def tearDown(self):
234         pass
235
236     def test_init(self):
237         eq_(self.type_, self.vrrpv3.type)
238         eq_(self.vrid, self.vrrpv3.vrid)
239         eq_(self.priority, self.vrrpv3.priority)
240         eq_(self.count_ip, self.vrrpv3.count_ip)
241         eq_(1, len(self.vrrpv3.ip_addresses))
242         eq_(self.ip_address, self.vrrpv3.ip_addresses[0])
243
244     def test_parser(self):
245         vrrpv3, _cls, _ = self.vrrpv3.parser(self.buf)
246
247         eq_(self.version, vrrpv3.version)
248         eq_(self.type_, vrrpv3.type)
249         eq_(self.vrid, vrrpv3.vrid)
250         eq_(self.priority, vrrpv3.priority)
251         eq_(self.count_ip, vrrpv3.count_ip)
252         eq_(self.max_adver_int, vrrpv3.max_adver_int)
253         eq_(self.checksum, vrrpv3.checksum)
254         eq_(1, len(vrrpv3.ip_addresses))
255         eq_(str, type(vrrpv3.ip_addresses[0]))
256         eq_(self.ip_address, vrrpv3.ip_addresses[0])
257
258     def test_serialize(self):
259         src_ip = '192.168.0.1'
260         dst_ip = vrrp.VRRP_IPV4_DST_ADDRESS
261         prev = ipv4.ipv4(4, 5, 0, 0, 0, 0, 0, vrrp.VRRP_IPV4_TTL,
262                          inet.IPPROTO_VRRP, 0, src_ip, dst_ip)
263
264         type_ = vrrp.VRRP_TYPE_ADVERTISEMENT
265         vrid = 5
266         priority = 10
267         max_adver_int = 30
268         ip_address = '192.168.0.2'
269         ip_addresses = [ip_address]
270
271         vrrp_ = vrrp.vrrpv3.create(
272             type_, vrid, priority, max_adver_int, ip_addresses)
273
274         buf = vrrp_.serialize(bytearray(), prev)
275         print(len(buf), type(buf), buf)
276         pack_str = vrrp.vrrpv3._PACK_STR + '4s'
277         pack_len = struct.calcsize(pack_str)
278         res = struct.unpack(pack_str, six.binary_type(buf))
279         eq_(res[0], vrrp.vrrp_to_version_type(vrrp.VRRP_VERSION_V3, type_))
280         eq_(res[1], vrid)
281         eq_(res[2], priority)
282         eq_(res[3], len(ip_addresses))
283         eq_(res[4], max_adver_int)
284         # res[5] is checksum
285         eq_(res[6], addrconv.ipv4.text_to_bin(ip_address))
286         eq_(len(buf), pack_len)
287         print(res)
288
289         # checksum
290         ph = struct.pack('!4s4sxBH',
291                          addrconv.ipv4.text_to_bin(src_ip),
292                          addrconv.ipv4.text_to_bin(dst_ip),
293                          inet.IPPROTO_VRRP, pack_len)
294         s = packet_utils.checksum(ph + buf)
295         eq_(0, s)
296
297     @raises(Exception)
298     def test_malformed_vrrpv3(self):
299         m_short_buf = self.buf[1:vrrp.vrrpv3._MIN_LEN]
300         vrrp.vrrp.parser(m_short_buf)
301
302     def test_create_packet(self):
303         primary_ip = '192.168.0.2'
304         p0 = self.vrrpv3.create_packet(primary_ip)
305         p0.serialize()
306         p1 = packet.Packet(six.binary_type(p0.data))
307         p1.serialize()
308         eq_(p0.data, p1.data)
309
310     def _test_is_valid(self, type_=None, vrid=None, priority=None,
311                        max_adver_int=None):
312         if type_ is None:
313             type_ = self.type_
314         if vrid is None:
315             vrid = self.vrid
316         if priority is None:
317             priority = self.priority
318         if max_adver_int is None:
319             max_adver_int = self.max_adver_int
320
321         vrrp_ = vrrp.vrrpv3.create(type_, vrid, priority, max_adver_int,
322                                    [self.ip_address])
323         return vrrp_.is_valid()
324
325     def test_is_valid_ok(self):
326         ok_(self._test_is_valid())
327
328     def test_is_valid_ng_type(self):
329         ok_(not self._test_is_valid(type_=15))
330
331     def test_is_valid_ng_vrid_min(self):
332         vrid = vrrp.VRRP_VRID_MIN - 1
333         ok_(not self._test_is_valid(vrid=vrid))
334
335     def test_is_valid_ng_vrid_max(self):
336         vrid = vrrp.VRRP_VRID_MAX + 1
337         ok_(not self._test_is_valid(vrid=vrid))
338
339     def test_is_valid_ng_priority_min(self):
340         priority = vrrp.VRRP_PRIORITY_MIN - 1
341         ok_(not self._test_is_valid(priority=priority))
342
343     def test_is_valid_ng_priority_max(self):
344         priority = vrrp.VRRP_PRIORITY_MAX + 1
345         ok_(not self._test_is_valid(priority=priority))
346
347     def test_is_valid_ng_adver_min(self):
348         max_adver_int = vrrp.VRRP_V3_MAX_ADVER_INT_MIN - 1
349         ok_(not self._test_is_valid(max_adver_int=max_adver_int))
350
351     def test_is_valid_ng_adver_max(self):
352         max_adver_int = vrrp.VRRP_V3_MAX_ADVER_INT_MAX + 1
353         ok_(not self._test_is_valid(max_adver_int=max_adver_int))
354
355     def test_to_string(self):
356         vrrpv3_values = {'version': self.version,
357                          'type': self.type_,
358                          'vrid': self.vrid,
359                          'priority': self.priority,
360                          'count_ip': self.count_ip,
361                          'max_adver_int': self.max_adver_int,
362                          'checksum': self.vrrpv3.checksum,
363                          'ip_addresses': [self.ip_address],
364                          'auth_type': None,
365                          'auth_data': None,
366                          'identification': self.vrrpv3.identification}
367         _vrrpv3_str = ','.join(['%s=%s' % (k, repr(vrrpv3_values[k]))
368                                 for k, v in inspect.getmembers(self.vrrpv3)
369                                 if k in vrrpv3_values])
370         vrrpv3_str = '%s(%s)' % (vrrp.vrrpv3.__name__, _vrrpv3_str)
371
372         eq_(str(self.vrrpv3), vrrpv3_str)
373         eq_(repr(self.vrrpv3), vrrpv3_str)
374
375
376 class Test_vrrpv3_ipv6(unittest.TestCase):
377     """ Test case for vrrp v3 IPv6
378     """
379     version = vrrp.VRRP_VERSION_V3
380     type_ = vrrp.VRRP_TYPE_ADVERTISEMENT
381     vrid = 128
382     priority = 99
383     count_ip = 1
384     max_adver_int = 111
385     checksum = 0
386     ip_address = '2001:db8:2000::1'
387     vrrpv3 = vrrp.vrrpv3.create(type_, vrid, priority, max_adver_int,
388                                 [ip_address])
389     buf = struct.pack(vrrp.vrrpv3._PACK_STR + '16s',
390                       vrrp.vrrp_to_version_type(vrrp.VRRP_VERSION_V3, type_),
391                       vrid, priority, count_ip,
392                       max_adver_int, checksum,
393                       addrconv.ipv6.text_to_bin(ip_address))
394
395     def setUp(self):
396         pass
397
398     def tearDown(self):
399         pass
400
401     def test_init(self):
402         eq_(self.type_, self.vrrpv3.type)
403         eq_(self.vrid, self.vrrpv3.vrid)
404         eq_(self.priority, self.vrrpv3.priority)
405         eq_(self.count_ip, self.vrrpv3.count_ip)
406         eq_(1, len(self.vrrpv3.ip_addresses))
407         eq_(self.ip_address, self.vrrpv3.ip_addresses[0])
408
409     def test_parser(self):
410         vrrpv3, _cls, _ = self.vrrpv3.parser(self.buf)
411
412         eq_(self.version, vrrpv3.version)
413         eq_(self.type_, vrrpv3.type)
414         eq_(self.vrid, vrrpv3.vrid)
415         eq_(self.priority, vrrpv3.priority)
416         eq_(self.count_ip, vrrpv3.count_ip)
417         eq_(self.max_adver_int, vrrpv3.max_adver_int)
418         eq_(self.checksum, vrrpv3.checksum)
419         eq_(1, len(vrrpv3.ip_addresses))
420         eq_(str, type(vrrpv3.ip_addresses[0]))
421         eq_(self.ip_address, vrrpv3.ip_addresses[0])
422
423     def test_serialize(self):
424         src_ip = '2001:db8:2000::1'
425         dst_ip = vrrp.VRRP_IPV6_DST_ADDRESS
426         prev = ipv6.ipv6(6, 0, 0, 0, inet.IPPROTO_VRRP,
427                          vrrp.VRRP_IPV6_HOP_LIMIT, src_ip, dst_ip)
428
429         type_ = vrrp.VRRP_TYPE_ADVERTISEMENT
430         vrid = 5
431         priority = 10
432         max_adver_int = 30
433         ip_address = '2001:db8:2000::2'
434         ip_addresses = [ip_address]
435
436         vrrp_ = vrrp.vrrpv3.create(
437             type_, vrid, priority, max_adver_int, ip_addresses)
438
439         buf = vrrp_.serialize(bytearray(), prev)
440         print(len(buf), type(buf), buf)
441         pack_str = vrrp.vrrpv3._PACK_STR + '16s'
442         pack_len = struct.calcsize(pack_str)
443         res = struct.unpack(pack_str, six.binary_type(buf))
444         eq_(res[0], vrrp.vrrp_to_version_type(vrrp.VRRP_VERSION_V3, type_))
445         eq_(res[1], vrid)
446         eq_(res[2], priority)
447         eq_(res[3], len(ip_addresses))
448         eq_(res[4], max_adver_int)
449         # res[5] is checksum
450         eq_(res[6], addrconv.ipv6.text_to_bin(ip_address))
451         eq_(len(buf), pack_len)
452         print(res)
453
454         # checksum
455         ph = struct.pack('!16s16sI3xB',
456                          addrconv.ipv6.text_to_bin(src_ip),
457                          addrconv.ipv6.text_to_bin(dst_ip),
458                          pack_len, inet.IPPROTO_VRRP)
459         s = packet_utils.checksum(ph + buf)
460         eq_(0, s)
461
462     @raises(Exception)
463     def test_malformed_vrrpv3(self):
464         m_short_buf = self.buf[1:vrrp.vrrpv3._MIN_LEN]
465         vrrp.vrrp.parser(m_short_buf)
466
467     def test_create_packet(self):
468         primary_ip = '2001:db8:2000::3'
469         p0 = self.vrrpv3.create_packet(primary_ip)
470         p0.serialize()
471         print(len(p0.data), p0.data)
472         p1 = packet.Packet(six.binary_type(p0.data))
473         p1.serialize()
474         print(len(p0.data), p0.data)
475         print(len(p1.data), p1.data)
476         eq_(p0.data, p1.data)
477
478     def test_to_string(self):
479         vrrpv3_values = {'version': self.version,
480                          'type': self.type_,
481                          'vrid': self.vrid,
482                          'priority': self.priority,
483                          'count_ip': self.count_ip,
484                          'max_adver_int': self.max_adver_int,
485                          'checksum': self.vrrpv3.checksum,
486                          'ip_addresses': [self.ip_address],
487                          'auth_type': None,
488                          'auth_data': None,
489                          'identification': self.vrrpv3.identification}
490         _vrrpv3_str = ','.join(['%s=%s' % (k, repr(vrrpv3_values[k]))
491                                 for k, v in inspect.getmembers(self.vrrpv3)
492                                 if k in vrrpv3_values])
493         vrrpv3_str = '%s(%s)' % (vrrp.vrrpv3.__name__, _vrrpv3_str)
494
495         eq_(str(self.vrrpv3), vrrpv3_str)
496         eq_(repr(self.vrrpv3), vrrpv3_str)