backing up
[vsorcdistro/.git] / ryu / build / lib.linux-armv7l-2.7 / ryu / tests / unit / packet / test_zebra.py
1 # Copyright (C) 2017 Nippon Telegraph and Telephone Corporation.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #    http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 from __future__ import print_function
17
18 try:
19     import mock  # Python 2
20 except ImportError:
21     from unittest import mock  # Python 3
22
23 import os
24 import socket
25 import sys
26 import unittest
27
28 from nose.tools import eq_
29 from nose.tools import ok_
30 from nose.tools import raises
31 import six
32
33 from ryu.lib import pcaplib
34 from ryu.lib.packet import packet
35 from ryu.lib.packet import zebra
36 from ryu.utils import binary_str
37
38
39 PCAP_DATA_DIR = os.path.join(
40     os.path.dirname(sys.modules[__name__].__file__),
41     '../../packet_data/pcap/')
42
43
44 _patch_frr_v2 = mock.patch(
45     'ryu.lib.packet.zebra._is_frr_version_ge',
46     mock.MagicMock(side_effect=lambda x: x == zebra._FRR_VERSION_2_0))
47
48
49 class Test_zebra(unittest.TestCase):
50     """
51     Test case for ryu.lib.packet.zebra.
52     """
53
54     @staticmethod
55     def _test_pcap_single(f):
56         zebra_pcap_file = os.path.join(PCAP_DATA_DIR, f + '.pcap')
57         # print('*** testing %s' % zebra_pcap_file)
58
59         for _, buf in pcaplib.Reader(open(zebra_pcap_file, 'rb')):
60             # Checks if Zebra message can be parsed as expected.
61             pkt = packet.Packet(buf)
62             zebra_pkts = pkt.get_protocols(zebra.ZebraMessage)
63             for zebra_pkt in zebra_pkts:
64                 ok_(isinstance(zebra_pkt, zebra.ZebraMessage),
65                     'Failed to parse Zebra message: %s' % pkt)
66             ok_(not isinstance(pkt.protocols[-1],
67                                (six.binary_type, bytearray)),
68                 'Some messages could not be parsed in %s: %s' % (f, pkt))
69
70             # Checks if Zebra message can be serialized as expected.
71             pkt.serialize()
72             eq_(binary_str(buf), binary_str(pkt.data))
73
74     def test_pcap_quagga(self):
75         files = [
76             'zebra_v2',
77             'zebra_v3',
78         ]
79
80         for f in files:
81             self._test_pcap_single(f)
82
83     @_patch_frr_v2
84     def test_pcap_frr_v2(self):
85         files = [
86             'zebra_v4_frr_v2',  # API version 4 on FRRouting v2.0
87         ]
88
89         for f in files:
90             self._test_pcap_single(f)
91
92
93 class TestZebraMessage(unittest.TestCase):
94
95     def test_get_header_size(self):
96         eq_(zebra.ZebraMessage.V0_HEADER_SIZE,
97             zebra.ZebraMessage.get_header_size(0))
98         eq_(zebra.ZebraMessage.V1_HEADER_SIZE,
99             zebra.ZebraMessage.get_header_size(2))
100         eq_(zebra.ZebraMessage.V3_HEADER_SIZE,
101             zebra.ZebraMessage.get_header_size(3))
102         eq_(zebra.ZebraMessage.V3_HEADER_SIZE,
103             zebra.ZebraMessage.get_header_size(4))
104
105     @raises(ValueError)
106     def test_get_header_size_invalid_version(self):
107         eq_(zebra.ZebraMessage.V0_HEADER_SIZE,
108             zebra.ZebraMessage.get_header_size(0xff))
109
110
111 class TestZebraRedistributeAdd(unittest.TestCase):
112     buf = (
113         b'\x02'  # route_type
114     )
115     route_type = zebra.ZEBRA_ROUTE_CONNECT
116
117     def test_parser(self):
118         body = zebra.ZebraRedistributeAdd.parse(self.buf, version=3)
119
120         eq_(self.route_type, body.route_type)
121
122         buf = body.serialize(version=3)
123
124         eq_(binary_str(self.buf), binary_str(buf))
125
126
127 class TestZebraIPv4ImportLookup(unittest.TestCase):
128     buf = (
129         b'\x18'
130         b'\xc0\xa8\x01\x01'  # prefix
131     )
132     prefix = '192.168.1.1/24'
133     metric = None
134     nexthop_num = 0
135     from_zebra = False
136
137     def test_parser(self):
138         body = zebra.ZebraIPv4ImportLookup.parse(self.buf)
139
140         eq_(self.prefix, body.prefix)
141         eq_(self.metric, body.metric)
142         eq_(self.nexthop_num, len(body.nexthops))
143         eq_(self.from_zebra, body.from_zebra)
144
145         buf = body.serialize()
146
147         eq_(binary_str(self.buf), binary_str(buf))
148
149
150 class TestZebraIPv4ImportLookupFromZebra(unittest.TestCase):
151     buf = (
152         b'\xc0\xa8\x01\x01'  # prefix
153         b'\x00\x00\x00\x14'  # metric
154         b'\x01'              # nexthop_num
155         b'\x01'              # nexthop_type
156         b'\x00\x00\x00\x02'  # ifindex
157     )
158     prefix = '192.168.1.1'
159     metric = 0x14
160     nexthop_num = 1
161     nexthop_type = zebra.ZEBRA_NEXTHOP_IFINDEX
162     ifindex = 2
163     from_zebra = True
164
165     def test_parser(self):
166         body = zebra.ZebraIPv4ImportLookup.parse_from_zebra(self.buf)
167
168         eq_(self.prefix, body.prefix)
169         eq_(self.metric, body.metric)
170         eq_(self.nexthop_num, len(body.nexthops))
171         eq_(self.nexthop_type, body.nexthops[0].type)
172         eq_(self.ifindex, body.nexthops[0].ifindex)
173         eq_(self.from_zebra, body.from_zebra)
174
175         buf = body.serialize()
176
177         eq_(binary_str(self.buf), binary_str(buf))
178
179
180 class TestZebraIPv4NexthopLookupMRib(unittest.TestCase):
181     buf = (
182         b'\xc0\xa8\x01\x01'  # addr
183     )
184     addr = '192.168.1.1'
185     distance = None
186     metric = None
187     nexthop_num = 0
188
189     def test_parser(self):
190         body = zebra.ZebraIPv4NexthopLookupMRib.parse(self.buf)
191
192         eq_(self.addr, body.addr)
193         eq_(self.distance, body.distance)
194         eq_(self.metric, body.metric)
195         eq_(self.nexthop_num, len(body.nexthops))
196
197         buf = body.serialize()
198
199         eq_(binary_str(self.buf), binary_str(buf))
200
201
202 class TestZebraIPv4NexthopLookupMRibFromZebra(unittest.TestCase):
203     buf = (
204         b'\xc0\xa8\x01\x01'  # addr
205         b'\x01'              # distance
206         b'\x00\x00\x00\x14'  # metric
207         b'\x01'              # nexthop_num
208         b'\x01'              # nexthop_type
209         b'\x00\x00\x00\x02'  # ifindex
210     )
211     addr = '192.168.1.1'
212     distance = 1
213     metric = 0x14
214     nexthop_num = 1
215     nexthop_type = zebra.ZEBRA_NEXTHOP_IFINDEX
216     ifindex = 2
217
218     def test_parser(self):
219         body = zebra.ZebraIPv4NexthopLookupMRib.parse(self.buf)
220
221         eq_(self.addr, body.addr)
222         eq_(self.distance, body.distance)
223         eq_(self.metric, body.metric)
224         eq_(self.nexthop_num, len(body.nexthops))
225         eq_(self.nexthop_type, body.nexthops[0].type)
226         eq_(self.ifindex, body.nexthops[0].ifindex)
227
228         buf = body.serialize()
229
230         eq_(binary_str(self.buf), binary_str(buf))
231
232
233 class TestZebraNexthopUpdateIPv6(unittest.TestCase):
234     buf = (
235         b'\x00\x0a'          # family
236         b'\x40'              # prefix_len
237         b'\x20\x01\x0d\xb8'  # prefix
238         b'\x00\x00\x00\x00'
239         b'\x00\x00\x00\x14'  # metric
240         b'\x01'              # nexthop_num
241         b'\x01'              # nexthop_type
242         b'\x00\x00\x00\x02'  # ifindex
243     )
244     family = socket.AF_INET6
245     prefix = '2001:db8::/64'
246     metric = 0x14
247     nexthop_num = 1
248     nexthop_type = zebra.ZEBRA_NEXTHOP_IFINDEX
249     ifindex = 2
250
251     @_patch_frr_v2
252     def test_parser(self):
253         body = zebra.ZebraNexthopUpdate.parse(self.buf)
254
255         eq_(self.family, body.family)
256         eq_(self.prefix, body.prefix)
257         eq_(self.metric, body.metric)
258         eq_(self.nexthop_num, len(body.nexthops))
259         eq_(self.nexthop_type, body.nexthops[0].type)
260         eq_(self.ifindex, body.nexthops[0].ifindex)
261
262         buf = body.serialize()
263
264         eq_(binary_str(self.buf), binary_str(buf))
265
266
267 class TestZebraInterfaceNbrAddressAdd(unittest.TestCase):
268     buf = (
269         b'\x00\x00\x00\x01'  # ifindex
270         b'\x02'              # family
271         b'\xc0\xa8\x01\x00'  # prefix
272         b'\x18'              # prefix_len
273     )
274     ifindex = 1
275     family = socket.AF_INET
276     prefix = '192.168.1.0/24'
277
278     @_patch_frr_v2
279     def test_parser(self):
280         body = zebra.ZebraInterfaceNbrAddressAdd.parse(self.buf)
281
282         eq_(self.ifindex, body.ifindex)
283         eq_(self.family, body.family)
284         eq_(self.prefix, body.prefix)
285
286         buf = body.serialize()
287
288         eq_(binary_str(self.buf), binary_str(buf))
289
290
291 class TestZebraInterfaceBfdDestinationUpdate(unittest.TestCase):
292     buf = (
293         b'\x00\x00\x00\x01'  # ifindex
294         b'\x02'              # dst_family
295         b'\xc0\xa8\x01\x01'  # dst_prefix
296         b'\x18'              # dst_prefix_len
297         b'\x04'              # status
298         b'\x02'              # src_family
299         b'\xc0\xa8\x01\x02'  # src_prefix
300         b'\x18'              # src_prefix_len
301     )
302     ifindex = 1
303     dst_family = socket.AF_INET
304     dst_prefix = '192.168.1.1/24'
305     status = zebra.BFD_STATUS_UP
306     src_family = socket.AF_INET
307     src_prefix = '192.168.1.2/24'
308
309     @_patch_frr_v2
310     def test_parser(self):
311         body = zebra.ZebraInterfaceBfdDestinationUpdate.parse(self.buf)
312
313         eq_(self.ifindex, body.ifindex)
314         eq_(self.dst_family, body.dst_family)
315         eq_(self.dst_prefix, body.dst_prefix)
316         eq_(self.status, body.status)
317         eq_(self.src_family, body.src_family)
318         eq_(self.src_prefix, body.src_prefix)
319
320         buf = body.serialize()
321
322         eq_(binary_str(self.buf), binary_str(buf))
323
324
325 class TestZebraBfdDestinationRegisterMultiHopEnabled(unittest.TestCase):
326     buf = (
327         b'\x00\x00\x00\x01'  # pid
328         b'\x00\x02'          # dst_family
329         b'\xc0\xa8\x01\x01'  # dst_prefix
330         b'\x00\x00\x00\x10'  # min_rx_timer
331         b'\x00\x00\x00\x20'  # min_tx_timer
332         b'\x01'              # detect_mult
333         b'\x01'              # multi_hop
334         b'\x00\x02'          # src_family
335         b'\xc0\xa8\x01\x02'  # src_prefix
336         b'\x05'              # multi_hop_count
337     )
338     pid = 1
339     dst_family = socket.AF_INET
340     dst_prefix = '192.168.1.1'
341     min_rx_timer = 0x10
342     min_tx_timer = 0x20
343     detect_mult = 1
344     multi_hop = 1
345     src_family = socket.AF_INET
346     src_prefix = '192.168.1.2'
347     multi_hop_count = 5
348     ifname = None
349
350     @_patch_frr_v2
351     def test_parser(self):
352         body = zebra.ZebraBfdDestinationRegister.parse(self.buf)
353
354         eq_(self.pid, body.pid)
355         eq_(self.dst_family, body.dst_family)
356         eq_(self.dst_prefix, body.dst_prefix)
357         eq_(self.min_rx_timer, body.min_rx_timer)
358         eq_(self.min_tx_timer, body.min_tx_timer)
359         eq_(self.detect_mult, body.detect_mult)
360         eq_(self.multi_hop, body.multi_hop)
361         eq_(self.src_family, body.src_family)
362         eq_(self.src_prefix, body.src_prefix)
363         eq_(self.multi_hop_count, body.multi_hop_count)
364         eq_(self.ifname, body.ifname)
365
366         buf = body.serialize()
367
368         eq_(binary_str(self.buf), binary_str(buf))
369
370
371 class TestZebraBfdDestinationRegisterMultiHopDisabled(unittest.TestCase):
372     buf = (
373         b'\x00\x00\x00\x01'  # pid
374         b'\x00\x02'          # dst_family
375         b'\xc0\xa8\x01\x01'  # dst_prefix
376         b'\x00\x00\x00\x10'  # min_rx_timer
377         b'\x00\x00\x00\x20'  # min_tx_timer
378         b'\x01'              # detect_mult
379         b'\x00'              # multi_hop
380         b'\x00\x02'          # src_family
381         b'\xc0\xa8\x01\x02'  # src_prefix
382         b'\x04'              # ifname_len
383         b'eth0'              # ifname
384     )
385     pid = 1
386     dst_family = socket.AF_INET
387     dst_prefix = '192.168.1.1'
388     min_rx_timer = 0x10
389     min_tx_timer = 0x20
390     detect_mult = 1
391     multi_hop = 0
392     src_family = socket.AF_INET
393     src_prefix = '192.168.1.2'
394     multi_hop_count = None
395     ifname = 'eth0'
396
397     @_patch_frr_v2
398     def test_parser(self):
399         body = zebra.ZebraBfdDestinationRegister.parse(self.buf)
400
401         eq_(self.pid, body.pid)
402         eq_(self.dst_family, body.dst_family)
403         eq_(self.dst_prefix, body.dst_prefix)
404         eq_(self.min_rx_timer, body.min_rx_timer)
405         eq_(self.min_tx_timer, body.min_tx_timer)
406         eq_(self.detect_mult, body.detect_mult)
407         eq_(self.multi_hop, body.multi_hop)
408         eq_(self.src_family, body.src_family)
409         eq_(self.src_prefix, body.src_prefix)
410         eq_(self.multi_hop_count, body.multi_hop_count)
411         eq_(self.ifname, body.ifname)
412
413         buf = body.serialize()
414
415         eq_(binary_str(self.buf), binary_str(buf))
416
417
418 class TestZebraBfdDestinationRegisterMultiHopEnabledIPv6(unittest.TestCase):
419     buf = (
420         b'\x00\x00\x00\x01'  # pid
421         b'\x00\x0a'          # dst_family
422         b'\x20\x01\x0d\xb8'  # dst_prefix
423         b'\x00\x00\x00\x00'
424         b'\x00\x00\x00\x00'
425         b'\x00\x00\x00\x01'
426         b'\x00\x00\x00\x10'  # min_rx_timer
427         b'\x00\x00\x00\x20'  # min_tx_timer
428         b'\x01'              # detect_mult
429         b'\x01'              # multi_hop
430         b'\x00\x0a'          # src_family
431         b'\x20\x01\x0d\xb8'  # src_prefix
432         b'\x00\x00\x00\x00'
433         b'\x00\x00\x00\x00'
434         b'\x00\x00\x00\x02'
435         b'\x05'              # multi_hop_count
436     )
437     pid = 1
438     dst_family = socket.AF_INET6
439     dst_prefix = '2001:db8::1'
440     min_rx_timer = 0x10
441     min_tx_timer = 0x20
442     detect_mult = 1
443     multi_hop = 1
444     src_family = socket.AF_INET6
445     src_prefix = '2001:db8::2'
446     multi_hop_count = 5
447     ifname = None
448
449     @_patch_frr_v2
450     def test_parser(self):
451         body = zebra.ZebraBfdDestinationRegister.parse(self.buf)
452
453         eq_(self.pid, body.pid)
454         eq_(self.dst_family, body.dst_family)
455         eq_(self.dst_prefix, body.dst_prefix)
456         eq_(self.min_rx_timer, body.min_rx_timer)
457         eq_(self.min_tx_timer, body.min_tx_timer)
458         eq_(self.detect_mult, body.detect_mult)
459         eq_(self.multi_hop, body.multi_hop)
460         eq_(self.src_family, body.src_family)
461         eq_(self.src_prefix, body.src_prefix)
462         eq_(self.multi_hop_count, body.multi_hop_count)
463         eq_(self.ifname, body.ifname)
464
465         buf = body.serialize()
466
467         eq_(binary_str(self.buf), binary_str(buf))
468
469
470 class TestZebraBfdDestinationDeregisterMultiHopEnabled(unittest.TestCase):
471     buf = (
472         b'\x00\x00\x00\x01'  # pid
473         b'\x00\x02'          # dst_family
474         b'\xc0\xa8\x01\x01'  # dst_prefix
475         b'\x01'              # multi_hop
476         b'\x00\x02'          # src_family
477         b'\xc0\xa8\x01\x02'  # src_prefix
478         b'\x05'              # multi_hop_count
479     )
480     pid = 1
481     dst_family = socket.AF_INET
482     dst_prefix = '192.168.1.1'
483     multi_hop = 1
484     src_family = socket.AF_INET
485     src_prefix = '192.168.1.2'
486     multi_hop_count = 5
487     ifname = None
488
489     @_patch_frr_v2
490     def test_parser(self):
491         body = zebra.ZebraBfdDestinationDeregister.parse(self.buf)
492
493         eq_(self.pid, body.pid)
494         eq_(self.dst_family, body.dst_family)
495         eq_(self.dst_prefix, body.dst_prefix)
496         eq_(self.multi_hop, body.multi_hop)
497         eq_(self.src_family, body.src_family)
498         eq_(self.src_prefix, body.src_prefix)
499         eq_(self.multi_hop_count, body.multi_hop_count)
500         eq_(self.ifname, body.ifname)
501
502         buf = body.serialize()
503
504         eq_(binary_str(self.buf), binary_str(buf))
505
506
507 class TestZebraBfdDestinationDeregisterMultiHopDisabled(unittest.TestCase):
508     buf = (
509         b'\x00\x00\x00\x01'  # pid
510         b'\x00\x02'          # dst_family
511         b'\xc0\xa8\x01\x01'  # dst_prefix
512         b'\x00'              # multi_hop
513         b'\x00\x02'          # src_family
514         b'\xc0\xa8\x01\x02'  # src_prefix
515         b'\x04'              # ifname_len
516         b'eth0'              # ifname
517     )
518     pid = 1
519     dst_family = socket.AF_INET
520     dst_prefix = '192.168.1.1'
521     multi_hop = 0
522     src_family = socket.AF_INET
523     src_prefix = '192.168.1.2'
524     multi_hop_count = None
525     ifname = 'eth0'
526
527     @_patch_frr_v2
528     def test_parser(self):
529         body = zebra.ZebraBfdDestinationDeregister.parse(self.buf)
530
531         eq_(self.pid, body.pid)
532         eq_(self.dst_family, body.dst_family)
533         eq_(self.dst_prefix, body.dst_prefix)
534         eq_(self.multi_hop, body.multi_hop)
535         eq_(self.src_family, body.src_family)
536         eq_(self.src_prefix, body.src_prefix)
537         eq_(self.multi_hop_count, body.multi_hop_count)
538         eq_(self.ifname, body.ifname)
539
540         buf = body.serialize()
541
542         eq_(binary_str(self.buf), binary_str(buf))
543
544
545 class TestZebraBfdDestinationDeregisterMultiHopEnabledIPv6(unittest.TestCase):
546     buf = (
547         b'\x00\x00\x00\x01'  # pid
548         b'\x00\x0a'          # dst_family
549         b'\x20\x01\x0d\xb8'  # dst_prefix
550         b'\x00\x00\x00\x00'
551         b'\x00\x00\x00\x00'
552         b'\x00\x00\x00\x01'
553         b'\x01'              # multi_hop
554         b'\x00\x0a'          # src_family
555         b'\x20\x01\x0d\xb8'  # src_prefix
556         b'\x00\x00\x00\x00'
557         b'\x00\x00\x00\x00'
558         b'\x00\x00\x00\x02'
559         b'\x05'              # multi_hop_count
560     )
561     pid = 1
562     dst_family = socket.AF_INET6
563     dst_prefix = '2001:db8::1'
564     multi_hop = 1
565     src_family = socket.AF_INET6
566     src_prefix = '2001:db8::2'
567     multi_hop_count = 5
568     ifname = None
569
570     @_patch_frr_v2
571     def test_parser(self):
572         body = zebra.ZebraBfdDestinationDeregister.parse(self.buf)
573
574         eq_(self.pid, body.pid)
575         eq_(self.dst_family, body.dst_family)
576         eq_(self.dst_prefix, body.dst_prefix)
577         eq_(self.multi_hop, body.multi_hop)
578         eq_(self.src_family, body.src_family)
579         eq_(self.src_prefix, body.src_prefix)
580         eq_(self.multi_hop_count, body.multi_hop_count)
581         eq_(self.ifname, body.ifname)
582
583         buf = body.serialize()
584
585         eq_(binary_str(self.buf), binary_str(buf))
586
587
588 class TestZebraVrfAdd(unittest.TestCase):
589     buf = (
590         b'VRF1'              # vrf_name
591         b'\x00\x00\x00\x00'
592         b'\x00\x00\x00\x00'
593         b'\x00\x00\x00\x00'
594         b'\x00\x00\x00\x00'
595         b'\x00\x00\x00\x00'
596         b'\x00\x00\x00\x00'
597         b'\x00\x00\x00\x00'
598         b'\x00\x00\x00\x00'
599     )
600     vrf_name = 'VRF1'
601
602     @_patch_frr_v2
603     def test_parser(self):
604         body = zebra.ZebraVrfAdd.parse(self.buf)
605
606         eq_(self.vrf_name, body.vrf_name)
607
608         buf = body.serialize()
609
610         eq_(binary_str(self.buf), binary_str(buf))
611
612
613 class TestZebraInterfaceVrfUpdate(unittest.TestCase):
614     buf = (
615         b'\x00\x00\x00\x01'  # ifindex
616         b'\x00\x02'          # vrf_id
617     )
618     ifindex = 1
619     vrf_id = 2
620
621     @_patch_frr_v2
622     def test_parser(self):
623         body = zebra.ZebraInterfaceVrfUpdate.parse(self.buf)
624
625         eq_(self.ifindex, body.ifindex)
626         eq_(self.vrf_id, body.vrf_id)
627
628         buf = body.serialize()
629
630         eq_(binary_str(self.buf), binary_str(buf))
631
632
633 class TestZebraInterfaceEnableRadv(unittest.TestCase):
634     buf = (
635         b'\x00\x00\x00\x01'  # ifindex
636         b'\x00\x00\x01\x00'  # interval
637     )
638     ifindex = 1
639     interval = 0x100
640
641     @_patch_frr_v2
642     def test_parser(self):
643         body = zebra.ZebraInterfaceEnableRadv.parse(self.buf)
644
645         eq_(self.ifindex, body.ifindex)
646         eq_(self.interval, body.interval)
647
648         buf = body.serialize()
649
650         eq_(binary_str(self.buf), binary_str(buf))
651
652
653 class TestZebraMplsLabelsAddIPv4(unittest.TestCase):
654     buf = (
655         b'\x09'              # route_type
656         b'\x00\x00\x00\x02'  # family
657         b'\xc0\xa8\x01\x00'  # prefix
658         b'\x18'              # prefix_len
659         b'\xc0\xa8\x01\x01'  # gate_addr
660         b'\x10'              # distance
661         b'\x00\x00\x00\x64'  # in_label
662         b'\x00\x00\x00\x03'  # out_label
663     )
664     route_type = zebra.ZEBRA_ROUTE_BGP
665     family = socket.AF_INET
666     prefix = '192.168.1.0/24'
667     gate_addr = '192.168.1.1'
668     distance = 0x10
669     in_label = 100
670     out_label = zebra.MPLS_IMP_NULL_LABEL
671
672     @_patch_frr_v2
673     def test_parser(self):
674         body = zebra.ZebraMplsLabelsAdd.parse(self.buf)
675
676         eq_(self.route_type, body.route_type)
677         eq_(self.family, body.family)
678         eq_(self.prefix, body.prefix)
679         eq_(self.gate_addr, body.gate_addr)
680         eq_(self.distance, body.distance)
681         eq_(self.in_label, body.in_label)
682         eq_(self.out_label, body.out_label)
683
684         buf = body.serialize()
685
686         eq_(binary_str(self.buf), binary_str(buf))
687
688
689 class TestZebraMplsLabelsAddIPv6(unittest.TestCase):
690     buf = (
691         b'\x09'              # route_type
692         b'\x00\x00\x00\x0a'  # family
693         b'\x20\x01\x0d\xb8'  # prefix
694         b'\x00\x00\x00\x00'
695         b'\x00\x00\x00\x00'
696         b'\x00\x00\x00\x00'
697         b'\x40'              # prefix_len
698         b'\x20\x01\x0d\xb8'  # gate_addr
699         b'\x00\x00\x00\x00'
700         b'\x00\x00\x00\x00'
701         b'\x00\x00\x00\x01'
702         b'\x10'              # distance
703         b'\x00\x00\x00\x64'  # in_label
704         b'\x00\x00\x00\x03'  # out_label
705     )
706     route_type = zebra.ZEBRA_ROUTE_BGP
707     family = socket.AF_INET6
708     prefix = '2001:db8::/64'
709     gate_addr = '2001:db8::1'
710     distance = 0x10
711     in_label = 100
712     out_label = zebra.MPLS_IMP_NULL_LABEL
713
714     @_patch_frr_v2
715     def test_parser(self):
716         body = zebra.ZebraMplsLabelsAdd.parse(self.buf)
717
718         eq_(self.route_type, body.route_type)
719         eq_(self.family, body.family)
720         eq_(self.prefix, body.prefix)
721         eq_(self.gate_addr, body.gate_addr)
722         eq_(self.distance, body.distance)
723         eq_(self.in_label, body.in_label)
724         eq_(self.out_label, body.out_label)
725
726         buf = body.serialize()
727
728         eq_(binary_str(self.buf), binary_str(buf))