1 # Copyright (C) 2017 Nippon Telegraph and Telephone Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
16 from __future__ import print_function
19 import mock # Python 2
21 from unittest import mock # Python 3
28 from nose.tools import eq_
29 from nose.tools import ok_
30 from nose.tools import raises
33 from ryu.lib import pcaplib
34 from ryu.lib.packet import packet
35 from ryu.lib.packet import zebra
36 from ryu.utils import binary_str
# Directory holding the pcap fixtures, built from the directory of this
# module's own file joined with the relative path '../../packet_data/pcap/'.
39 PCAP_DATA_DIR = os.path.join(
40 os.path.dirname(sys.modules[__name__].__file__),
41 '../../packet_data/pcap/')
# Patcher that replaces ryu.lib.packet.zebra._is_frr_version_ge with a
# MagicMock whose side effect returns True only when the argument equals
# zebra._FRR_VERSION_2_0.
44 _patch_frr_v2 = mock.patch(
45 'ryu.lib.packet.zebra._is_frr_version_ge',
46 mock.MagicMock(side_effect=lambda x: x == zebra._FRR_VERSION_2_0))
# pcap-driven tests: every packet read from a capture file is parsed with
# packet.Packet, checked for Zebra messages, and its data is compared with the
# original buffer.
49 class Test_zebra(unittest.TestCase):
51 Test case for ryu.lib.packet.zebra.
# Helper shared by the test methods below. `f` is a capture name without the
# '.pcap' suffix; the file is looked up under PCAP_DATA_DIR.
# NOTE(review): declared with the single parameter `f` but invoked as
# self._test_pcap_single(f); no decorator is visible in these lines --
# confirm how it is bound.
55 def _test_pcap_single(f):
56 zebra_pcap_file = os.path.join(PCAP_DATA_DIR, f + '.pcap')
57 # print('*** testing %s' % zebra_pcap_file)
# NOTE(review): the file object returned by open() is handed straight to
# pcaplib.Reader and is not closed anywhere in the visible code.
59 for _, buf in pcaplib.Reader(open(zebra_pcap_file, 'rb')):
60 # Checks if Zebra message can be parsed as expected.
61 pkt = packet.Packet(buf)
62 zebra_pkts = pkt.get_protocols(zebra.ZebraMessage)
63 for zebra_pkt in zebra_pkts:
64 ok_(isinstance(zebra_pkt, zebra.ZebraMessage),
65 'Failed to parse Zebra message: %s' % pkt)
# The last entry of pkt.protocols must not be raw bytes / bytearray; the
# failure message treats that as "could not be parsed".
66 ok_(not isinstance(pkt.protocols[-1],
67 (six.binary_type, bytearray)),
68 'Some messages could not be parsed in %s: %s' % (f, pkt))
70 # Checks if Zebra message can be serialized as expected.
# Both sides are rendered with binary_str before being compared.
72 eq_(binary_str(buf), binary_str(pkt.data))
# NOTE(review): `f` is not bound in the visible lines of the two test methods
# below -- presumably each iterates over a list of capture names; verify.
74 def test_pcap_quagga(self):
81 self._test_pcap_single(f)
# NOTE(review): the module defines _patch_frr_v2, but no use of it is visible
# on this method -- presumably applied as a decorator; verify.
84 def test_pcap_frr_v2(self):
86 'zebra_v4_frr_v2', # API version 4 on FRRouting v2.0
90 self._test_pcap_single(f)
# Tests for zebra.ZebraMessage.get_header_size().
93 class TestZebraMessage(unittest.TestCase):
# Versions 0, 2, 3 and 4 are expected to map to V0_HEADER_SIZE,
# V1_HEADER_SIZE, V3_HEADER_SIZE and V3_HEADER_SIZE respectively.
95 def test_get_header_size(self):
96 eq_(zebra.ZebraMessage.V0_HEADER_SIZE,
97 zebra.ZebraMessage.get_header_size(0))
98 eq_(zebra.ZebraMessage.V1_HEADER_SIZE,
99 zebra.ZebraMessage.get_header_size(2))
100 eq_(zebra.ZebraMessage.V3_HEADER_SIZE,
101 zebra.ZebraMessage.get_header_size(3))
102 eq_(zebra.ZebraMessage.V3_HEADER_SIZE,
103 zebra.ZebraMessage.get_header_size(4))
# Calls get_header_size() with version 0xff.
# NOTE(review): as visible here the test asserts V0_HEADER_SIZE for 0xff.
# nose.tools.raises is imported by this module but no decorator is visible on
# this method -- confirm whether an exception is the expected outcome.
106 def test_get_header_size_invalid_version(self):
107 eq_(zebra.ZebraMessage.V0_HEADER_SIZE,
108 zebra.ZebraMessage.get_header_size(0xff))
# Parse/serialize round trip for zebra.ZebraRedistributeAdd at version 3.
# NOTE(review): `buf` is read through self but is not assigned in the lines
# visible here -- confirm against the complete file.
111 class TestZebraRedistributeAdd(unittest.TestCase):
115 route_type = zebra.ZEBRA_ROUTE_CONNECT
117 def test_parser(self):
# Decode the fixture with an explicit protocol version.
118 body = zebra.ZebraRedistributeAdd.parse(self.buf, version=3)
120 eq_(self.route_type, body.route_type)
# Re-encode with the same version; the result must match the fixture
# (both sides rendered with binary_str).
122 buf = body.serialize(version=3)
124 eq_(binary_str(self.buf), binary_str(buf))
# Parse/serialize round trip for zebra.ZebraIPv4ImportLookup using parse().
# NOTE(review): `buf`, `metric`, `nexthop_num` and `from_zebra` are read
# through self but are not assigned in the lines visible here -- confirm
# against the complete file.
127 class TestZebraIPv4ImportLookup(unittest.TestCase):
# 0xc0 0xa8 0x01 0x01 is 192.168.1.1.
130 b'\xc0\xa8\x01\x01' # prefix
132 prefix = '192.168.1.1/24'
137 def test_parser(self):
138 body = zebra.ZebraIPv4ImportLookup.parse(self.buf)
140 eq_(self.prefix, body.prefix)
141 eq_(self.metric, body.metric)
# Only the number of parsed nexthops is checked here, not their contents.
142 eq_(self.nexthop_num, len(body.nexthops))
143 eq_(self.from_zebra, body.from_zebra)
# Re-encode and require the result to match the fixture (via binary_str).
145 buf = body.serialize()
147 eq_(binary_str(self.buf), binary_str(buf))
# Parse/serialize round trip for zebra.ZebraIPv4ImportLookup using the
# parse_from_zebra() entry point, with one nexthop in the fixture.
# NOTE(review): `buf`, `metric`, `nexthop_num`, `ifindex` and `from_zebra` are
# read through self but are not assigned in the lines visible here -- confirm
# against the complete file.
150 class TestZebraIPv4ImportLookupFromZebra(unittest.TestCase):
# 0xc0 0xa8 0x01 0x01 is 192.168.1.1; the metric bytes are 0x14 (20).
152 b'\xc0\xa8\x01\x01' # prefix
153 b'\x00\x00\x00\x14' # metric
154 b'\x01' # nexthop_num
155 b'\x01' # nexthop_type
156 b'\x00\x00\x00\x02' # ifindex
# Unlike TestZebraIPv4ImportLookup, the expected prefix carries no '/len'.
158 prefix = '192.168.1.1'
161 nexthop_type = zebra.ZEBRA_NEXTHOP_IFINDEX
165 def test_parser(self):
166 body = zebra.ZebraIPv4ImportLookup.parse_from_zebra(self.buf)
168 eq_(self.prefix, body.prefix)
169 eq_(self.metric, body.metric)
170 eq_(self.nexthop_num, len(body.nexthops))
# The first nexthop is inspected field by field.
171 eq_(self.nexthop_type, body.nexthops[0].type)
172 eq_(self.ifindex, body.nexthops[0].ifindex)
173 eq_(self.from_zebra, body.from_zebra)
# Re-encode and require the result to match the fixture (via binary_str).
175 buf = body.serialize()
177 eq_(binary_str(self.buf), binary_str(buf))
# Parse/serialize round trip for zebra.ZebraIPv4NexthopLookupMRib.
# NOTE(review): `buf`, `addr`, `distance`, `metric` and `nexthop_num` are read
# through self but are not assigned in the lines visible here -- confirm
# against the complete file.
180 class TestZebraIPv4NexthopLookupMRib(unittest.TestCase):
# 0xc0 0xa8 0x01 0x01 is 192.168.1.1.
182 b'\xc0\xa8\x01\x01' # addr
189 def test_parser(self):
190 body = zebra.ZebraIPv4NexthopLookupMRib.parse(self.buf)
192 eq_(self.addr, body.addr)
193 eq_(self.distance, body.distance)
194 eq_(self.metric, body.metric)
# Only the number of parsed nexthops is checked here, not their contents.
195 eq_(self.nexthop_num, len(body.nexthops))
# Re-encode and require the result to match the fixture (via binary_str).
197 buf = body.serialize()
199 eq_(binary_str(self.buf), binary_str(buf))
# Parse/serialize round trip for zebra.ZebraIPv4NexthopLookupMRib with a
# fixture that carries a metric and one nexthop. The same parse() entry
# point is used as in TestZebraIPv4NexthopLookupMRib.
# NOTE(review): `buf`, `addr`, `distance`, `metric`, `nexthop_num` and
# `ifindex` are read through self but are not assigned in the lines visible
# here -- confirm against the complete file.
202 class TestZebraIPv4NexthopLookupMRibFromZebra(unittest.TestCase):
# 0xc0 0xa8 0x01 0x01 is 192.168.1.1; the metric bytes are 0x14 (20).
204 b'\xc0\xa8\x01\x01' # addr
206 b'\x00\x00\x00\x14' # metric
207 b'\x01' # nexthop_num
208 b'\x01' # nexthop_type
209 b'\x00\x00\x00\x02' # ifindex
215 nexthop_type = zebra.ZEBRA_NEXTHOP_IFINDEX
218 def test_parser(self):
219 body = zebra.ZebraIPv4NexthopLookupMRib.parse(self.buf)
221 eq_(self.addr, body.addr)
222 eq_(self.distance, body.distance)
223 eq_(self.metric, body.metric)
224 eq_(self.nexthop_num, len(body.nexthops))
# The first nexthop is inspected field by field.
225 eq_(self.nexthop_type, body.nexthops[0].type)
226 eq_(self.ifindex, body.nexthops[0].ifindex)
# Re-encode and require the result to match the fixture (via binary_str).
228 buf = body.serialize()
230 eq_(binary_str(self.buf), binary_str(buf))
# Parse/serialize round trip for zebra.ZebraNexthopUpdate with an IPv6
# prefix and one nexthop.
# NOTE(review): `buf`, `metric`, `nexthop_num` and `ifindex` are read through
# self but are not assigned in the lines visible here -- confirm against the
# complete file.
233 class TestZebraNexthopUpdateIPv6(unittest.TestCase):
# 0x20 0x01 0x0d 0xb8 are the leading bytes of 2001:db8::.
237 b'\x20\x01\x0d\xb8' # prefix
239 b'\x00\x00\x00\x14' # metric
240 b'\x01' # nexthop_num
241 b'\x01' # nexthop_type
242 b'\x00\x00\x00\x02' # ifindex
244 family = socket.AF_INET6
245 prefix = '2001:db8::/64'
248 nexthop_type = zebra.ZEBRA_NEXTHOP_IFINDEX
252 def test_parser(self):
253 body = zebra.ZebraNexthopUpdate.parse(self.buf)
255 eq_(self.family, body.family)
256 eq_(self.prefix, body.prefix)
257 eq_(self.metric, body.metric)
258 eq_(self.nexthop_num, len(body.nexthops))
# The first nexthop is inspected field by field.
259 eq_(self.nexthop_type, body.nexthops[0].type)
260 eq_(self.ifindex, body.nexthops[0].ifindex)
# Re-encode and require the result to match the fixture (via binary_str).
262 buf = body.serialize()
264 eq_(binary_str(self.buf), binary_str(buf))
# Parse/serialize round trip for zebra.ZebraInterfaceNbrAddressAdd with an
# IPv4 prefix.
# NOTE(review): `buf` and `ifindex` are read through self but are not assigned
# in the lines visible here -- confirm against the complete file.
267 class TestZebraInterfaceNbrAddressAdd(unittest.TestCase):
269 b'\x00\x00\x00\x01' # ifindex
# 0xc0 0xa8 0x01 0x00 is 192.168.1.0.
271 b'\xc0\xa8\x01\x00' # prefix
275 family = socket.AF_INET
276 prefix = '192.168.1.0/24'
279 def test_parser(self):
280 body = zebra.ZebraInterfaceNbrAddressAdd.parse(self.buf)
282 eq_(self.ifindex, body.ifindex)
283 eq_(self.family, body.family)
284 eq_(self.prefix, body.prefix)
# Re-encode and require the result to match the fixture (via binary_str).
286 buf = body.serialize()
288 eq_(binary_str(self.buf), binary_str(buf))
# Parse/serialize round trip for zebra.ZebraInterfaceBfdDestinationUpdate
# with IPv4 destination and source prefixes.
# NOTE(review): `buf` and `ifindex` are read through self but are not assigned
# in the lines visible here -- confirm against the complete file.
291 class TestZebraInterfaceBfdDestinationUpdate(unittest.TestCase):
293 b'\x00\x00\x00\x01' # ifindex
# Address bytes followed by a prefix length of 0x18 (24), matching the '/24'
# in the expected strings below.
295 b'\xc0\xa8\x01\x01' # dst_prefix
296 b'\x18' # dst_prefix_len
299 b'\xc0\xa8\x01\x02' # src_prefix
300 b'\x18' # src_prefix_len
303 dst_family = socket.AF_INET
304 dst_prefix = '192.168.1.1/24'
305 status = zebra.BFD_STATUS_UP
306 src_family = socket.AF_INET
307 src_prefix = '192.168.1.2/24'
310 def test_parser(self):
311 body = zebra.ZebraInterfaceBfdDestinationUpdate.parse(self.buf)
313 eq_(self.ifindex, body.ifindex)
314 eq_(self.dst_family, body.dst_family)
315 eq_(self.dst_prefix, body.dst_prefix)
316 eq_(self.status, body.status)
317 eq_(self.src_family, body.src_family)
318 eq_(self.src_prefix, body.src_prefix)
# Re-encode and require the result to match the fixture (via binary_str).
320 buf = body.serialize()
322 eq_(binary_str(self.buf), binary_str(buf))
# Parse/serialize round trip for zebra.ZebraBfdDestinationRegister with IPv4
# addresses and a fixture that ends in a multi_hop_count byte.
# NOTE(review): `buf`, `pid`, `min_rx_timer`, `min_tx_timer`, `detect_mult`,
# `multi_hop`, `multi_hop_count` and `ifname` are read through self but are
# not assigned in the lines visible here -- confirm against the complete file.
325 class TestZebraBfdDestinationRegisterMultiHopEnabled(unittest.TestCase):
327 b'\x00\x00\x00\x01' # pid
# Family 0x0002 is paired with socket.AF_INET in the expectations below.
328 b'\x00\x02' # dst_family
329 b'\xc0\xa8\x01\x01' # dst_prefix
330 b'\x00\x00\x00\x10' # min_rx_timer
331 b'\x00\x00\x00\x20' # min_tx_timer
332 b'\x01' # detect_mult
334 b'\x00\x02' # src_family
335 b'\xc0\xa8\x01\x02' # src_prefix
336 b'\x05' # multi_hop_count
339 dst_family = socket.AF_INET
340 dst_prefix = '192.168.1.1'
345 src_family = socket.AF_INET
346 src_prefix = '192.168.1.2'
351 def test_parser(self):
352 body = zebra.ZebraBfdDestinationRegister.parse(self.buf)
354 eq_(self.pid, body.pid)
355 eq_(self.dst_family, body.dst_family)
356 eq_(self.dst_prefix, body.dst_prefix)
357 eq_(self.min_rx_timer, body.min_rx_timer)
358 eq_(self.min_tx_timer, body.min_tx_timer)
359 eq_(self.detect_mult, body.detect_mult)
360 eq_(self.multi_hop, body.multi_hop)
361 eq_(self.src_family, body.src_family)
362 eq_(self.src_prefix, body.src_prefix)
363 eq_(self.multi_hop_count, body.multi_hop_count)
364 eq_(self.ifname, body.ifname)
# Re-encode and require the result to match the fixture (via binary_str).
366 buf = body.serialize()
368 eq_(binary_str(self.buf), binary_str(buf))
# Parse/serialize round trip for zebra.ZebraBfdDestinationRegister with IPv4
# addresses where the parsed multi_hop_count is expected to be None.
# NOTE(review): `buf`, `pid`, `min_rx_timer`, `min_tx_timer`, `detect_mult`,
# `multi_hop` and `ifname` are read through self but are not assigned in the
# lines visible here -- confirm against the complete file.
371 class TestZebraBfdDestinationRegisterMultiHopDisabled(unittest.TestCase):
373 b'\x00\x00\x00\x01' # pid
# Family 0x0002 is paired with socket.AF_INET in the expectations below.
374 b'\x00\x02' # dst_family
375 b'\xc0\xa8\x01\x01' # dst_prefix
376 b'\x00\x00\x00\x10' # min_rx_timer
377 b'\x00\x00\x00\x20' # min_tx_timer
378 b'\x01' # detect_mult
380 b'\x00\x02' # src_family
381 b'\xc0\xa8\x01\x02' # src_prefix
386 dst_family = socket.AF_INET
387 dst_prefix = '192.168.1.1'
392 src_family = socket.AF_INET
393 src_prefix = '192.168.1.2'
# No multi_hop_count byte appears among the visible fixture lines for this
# case; the parsed attribute is expected to be None.
394 multi_hop_count = None
398 def test_parser(self):
399 body = zebra.ZebraBfdDestinationRegister.parse(self.buf)
401 eq_(self.pid, body.pid)
402 eq_(self.dst_family, body.dst_family)
403 eq_(self.dst_prefix, body.dst_prefix)
404 eq_(self.min_rx_timer, body.min_rx_timer)
405 eq_(self.min_tx_timer, body.min_tx_timer)
406 eq_(self.detect_mult, body.detect_mult)
407 eq_(self.multi_hop, body.multi_hop)
408 eq_(self.src_family, body.src_family)
409 eq_(self.src_prefix, body.src_prefix)
410 eq_(self.multi_hop_count, body.multi_hop_count)
411 eq_(self.ifname, body.ifname)
# Re-encode and require the result to match the fixture (via binary_str).
413 buf = body.serialize()
415 eq_(binary_str(self.buf), binary_str(buf))
# Parse/serialize round trip for zebra.ZebraBfdDestinationRegister with IPv6
# addresses and a fixture that ends in a multi_hop_count byte.
# NOTE(review): `buf`, `pid`, `min_rx_timer`, `min_tx_timer`, `detect_mult`,
# `multi_hop`, `multi_hop_count` and `ifname` are read through self but are
# not assigned in the lines visible here -- confirm against the complete file.
418 class TestZebraBfdDestinationRegisterMultiHopEnabledIPv6(unittest.TestCase):
420 b'\x00\x00\x00\x01' # pid
# NOTE(review): family 0x000a is compared with socket.AF_INET6 below; that
# constant is 10 on Linux but is platform-dependent -- confirm the test is
# only expected to run where the two agree.
421 b'\x00\x0a' # dst_family
# 0x20 0x01 0x0d 0xb8 are the leading bytes of a 2001:db8:: address.
422 b'\x20\x01\x0d\xb8' # dst_prefix
426 b'\x00\x00\x00\x10' # min_rx_timer
427 b'\x00\x00\x00\x20' # min_tx_timer
428 b'\x01' # detect_mult
430 b'\x00\x0a' # src_family
431 b'\x20\x01\x0d\xb8' # src_prefix
435 b'\x05' # multi_hop_count
438 dst_family = socket.AF_INET6
439 dst_prefix = '2001:db8::1'
444 src_family = socket.AF_INET6
445 src_prefix = '2001:db8::2'
450 def test_parser(self):
451 body = zebra.ZebraBfdDestinationRegister.parse(self.buf)
453 eq_(self.pid, body.pid)
454 eq_(self.dst_family, body.dst_family)
455 eq_(self.dst_prefix, body.dst_prefix)
456 eq_(self.min_rx_timer, body.min_rx_timer)
457 eq_(self.min_tx_timer, body.min_tx_timer)
458 eq_(self.detect_mult, body.detect_mult)
459 eq_(self.multi_hop, body.multi_hop)
460 eq_(self.src_family, body.src_family)
461 eq_(self.src_prefix, body.src_prefix)
462 eq_(self.multi_hop_count, body.multi_hop_count)
463 eq_(self.ifname, body.ifname)
# Re-encode and require the result to match the fixture (via binary_str).
465 buf = body.serialize()
467 eq_(binary_str(self.buf), binary_str(buf))
# Parse/serialize round trip for zebra.ZebraBfdDestinationDeregister with
# IPv4 addresses and a fixture that ends in a multi_hop_count byte.
# NOTE(review): `buf`, `pid`, `multi_hop`, `multi_hop_count` and `ifname` are
# read through self but are not assigned in the lines visible here -- confirm
# against the complete file.
470 class TestZebraBfdDestinationDeregisterMultiHopEnabled(unittest.TestCase):
472 b'\x00\x00\x00\x01' # pid
# Family 0x0002 is paired with socket.AF_INET in the expectations below.
473 b'\x00\x02' # dst_family
474 b'\xc0\xa8\x01\x01' # dst_prefix
476 b'\x00\x02' # src_family
477 b'\xc0\xa8\x01\x02' # src_prefix
478 b'\x05' # multi_hop_count
481 dst_family = socket.AF_INET
482 dst_prefix = '192.168.1.1'
484 src_family = socket.AF_INET
485 src_prefix = '192.168.1.2'
490 def test_parser(self):
491 body = zebra.ZebraBfdDestinationDeregister.parse(self.buf)
493 eq_(self.pid, body.pid)
494 eq_(self.dst_family, body.dst_family)
495 eq_(self.dst_prefix, body.dst_prefix)
496 eq_(self.multi_hop, body.multi_hop)
497 eq_(self.src_family, body.src_family)
498 eq_(self.src_prefix, body.src_prefix)
499 eq_(self.multi_hop_count, body.multi_hop_count)
500 eq_(self.ifname, body.ifname)
# Re-encode and require the result to match the fixture (via binary_str).
502 buf = body.serialize()
504 eq_(binary_str(self.buf), binary_str(buf))
# Parse/serialize round trip for zebra.ZebraBfdDestinationDeregister with
# IPv4 addresses where the parsed multi_hop_count is expected to be None.
# NOTE(review): `buf`, `pid`, `multi_hop` and `ifname` are read through self
# but are not assigned in the lines visible here -- confirm against the
# complete file.
507 class TestZebraBfdDestinationDeregisterMultiHopDisabled(unittest.TestCase):
509 b'\x00\x00\x00\x01' # pid
# Family 0x0002 is paired with socket.AF_INET in the expectations below.
510 b'\x00\x02' # dst_family
511 b'\xc0\xa8\x01\x01' # dst_prefix
513 b'\x00\x02' # src_family
514 b'\xc0\xa8\x01\x02' # src_prefix
519 dst_family = socket.AF_INET
520 dst_prefix = '192.168.1.1'
522 src_family = socket.AF_INET
523 src_prefix = '192.168.1.2'
# No multi_hop_count byte appears among the visible fixture lines for this
# case; the parsed attribute is expected to be None.
524 multi_hop_count = None
528 def test_parser(self):
529 body = zebra.ZebraBfdDestinationDeregister.parse(self.buf)
531 eq_(self.pid, body.pid)
532 eq_(self.dst_family, body.dst_family)
533 eq_(self.dst_prefix, body.dst_prefix)
534 eq_(self.multi_hop, body.multi_hop)
535 eq_(self.src_family, body.src_family)
536 eq_(self.src_prefix, body.src_prefix)
537 eq_(self.multi_hop_count, body.multi_hop_count)
538 eq_(self.ifname, body.ifname)
# Re-encode and require the result to match the fixture (via binary_str).
540 buf = body.serialize()
542 eq_(binary_str(self.buf), binary_str(buf))
# Parse/serialize round trip for zebra.ZebraBfdDestinationDeregister with
# IPv6 addresses and a fixture that ends in a multi_hop_count byte.
# NOTE(review): `buf`, `pid`, `multi_hop`, `multi_hop_count` and `ifname` are
# read through self but are not assigned in the lines visible here -- confirm
# against the complete file.
545 class TestZebraBfdDestinationDeregisterMultiHopEnabledIPv6(unittest.TestCase):
547 b'\x00\x00\x00\x01' # pid
# NOTE(review): family 0x000a is compared with socket.AF_INET6 below; that
# constant is 10 on Linux but is platform-dependent -- confirm the test is
# only expected to run where the two agree.
548 b'\x00\x0a' # dst_family
# 0x20 0x01 0x0d 0xb8 are the leading bytes of a 2001:db8:: address.
549 b'\x20\x01\x0d\xb8' # dst_prefix
554 b'\x00\x0a' # src_family
555 b'\x20\x01\x0d\xb8' # src_prefix
559 b'\x05' # multi_hop_count
562 dst_family = socket.AF_INET6
563 dst_prefix = '2001:db8::1'
565 src_family = socket.AF_INET6
566 src_prefix = '2001:db8::2'
571 def test_parser(self):
572 body = zebra.ZebraBfdDestinationDeregister.parse(self.buf)
574 eq_(self.pid, body.pid)
575 eq_(self.dst_family, body.dst_family)
576 eq_(self.dst_prefix, body.dst_prefix)
577 eq_(self.multi_hop, body.multi_hop)
578 eq_(self.src_family, body.src_family)
579 eq_(self.src_prefix, body.src_prefix)
580 eq_(self.multi_hop_count, body.multi_hop_count)
581 eq_(self.ifname, body.ifname)
# Re-encode and require the result to match the fixture (via binary_str).
583 buf = body.serialize()
585 eq_(binary_str(self.buf), binary_str(buf))
# Parse/serialize round trip for zebra.ZebraVrfAdd; only vrf_name is checked
# on the parsed body.
# NOTE(review): `buf` and `vrf_name` are read through self but are not
# assigned in the lines visible here -- confirm against the complete file.
588 class TestZebraVrfAdd(unittest.TestCase):
603 def test_parser(self):
604 body = zebra.ZebraVrfAdd.parse(self.buf)
606 eq_(self.vrf_name, body.vrf_name)
# Re-encode and require the result to match the fixture (via binary_str).
608 buf = body.serialize()
610 eq_(binary_str(self.buf), binary_str(buf))
# Parse/serialize round trip for zebra.ZebraInterfaceVrfUpdate; ifindex and
# vrf_id are checked on the parsed body.
# NOTE(review): `buf`, `ifindex` and `vrf_id` are read through self but are
# not assigned in the lines visible here -- confirm against the complete file.
613 class TestZebraInterfaceVrfUpdate(unittest.TestCase):
615 b'\x00\x00\x00\x01' # ifindex
622 def test_parser(self):
623 body = zebra.ZebraInterfaceVrfUpdate.parse(self.buf)
625 eq_(self.ifindex, body.ifindex)
626 eq_(self.vrf_id, body.vrf_id)
# Re-encode and require the result to match the fixture (via binary_str).
628 buf = body.serialize()
630 eq_(binary_str(self.buf), binary_str(buf))
# Parse/serialize round trip for zebra.ZebraInterfaceEnableRadv; ifindex and
# interval are checked on the parsed body.
# NOTE(review): `buf`, `ifindex` and `interval` are read through self but are
# not assigned in the lines visible here -- confirm against the complete file.
633 class TestZebraInterfaceEnableRadv(unittest.TestCase):
635 b'\x00\x00\x00\x01' # ifindex
# The interval bytes are 0x00000100 (256).
636 b'\x00\x00\x01\x00' # interval
642 def test_parser(self):
643 body = zebra.ZebraInterfaceEnableRadv.parse(self.buf)
645 eq_(self.ifindex, body.ifindex)
646 eq_(self.interval, body.interval)
# Re-encode and require the result to match the fixture (via binary_str).
648 buf = body.serialize()
650 eq_(binary_str(self.buf), binary_str(buf))
# Parse/serialize round trip for zebra.ZebraMplsLabelsAdd with an IPv4 prefix
# and gateway address.
# NOTE(review): `buf`, `distance` and `in_label` are read through self but are
# not assigned in the lines visible here -- confirm against the complete file.
653 class TestZebraMplsLabelsAddIPv4(unittest.TestCase):
# The family is a four-byte field here; value 2 is paired with
# socket.AF_INET in the expectations below.
656 b'\x00\x00\x00\x02' # family
657 b'\xc0\xa8\x01\x00' # prefix
659 b'\xc0\xa8\x01\x01' # gate_addr
# in_label bytes are 0x64 (100); the out_label bytes (value 3) are expected
# to decode to zebra.MPLS_IMP_NULL_LABEL.
661 b'\x00\x00\x00\x64' # in_label
662 b'\x00\x00\x00\x03' # out_label
664 route_type = zebra.ZEBRA_ROUTE_BGP
665 family = socket.AF_INET
666 prefix = '192.168.1.0/24'
667 gate_addr = '192.168.1.1'
670 out_label = zebra.MPLS_IMP_NULL_LABEL
673 def test_parser(self):
674 body = zebra.ZebraMplsLabelsAdd.parse(self.buf)
676 eq_(self.route_type, body.route_type)
677 eq_(self.family, body.family)
678 eq_(self.prefix, body.prefix)
679 eq_(self.gate_addr, body.gate_addr)
680 eq_(self.distance, body.distance)
681 eq_(self.in_label, body.in_label)
682 eq_(self.out_label, body.out_label)
# Re-encode and require the result to match the fixture (via binary_str).
684 buf = body.serialize()
686 eq_(binary_str(self.buf), binary_str(buf))
# Parse/serialize round trip for zebra.ZebraMplsLabelsAdd with an IPv6 prefix
# and gateway address.
# NOTE(review): `buf`, `distance` and `in_label` are read through self but are
# not assigned in the lines visible here -- confirm against the complete file.
689 class TestZebraMplsLabelsAddIPv6(unittest.TestCase):
# NOTE(review): the four-byte family value 0x0a is compared with
# socket.AF_INET6 below; that constant is 10 on Linux but is
# platform-dependent -- confirm the test is only expected to run where the
# two agree.
692 b'\x00\x00\x00\x0a' # family
# 0x20 0x01 0x0d 0xb8 are the leading bytes of a 2001:db8:: address.
693 b'\x20\x01\x0d\xb8' # prefix
698 b'\x20\x01\x0d\xb8' # gate_addr
# in_label bytes are 0x64 (100); the out_label bytes (value 3) are expected
# to decode to zebra.MPLS_IMP_NULL_LABEL.
703 b'\x00\x00\x00\x64' # in_label
704 b'\x00\x00\x00\x03' # out_label
706 route_type = zebra.ZEBRA_ROUTE_BGP
707 family = socket.AF_INET6
708 prefix = '2001:db8::/64'
709 gate_addr = '2001:db8::1'
712 out_label = zebra.MPLS_IMP_NULL_LABEL
715 def test_parser(self):
716 body = zebra.ZebraMplsLabelsAdd.parse(self.buf)
718 eq_(self.route_type, body.route_type)
719 eq_(self.family, body.family)
720 eq_(self.prefix, body.prefix)
721 eq_(self.gate_addr, body.gate_addr)
722 eq_(self.distance, body.distance)
723 eq_(self.in_label, body.in_label)
724 eq_(self.out_label, body.out_label)
# Re-encode and require the result to match the fixture (via binary_str).
726 buf = body.serialize()
728 eq_(binary_str(self.buf), binary_str(buf))