1 # Copyright (C) 2016 Nippon Telegraph and Telephone Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
16 from __future__ import print_function
27 import mock # Python 2
29 from unittest import mock # Python 3
31 from nose.tools import eq_
32 from nose.tools import ok_
34 from ryu.lib import addrconv
35 from ryu.lib import mrtlib
36 from ryu.lib.packet import bgp
37 from ryu.lib.packet import ospf
38 from ryu.utils import binary_str
41 LOG = logging.getLogger(__name__)
43 MRT_DATA_DIR = os.path.join(
44 os.path.dirname(sys.modules[__name__].__file__), '../../packet_data/mrt/')
47 class TestMrtlib(unittest.TestCase):
49 Test case for ryu.lib.mrtlib.
52 def test_reader(self):
54 'rib.20161101.0000_pick.bz2',
55 'updates.20161101.0000.bz2',
59 # print('\n*** testing mrtlib.Reader with %s ...' % f)
61 input_file = os.path.join(MRT_DATA_DIR, f)
62 for record in mrtlib.Reader(bz2.BZ2File(input_file, 'rb')):
63 # print('* No.%d\n%s' % (counter, record))
64 ok_(not isinstance(record, mrtlib.UnknownMrtRecord))
67 def test_writer(self):
69 'rib.20161101.0000_pick.bz2',
70 'updates.20161101.0000.bz2',
74 # print('\n*** testing mrtlib.Writer with %s ...' % f)
75 input_file = os.path.join(MRT_DATA_DIR, f)
76 input_buf = bz2.BZ2File(input_file, 'rb').read()
77 input_records = list(mrtlib.Reader(bz2.BZ2File(input_file, 'rb')))
81 mrt_writer = mrtlib.Writer(f)
82 for record in input_records:
83 # print('* No.%d\n%s' % (counter, record))
84 mrt_writer.write(record)
87 output_buf = f.getvalue()
89 eq_(binary_str(input_buf), binary_str(output_buf))
93 eq_(True, mrt_writer._f.closed)
96 class TestMrtlibMrtRecord(unittest.TestCase):
98 Test case for ryu.lib.mrtlib.MrtRecord.
101 def test_init_without_type_subtype(self):
102 type_ = mrtlib.MrtRecord.TYPE_TABLE_DUMP
103 subtype = mrtlib.TableDumpMrtRecord.SUBTYPE_AFI_IPv4
105 message = mrtlib.TableDumpAfiIPv4MrtMessage(
108 prefix='192.168.1.0',
116 record = mrtlib.TableDumpMrtRecord(message)
118 eq_(type_, record.type)
119 eq_(subtype, record.subtype)
121 def test_parse_pre_with_type_et(self):
123 b'\x00\x00\x00\x00' # timestamp
124 b'\x00\x11\x00\x00' # type=TYPE_BGP4MP_ET(17), subtype
125 b'\x00\x00\x00\xaa' # length
128 required_len = mrtlib.MrtRecord.parse_pre(buf)
130 eq_(0xaa + mrtlib.ExtendedTimestampMrtRecord.HEADER_SIZE,
134 # Note: MrtCommonRecord is tested in TestMrtlibMrtRecord.
135 # class TestMrtlibMrtCommonRecord(unittest.TestCase):
138 class TestMrtlibExtendedTimestampMrtRecord(unittest.TestCase):
140 Test case for ryu.lib.mrtlib.ExtendedTimestampMrtRecord.
143 def test_parse_extended_header(self):
146 b'\x11\x11\x11\x11' # ms_timestamp
151 rest) = mrtlib.ExtendedTimestampMrtRecord.parse_extended_header(buf)
153 ok_(isinstance(headers, list))
155 eq_(0x11111111, headers[0])
158 def test_serialize(self):
159 body = b'test' # 4 bytes
161 b'\x11\x11\x11\x11' # timestamp
162 b'\x22\x22\x33\x33' # type, subtype
163 b'\x00\x00\x00\x04' # length=len(body)
164 b'\x44\x44\x44\x44' # ms_timestamp
168 message_mock = mock.MagicMock(spec=mrtlib.MrtMessage)
169 message_mock.serialize.return_value = body
171 record = mrtlib.ExtendedTimestampMrtRecord(
172 message=message_mock,
173 timestamp=0x11111111,
174 type_=0x2222, subtype=0x3333,
175 ms_timestamp=0x44444444,
179 output = record.serialize()
184 class TestMrtlibUnknownMrtRecord(unittest.TestCase):
186 Test case for ryu.lib.mrtlib.UnknownMrtRecord.
189 def test_parse(self):
190 body = b'test' # 4 bytes
192 b'\x11\x11\x11\x11' # timestamp
193 b'\x22\x22\x33\x33' # type, subtype
194 b'\x00\x00\x00\x04' # length=len(body)
198 (record, rest) = mrtlib.MrtRecord.parse(buf)
200 eq_(0x11111111, record.timestamp)
201 eq_(0x2222, record.type)
202 eq_(0x3333, record.subtype)
203 eq_(0x00000004, record.length)
204 eq_(body, record.message.buf)
207 def test_serialize(self):
208 body = b'test' # 4 bytes
210 b'\x11\x11\x11\x11' # timestamp
211 b'\x22\x22\x33\x33' # type, subtype
212 b'\x00\x00\x00\x04' # length=len(body)
216 message = mrtlib.UnknownMrtMessage(buf=body)
217 record = mrtlib.UnknownMrtRecord(
219 timestamp=0x11111111,
220 type_=0x2222, subtype=0x3333,
224 output = record.serialize()
229 class TestMrtlibOspf2MrtRecord(unittest.TestCase):
231 Test case for ryu.lib.mrtlib.Ospf2MrtRecord.
234 @mock.patch('ryu.lib.packet.ospf.ospf.parser')
235 def test_parse(self, mock_ospf_parser):
236 remote_ip = '10.0.0.1'
237 local_ip = '10.0.0.2'
238 body = b'test' # 4 bytes
240 b'\x11\x11\x11\x11' # timestamp
241 b'\x00\x0b\x00\x00' # type=TYPE_OSPFv2(11), subtype
242 b'\x00\x00\x00\x0c' # length=len(remote_ip + local_ip + body)
243 + addrconv.ipv4.text_to_bin(remote_ip) # remote_ip
244 + addrconv.ipv4.text_to_bin(local_ip) # local_ip
245 + body # ospf_message
248 mock_ospf_message = mock.MagicMock(spec=ospf.OSPFMessage)
249 mock_ospf_parser.return_value = (mock_ospf_message, None, '')
251 (record, rest) = mrtlib.MrtRecord.parse(buf)
253 eq_(0x11111111, record.timestamp)
254 eq_(mrtlib.MrtRecord.TYPE_OSPFv2, record.type)
255 eq_(0x0000, record.subtype)
256 eq_(0x0000000c, record.length)
257 eq_(remote_ip, record.message.remote_ip)
258 eq_(local_ip, record.message.local_ip)
259 eq_(mock_ospf_message, record.message.ospf_message)
262 def test_serialize(self):
263 remote_ip = '10.0.0.1'
264 local_ip = '10.0.0.2'
265 body = b'test' # 4 bytes
267 b'\x11\x11\x11\x11' # timestamp
268 b'\x00\x0b\x00\x00' # type=TYPE_OSPFv2(11), subtype
269 b'\x00\x00\x00\x0c' # length=len(remote_ip + local_ip + body)
270 + addrconv.ipv4.text_to_bin(remote_ip) # remote_ip
271 + addrconv.ipv4.text_to_bin(local_ip) # local_ip
272 + body # ospf_message
275 mock_ospf_message = mock.MagicMock(spec=ospf.OSPFMessage)
276 mock_ospf_message.serialize.return_value = body
278 message = mrtlib.Ospf2MrtMessage(
281 ospf_message=mock_ospf_message,
283 record = mrtlib.Ospf2MrtRecord(
285 timestamp=0x11111111,
291 output = record.serialize()
296 class TestMrtlibTableDumpMrtRecord(unittest.TestCase):
298 Test case for ryu.lib.mrtlib.TableDumpMrtRecord.
301 @mock.patch('ryu.lib.packet.bgp._PathAttribute.parser')
302 def test_parse_afi_ipv4(self, mock_bgp_attr_parser):
304 peer_ip = '172.16.0.1'
305 body = b'test' # 4 bytes
307 b'\x11\x11\x11\x11' # timestamp
308 b'\x00\x0c\x00\x01' # type=TYPE_TABLE_DUMP(12),
309 # subtype=SUBTYPE_AFI_IPv4(1)
310 b'\x00\x00\x00\x1a' # length=26
311 b'\x22\x22\x33\x33' # view_num, seq_num
312 + addrconv.ipv4.text_to_bin(prefix) + # prefix
313 b'\x18\x01' # prefix_len=24, status=1
314 b'\x44\x44\x44\x44' # originated_time
315 + addrconv.ipv4.text_to_bin(peer_ip) + # peer_ip
316 b'\xfd\xe8\x00\x04' # peer_as=65000, attr_len=len(body)
317 + body # bgp_attributes
320 mock_bgp_attr = mock.MagicMock(spec=bgp._PathAttribute)
321 mock_bgp_attr_parser.return_value = (mock_bgp_attr, b'')
323 (record, rest) = mrtlib.MrtRecord.parse(buf)
325 eq_(0x11111111, record.timestamp)
326 eq_(mrtlib.MrtRecord.TYPE_TABLE_DUMP, record.type)
327 eq_(mrtlib.TableDumpMrtRecord.SUBTYPE_AFI_IPv4, record.subtype)
328 eq_(0x0000001a, record.length)
329 eq_(0x2222, record.message.view_num)
330 eq_(0x3333, record.message.seq_num)
331 eq_(prefix, record.message.prefix)
332 eq_(24, record.message.prefix_len)
333 eq_(1, record.message.status)
334 eq_(0x44444444, record.message.originated_time)
335 eq_(peer_ip, record.message.peer_ip)
336 eq_(65000, record.message.peer_as)
337 eq_(0x0004, record.message.attr_len)
338 eq_([mock_bgp_attr], record.message.bgp_attributes)
341 def test_serialize_afi_ipv4(self):
343 peer_ip = '172.16.0.1'
344 body = b'test' # 4 bytes
346 b'\x11\x11\x11\x11' # timestamp
347 b'\x00\x0c\x00\x01' # type=TYPE_TABLE_DUMP(12),
348 # subtype=SUBTYPE_AFI_IPv4(1)
349 b'\x00\x00\x00\x1a' # length=26
350 b'\x22\x22\x33\x33' # view_num, seq_num
351 + addrconv.ipv4.text_to_bin(prefix) + # prefix
352 b'\x18\x01' # prefix_len=24, status=1
353 b'\x44\x44\x44\x44' # originated_time
354 + addrconv.ipv4.text_to_bin(peer_ip) + # peer_ip
355 b'\xfd\xe8\x00\x04' # peer_as=65000, attr_len=len(body)
356 + body # bgp_attributes
359 mock_bgp_attr = mock.MagicMock(spec=bgp._PathAttribute)
360 mock_bgp_attr.serialize.return_value = body
362 message = mrtlib.TableDumpAfiIPv4MrtMessage(
368 originated_time=0x44444444,
371 bgp_attributes=[mock_bgp_attr],
374 record = mrtlib.TableDumpMrtRecord(
376 timestamp=0x11111111,
382 output = record.serialize()
386 @mock.patch('ryu.lib.packet.bgp._PathAttribute.parser')
387 def test_parse_afi_ipv6(self, mock_bgp_attr_parser):
388 prefix = '2001:db8::1'
390 body = b'test' # 4 bytes
392 b'\x11\x11\x11\x11' # timestamp
393 b'\x00\x0c\x00\x02' # type=TYPE_TABLE_DUMP(12),
394 # subtype=SUBTYPE_AFI_IPv6(2)
395 b'\x00\x00\x00\x32' # length=50
396 b'\x22\x22\x33\x33' # view_num, seq_num
397 + addrconv.ipv6.text_to_bin(prefix) + # prefix
398 b'\x40\x01' # prefix_len=64, status=1
399 b'\x44\x44\x44\x44' # originated_time
400 + addrconv.ipv6.text_to_bin(peer_ip) + # peer_ip
401 b'\xfd\xe8\x00\x04' # peer_as=65000, attr_len=len(body)
402 + body # bgp_attributes
405 mock_bgp_attr = mock.MagicMock(spec=bgp._PathAttribute)
406 mock_bgp_attr_parser.return_value = (mock_bgp_attr, b'')
408 (record, rest) = mrtlib.MrtRecord.parse(buf)
410 eq_(0x11111111, record.timestamp)
411 eq_(mrtlib.MrtRecord.TYPE_TABLE_DUMP, record.type)
412 eq_(mrtlib.TableDumpMrtRecord.SUBTYPE_AFI_IPv6, record.subtype)
413 eq_(0x00000032, record.length)
414 eq_(0x2222, record.message.view_num)
415 eq_(0x3333, record.message.seq_num)
416 eq_(prefix, record.message.prefix)
417 eq_(64, record.message.prefix_len)
418 eq_(1, record.message.status)
419 eq_(0x44444444, record.message.originated_time)
420 eq_(peer_ip, record.message.peer_ip)
421 eq_(65000, record.message.peer_as)
422 eq_(0x0004, record.message.attr_len)
423 eq_([mock_bgp_attr], record.message.bgp_attributes)
426 def test_serialize_afi_ipv6(self):
427 prefix = '2001:db8::1'
429 body = b'test' # 4 bytes
431 b'\x11\x11\x11\x11' # timestamp
432 b'\x00\x0c\x00\x02' # type=TYPE_TABLE_DUMP(12),
433 # subtype=SUBTYPE_AFI_IPv6(2)
434 b'\x00\x00\x00\x32' # length=50
435 b'\x22\x22\x33\x33' # view_num, seq_num
436 + addrconv.ipv6.text_to_bin(prefix) + # prefix
437 b'\x40\x01' # prefix_len=64, status=1
438 b'\x44\x44\x44\x44' # originated_time
439 + addrconv.ipv6.text_to_bin(peer_ip) + # peer_ip
440 b'\xfd\xe8\x00\x04' # peer_as=65000, attr_len=len(body)
441 + body # bgp_attributes
444 mock_bgp_attr = mock.MagicMock(spec=bgp._PathAttribute)
445 mock_bgp_attr.serialize.return_value = body
447 message = mrtlib.TableDumpAfiIPv6MrtMessage(
453 originated_time=0x44444444,
456 bgp_attributes=[mock_bgp_attr],
459 record = mrtlib.TableDumpMrtRecord(
461 timestamp=0x11111111,
467 output = record.serialize()
472 class TestMrtlibTableDump2MrtRecord(unittest.TestCase):
474 Test case for ryu.lib.mrtlib.TableDump2MrtRecord.
477 # Note: The classes corresponding to the following subtypes are
478 # tested in TestMrtlibMrtRecord.
479 # - SUBTYPE_PEER_INDEX_TABLE = 1
480 # - SUBTYPE_RIB_IPV4_UNICAST = 2
481 # - SUBTYPE_RIB_IPV4_MULTICAST = 3
482 # - SUBTYPE_RIB_IPV6_UNICAST = 4
483 # - SUBTYPE_RIB_IPV6_MULTICAST = 5
485 @mock.patch('ryu.lib.mrtlib.MrtRibEntry.parse')
486 @mock.patch('ryu.lib.packet.bgp.BGPNLRI.parser')
487 def test_parse_rib_generic(self, mock_nlri_parser, mock_rib_entry_parser):
488 nlri_bin = b'nlri' # 4 bytes
489 rib_entries_bin = b'ribs' # 4 bytes
491 b'\x11\x11\x11\x11' # timestamp
492 b'\x00\x0d\x00\x06' # type=TYPE_TABLE_DUMP_V2(13),
493 # subtype=SUBTYPE_RIB_GENERIC(6)
494 b'\x00\x00\x00\x11' # length=17
495 b'\x22\x22\x22\x22' # seq_num
496 b'\x33\x33\x44' # afi, safi
498 b'\x00\x01' # entry_count
499 + rib_entries_bin # rib_entries
502 b'\x00\x01' # entry_count
503 + rib_entries_bin # rib_entries
506 mock_bgp_nlri = mock.MagicMock(spec=bgp._AddrPrefix)
507 mock_nlri_parser.return_value = (mock_bgp_nlri, buf_entries)
509 mock_rib_entry = mock.MagicMock(spec=mrtlib.MrtRibEntry)
510 mock_rib_entry_parser.return_value = (mock_rib_entry, b'')
512 (record, rest) = mrtlib.MrtRecord.parse(buf)
514 eq_(0x11111111, record.timestamp)
515 eq_(mrtlib.MrtRecord.TYPE_TABLE_DUMP_V2, record.type)
516 eq_(mrtlib.TableDump2MrtRecord.SUBTYPE_RIB_GENERIC, record.subtype)
517 eq_(0x00000011, record.length)
518 eq_(0x22222222, record.message.seq_num)
519 eq_(0x3333, record.message.afi)
520 eq_(0x44, record.message.safi)
521 eq_(mock_bgp_nlri, record.message.nlri)
522 eq_(0x0001, record.message.entry_count)
523 eq_([mock_rib_entry], record.message.rib_entries)
526 def test_serialize_rib_generic(self):
527 nlri_bin = b'nlri' # 4 bytes
528 rib_entries_bin = b'ribs' # 4 bytes
530 b'\x11\x11\x11\x11' # timestamp
531 b'\x00\x0d\x00\x06' # type=TYPE_TABLE_DUMP_V2(13),
532 # subtype=SUBTYPE_RIB_GENERIC(6)
533 b'\x00\x00\x00\x11' # length=17
534 b'\x22\x22\x22\x22' # seq_num
535 b'\x33\x33\x44' # afi, safi
537 b'\x00\x01' # entry_count
538 + rib_entries_bin # rib_entries
541 mock_bgp_nlri = mock.MagicMock(spec=bgp._AddrPrefix)
542 mock_bgp_nlri.serialize.return_value = nlri_bin
544 mock_rib_entry = mock.MagicMock(spec=mrtlib.MrtRibEntry)
545 mock_rib_entry.serialize.return_value = rib_entries_bin
547 message = mrtlib.TableDump2RibGenericMrtMessage(
552 rib_entries=[mock_rib_entry],
555 record = mrtlib.TableDump2MrtRecord(
557 timestamp=0x11111111,
563 output = record.serialize()
568 class TestMrtlibMrtPeer(unittest.TestCase):
570 Test case for ryu.lib.mrtlib.MrtPeer.
573 def test_parse_two_octet_as(self):
578 + addrconv.ipv4.text_to_bin(bgp_id) # bgp_id
579 + addrconv.ipv4.text_to_bin(ip_addr) + # ip_addr
583 peer, rest = mrtlib.MrtPeer.parse(buf)
586 eq_(bgp_id, peer.bgp_id)
587 eq_(ip_addr, peer.ip_addr)
588 eq_(65000, peer.as_num)
591 def test_serialize_two_octet_as(self):
596 + addrconv.ipv4.text_to_bin(bgp_id) # bgp_id
597 + addrconv.ipv4.text_to_bin(ip_addr) + # ip_addr
601 peer = mrtlib.MrtPeer(
608 output = peer.serialize()
613 class TestMrtlibMrtRibEntry(unittest.TestCase):
615 Test case for ryu.lib.mrtlib.MrtRibEntry.
618 def test_parse_add_path(self):
622 bgp_attribute = bgp.BGPPathAttributeNextHop(nexthop)
624 bgp_attr_buf = bgp_attribute.serialize()
625 attr_len = len(bgp_attr_buf)
627 b'\x00\x01' # peer_index
628 b'\x00\x00\x00\x02' # originated_time
629 b'\x00\x00\x00\x03' # path_id
630 + struct.pack('!H', attr_len) # attr_len
631 + bgp_attribute.serialize() # bgp_attributes
634 rib, rest = mrtlib.MrtRibEntry.parse(buf, is_addpath=True)
636 eq_(peer_index, rib.peer_index)
637 eq_(originated_time, rib.originated_time)
638 eq_(path_id, rib.path_id)
639 eq_(attr_len, rib.attr_len)
640 eq_(1, len(rib.bgp_attributes))
641 eq_(nexthop, rib.bgp_attributes[0].value)
644 def test_serialize_add_path(self):
648 bgp_attribute = bgp.BGPPathAttributeNextHop(nexthop)
650 bgp_attr_buf = bgp_attribute.serialize()
651 attr_len = len(bgp_attr_buf)
653 b'\x00\x01' # peer_index
654 b'\x00\x00\x00\x02' # originated_time
655 b'\x00\x00\x00\x03' # path_id
656 + struct.pack('!H', attr_len) # attr_len
657 + bgp_attribute.serialize() # bgp_attributes
660 rib = mrtlib.MrtRibEntry(
661 peer_index=peer_index,
662 originated_time=originated_time,
663 bgp_attributes=[bgp_attribute],
668 output = rib.serialize()
673 class TestMrtlibBgp4MpMrtRecord(unittest.TestCase):
675 Test case for ryu.lib.mrtlib.Bgp4MpMrtRecord.
678 # Note: The classes corresponding to the following subtypes are
679 # tested in TestMrtlibMrtRecord.
680 # - SUBTYPE_BGP4MP_MESSAGE = 1
681 # - SUBTYPE_BGP4MP_MESSAGE_AS4 = 4
682 # - SUBTYPE_BGP4MP_STATE_CHANGE_AS4 = 5
683 # - SUBTYPE_BGP4MP_MESSAGE_LOCAL = 6
684 # - SUBTYPE_BGP4MP_MESSAGE_AS4_LOCAL = 7
686 def test_parse_state_change_afi_ipv4(self):
688 local_ip = '10.0.0.2'
690 b'\x11\x11\x11\x11' # timestamp
691 b'\x00\x10\x00\x00' # type=TYPE_BGP4MP(16),
692 # subtype=SUBTYPE_BGP4MP_STATE_CHANGE(0)
693 b'\x00\x00\x00\x14' # length=20
694 b'\xfd\xe9\xfd\xea' # peer_as=65001, local_as=65002
695 b'\x22\x22\x00\x01' # if_index, addr_family=AFI_IPv4(1)
696 + addrconv.ipv4.text_to_bin(peer_ip) # peer_ip
697 + addrconv.ipv4.text_to_bin(local_ip) + # local_ip
698 b'\x00\x01\x00\x02' # old_state=STATE_IDLE(1),
699 # new_state=STATE_CONNECT(2)
702 (record, rest) = mrtlib.MrtRecord.parse(buf)
704 eq_(0x11111111, record.timestamp)
705 eq_(mrtlib.MrtRecord.TYPE_BGP4MP, record.type)
706 eq_(mrtlib.Bgp4MpMrtRecord.SUBTYPE_BGP4MP_STATE_CHANGE, record.subtype)
707 eq_(0x00000014, record.length)
708 eq_(65001, record.message.peer_as)
709 eq_(65002, record.message.local_as)
710 eq_(0x2222, record.message.if_index)
711 eq_(mrtlib.Bgp4MpStateChangeMrtMessage.AFI_IPv4,
713 eq_(mrtlib.Bgp4MpStateChangeMrtMessage.STATE_IDLE,
714 record.message.old_state)
715 eq_(mrtlib.Bgp4MpStateChangeMrtMessage.STATE_CONNECT,
716 record.message.new_state)
719 def test_serialize_state_change_afi_ipv4(self):
721 local_ip = '10.0.0.2'
723 b'\x11\x11\x11\x11' # timestamp
724 b'\x00\x10\x00\x00' # type=TYPE_BGP4MP(16),
725 # subtype=SUBTYPE_BGP4MP_STATE_CHANGE(0)
726 b'\x00\x00\x00\x14' # length=20
727 b'\xfd\xe9\xfd\xea' # peer_as=65001, local_as=65002
728 b'\x22\x22\x00\x01' # if_index, addr_family=AFI_IPv4(1)
729 + addrconv.ipv4.text_to_bin(peer_ip) # peer_ip
730 + addrconv.ipv4.text_to_bin(local_ip) + # local_ip
731 b'\x00\x01\x00\x02' # old_state=STATE_IDLE(1),
732 # new_state=STATE_CONNECT(2)
735 message = mrtlib.Bgp4MpStateChangeMrtMessage(
741 old_state=mrtlib.Bgp4MpStateChangeMrtMessage.STATE_IDLE,
742 new_state=mrtlib.Bgp4MpStateChangeMrtMessage.STATE_CONNECT,
743 # afi=mrtlib.Bgp4MpStateChangeMrtMessage.AFI_IPv4,
745 record = mrtlib.Bgp4MpMrtRecord(
747 timestamp=0x11111111,
753 output = record.serialize()
757 def test_parse_state_change_afi_ipv6(self):
761 b'\x11\x11\x11\x11' # timestamp
762 b'\x00\x10\x00\x00' # type=TYPE_BGP4MP(16),
763 # subtype=SUBTYPE_BGP4MP_STATE_CHANGE(0)
764 b'\x00\x00\x00\x2c' # length=44
765 b'\xfd\xe9\xfd\xea' # peer_as=65001, local_as=65002
766 b'\x22\x22\x00\x02' # if_index, addr_family=AFI_IPv6(2)
767 + addrconv.ipv6.text_to_bin(peer_ip) # peer_ip
768 + addrconv.ipv6.text_to_bin(local_ip) + # local_ip
769 b'\x00\x01\x00\x02' # old_state=STATE_IDLE(1),
770 # new_state=STATE_CONNECT(2)
773 (record, rest) = mrtlib.MrtRecord.parse(buf)
775 eq_(0x11111111, record.timestamp)
776 eq_(mrtlib.MrtRecord.TYPE_BGP4MP, record.type)
777 eq_(mrtlib.Bgp4MpMrtRecord.SUBTYPE_BGP4MP_STATE_CHANGE, record.subtype)
778 eq_(0x0000002c, record.length)
779 eq_(65001, record.message.peer_as)
780 eq_(65002, record.message.local_as)
781 eq_(0x2222, record.message.if_index)
782 eq_(mrtlib.Bgp4MpStateChangeMrtMessage.AFI_IPv6,
784 eq_(mrtlib.Bgp4MpStateChangeMrtMessage.STATE_IDLE,
785 record.message.old_state)
786 eq_(mrtlib.Bgp4MpStateChangeMrtMessage.STATE_CONNECT,
787 record.message.new_state)
790 def test_serialize_state_change_afi_ipv6(self):
794 b'\x11\x11\x11\x11' # timestamp
795 b'\x00\x10\x00\x00' # type=TYPE_BGP4MP(16),
796 # subtype=SUBTYPE_BGP4MP_STATE_CHANGE(0)
797 b'\x00\x00\x00\x2c' # length=44
798 b'\xfd\xe9\xfd\xea' # peer_as=65001, local_as=65002
799 b'\x22\x22\x00\x02' # if_index, addr_family=AFI_IPv6(2)
800 + addrconv.ipv6.text_to_bin(peer_ip) # peer_ip
801 + addrconv.ipv6.text_to_bin(local_ip) + # local_ip
802 b'\x00\x01\x00\x02' # old_state=STATE_IDLE(1),
803 # new_state=STATE_CONNECT(2)
806 message = mrtlib.Bgp4MpStateChangeMrtMessage(
812 old_state=mrtlib.Bgp4MpStateChangeMrtMessage.STATE_IDLE,
813 new_state=mrtlib.Bgp4MpStateChangeMrtMessage.STATE_CONNECT,
814 # afi=mrtlib.Bgp4MpStateChangeMrtMessage.AFI_IPv4,
816 record = mrtlib.Bgp4MpMrtRecord(
818 timestamp=0x11111111,
824 output = record.serialize()