1 # Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
23 from nose.tools import *
24 from ryu.lib import addrconv
25 from ryu.lib.packet import cfm
27 LOG = logging.getLogger(__name__)
30 class Test_cfm(unittest.TestCase):
33 self.message = cfm.cc_message()
34 self.ins = cfm.cfm(self.message)
37 self.buf = self.ins.serialize(data, prev)
39 def setUp_cc_message(self):
40 self.cc_message_md_lv = 1
41 self.cc_message_version = 1
42 self.cc_message_rdi = 1
43 self.cc_message_interval = 1
44 self.cc_message_seq_num = 123
45 self.cc_message_mep_id = 4
46 self.cc_message_md_name_format = 4
47 self.cc_message_md_name_length = 0
48 self.cc_message_md_name = b"hoge"
49 self.cc_message_short_ma_name_format = 2
50 self.cc_message_short_ma_name_length = 0
51 self.cc_message_short_ma_name = b"pakeratta"
52 self.cc_message_md_name_txfcf = 11
53 self.cc_message_md_name_rxfcb = 22
54 self.cc_message_md_name_txfcb = 33
55 self.cc_message_tlvs = [
57 cfm.port_status_tlv(),
59 cfm.interface_status_tlv(),
60 cfm.reply_ingress_tlv(),
61 cfm.reply_egress_tlv(),
62 cfm.ltm_egress_identifier_tlv(),
63 cfm.ltr_egress_identifier_tlv(),
64 cfm.organization_specific_tlv(),
66 self.message = cfm.cc_message(
67 self.cc_message_md_lv,
68 self.cc_message_version,
70 self.cc_message_interval,
71 self.cc_message_seq_num,
72 self.cc_message_mep_id,
73 self.cc_message_md_name_format,
74 self.cc_message_md_name_length,
75 self.cc_message_md_name,
76 self.cc_message_short_ma_name_format,
77 self.cc_message_short_ma_name_length,
78 self.cc_message_short_ma_name,
81 self.ins = cfm.cfm(self.message)
84 self.buf = self.ins.serialize(data, prev)
86 def setUp_loopback_message(self):
87 self.loopback_message_md_lv = 1
88 self.loopback_message_version = 1
89 self.loopback_message_transaction_id = 12345
90 self.loopback_message_tlvs = [
92 cfm.port_status_tlv(),
94 cfm.interface_status_tlv(),
95 cfm.reply_ingress_tlv(),
96 cfm.reply_egress_tlv(),
97 cfm.ltm_egress_identifier_tlv(),
98 cfm.ltr_egress_identifier_tlv(),
99 cfm.organization_specific_tlv(),
101 self.message = cfm.loopback_message(
102 self.loopback_message_md_lv,
103 self.loopback_message_version,
104 self.loopback_message_transaction_id,
105 self.loopback_message_tlvs)
106 self.ins = cfm.cfm(self.message)
109 self.buf = self.ins.serialize(data, prev)
111 def setUp_loopback_reply(self):
112 self.loopback_reply_md_lv = 1
113 self.loopback_reply_version = 1
114 self.loopback_reply_transaction_id = 12345
115 self.loopback_reply_tlvs = [
117 cfm.port_status_tlv(),
119 cfm.interface_status_tlv(),
120 cfm.reply_ingress_tlv(),
121 cfm.reply_egress_tlv(),
122 cfm.ltm_egress_identifier_tlv(),
123 cfm.ltr_egress_identifier_tlv(),
124 cfm.organization_specific_tlv(),
126 self.message = cfm.loopback_reply(
127 self.loopback_reply_md_lv,
128 self.loopback_reply_version,
129 self.loopback_reply_transaction_id,
130 self.loopback_reply_tlvs)
131 self.ins = cfm.cfm(self.message)
134 self.buf = self.ins.serialize(data, prev)
136 def setUp_link_trace_message(self):
137 self.link_trace_message_md_lv = 1
138 self.link_trace_message_version = 1
139 self.link_trace_message_use_fdb_only = 1
140 self.link_trace_message_transaction_id = 12345
141 self.link_trace_message_ttl = 123
142 self.link_trace_message_ltm_orig_addr = '11:22:33:44:55:66'
143 self.link_trace_message_ltm_targ_addr = '77:88:99:aa:cc:dd'
144 self.link_trace_message_tlvs = [
146 cfm.port_status_tlv(),
148 cfm.interface_status_tlv(),
149 cfm.reply_ingress_tlv(),
150 cfm.reply_egress_tlv(),
151 cfm.ltm_egress_identifier_tlv(),
152 cfm.ltr_egress_identifier_tlv(),
153 cfm.organization_specific_tlv(),
155 self.message = cfm.link_trace_message(
156 self.link_trace_message_md_lv,
157 self.link_trace_message_version,
158 self.link_trace_message_use_fdb_only,
159 self.link_trace_message_transaction_id,
160 self.link_trace_message_ttl,
161 self.link_trace_message_ltm_orig_addr,
162 self.link_trace_message_ltm_targ_addr,
163 self.link_trace_message_tlvs)
164 self.ins = cfm.cfm(self.message)
167 self.buf = self.ins.serialize(data, prev)
169 def setUp_link_trace_reply(self):
170 self.link_trace_reply_md_lv = 1
171 self.link_trace_reply_version = 1
172 self.link_trace_reply_use_fdb_only = 1
173 self.link_trace_reply_fwd_yes = 0
174 self.link_trace_reply_terminal_mep = 1
175 self.link_trace_reply_transaction_id = 5432
176 self.link_trace_reply_ttl = 123
177 self.link_trace_reply_relay_action = 3
178 self.link_trace_reply_tlvs = [
180 cfm.port_status_tlv(),
182 cfm.interface_status_tlv(),
183 cfm.reply_ingress_tlv(),
184 cfm.reply_egress_tlv(),
185 cfm.ltm_egress_identifier_tlv(),
186 cfm.ltr_egress_identifier_tlv(),
187 cfm.organization_specific_tlv(),
189 self.message = cfm.link_trace_reply(
190 self.link_trace_reply_md_lv,
191 self.link_trace_reply_version,
192 self.link_trace_reply_use_fdb_only,
193 self.link_trace_reply_fwd_yes,
194 self.link_trace_reply_terminal_mep,
195 self.link_trace_reply_transaction_id,
196 self.link_trace_reply_ttl,
197 self.link_trace_reply_relay_action,
198 self.link_trace_reply_tlvs)
199 self.ins = cfm.cfm(self.message)
202 self.buf = self.ins.serialize(data, prev)
208 eq_(str(self.message), str(self.ins.op))
210 def test_init_cc_message(self):
211 self.setUp_cc_message()
214 def test_init_loopback_message(self):
215 self.setUp_loopback_message()
218 def test_init_loopback_reply(self):
219 self.setUp_loopback_reply()
222 def test_init_link_trace_message(self):
223 self.setUp_link_trace_message()
226 def test_init_link_trace_reply(self):
227 self.setUp_link_trace_reply()
230 def test_parser(self):
231 _res = self.ins.parser(six.binary_type(self.buf))
233 if type(_res) is tuple:
237 eq_(str(self.message), str(res.op))
239 def test_parser_with_cc_message(self):
240 self.setUp_cc_message()
243 def test_parser_with_loopback_message(self):
244 self.setUp_loopback_message()
247 def test_parser_with_loopback_reply(self):
248 self.setUp_loopback_reply()
251 def test_parser_with_link_trace_message(self):
252 self.setUp_link_trace_message()
255 def test_parser_with_link_trace_reply(self):
256 self.setUp_link_trace_reply()
259 def test_serialize(self):
262 def test_serialize_with_cc_message(self):
263 self.setUp_cc_message()
264 self.test_serialize()
267 buf = self.ins.serialize(data, prev)
268 cc_message = cfm.cc_message.parser(six.binary_type(buf))
269 eq_(repr(self.message), repr(cc_message))
271 def test_serialize_with_loopback_message(self):
272 self.setUp_loopback_message()
273 self.test_serialize()
276 buf = self.ins.serialize(data, prev)
277 loopback_message = cfm.loopback_message.parser(six.binary_type(buf))
278 eq_(repr(self.message), repr(loopback_message))
280 def test_serialize_with_loopback_reply(self):
281 self.setUp_loopback_reply()
282 self.test_serialize()
285 buf = self.ins.serialize(data, prev)
286 loopback_reply = cfm.loopback_reply.parser(six.binary_type(buf))
287 eq_(repr(self.message), repr(loopback_reply))
289 def test_serialize_with_link_trace_message(self):
290 self.setUp_link_trace_message()
291 self.test_serialize()
294 buf = self.ins.serialize(data, prev)
295 link_trace_message = cfm.link_trace_message.parser(six.binary_type(buf))
296 eq_(repr(self.message), repr(link_trace_message))
298 def test_serialize_with_link_trace_reply(self):
299 self.setUp_link_trace_reply()
300 self.test_serialize()
303 buf = self.ins.serialize(data, prev)
304 link_trace_reply = cfm.link_trace_reply.parser(six.binary_type(buf))
305 eq_(repr(self.message), repr(link_trace_reply))
307 def test_to_string(self):
308 cfm_values = {'op': self.message}
309 _cfm_str = ','.join(['%s=%s' % (k, cfm_values[k])
310 for k, v in inspect.getmembers(self.ins)
312 cfm_str = '%s(%s)' % (cfm.cfm.__name__, _cfm_str)
313 eq_(str(self.ins), cfm_str)
314 eq_(repr(self.ins), cfm_str)
316 def test_to_string_cc_message(self):
317 self.setUp_cc_message()
318 self.test_to_string()
320 def test_to_string_loopback_message(self):
321 self.setUp_loopback_message()
322 self.test_to_string()
324 def test_to_string_loopback_reply(self):
325 self.setUp_loopback_reply()
326 self.test_to_string()
328 def test_to_string_link_trace_message(self):
329 self.setUp_link_trace_message()
330 self.test_to_string()
332 def test_to_string_link_trace_reply(self):
333 self.setUp_link_trace_reply()
334 self.test_to_string()
339 def test_len_cc_message(self):
340 self.setUp_cc_message()
341 eq_(len(self.ins), 0 + len(self.message))
343 def test_len_loopback_message(self):
344 self.setUp_loopback_message()
345 eq_(len(self.ins), 0 + len(self.message))
347 def test_len_loopback_reply(self):
348 self.setUp_loopback_reply()
349 eq_(len(self.ins), 0 + len(self.message))
351 def test_len_link_trace_message(self):
352 self.setUp_link_trace_message()
353 eq_(len(self.ins), 0 + len(self.message))
355 def test_len_link_trace_reply(self):
356 self.setUp_link_trace_reply()
357 eq_(len(self.ins), 0 + len(self.message))
359 def test_default_args(self):
363 jsondict = self.ins.to_jsondict()
364 ins = cfm.cfm.from_jsondict(jsondict['cfm'])
365 eq_(str(self.ins), str(ins))
367 def test_json_with_cc_message(self):
368 self.setUp_cc_message()
371 def test_json_with_loopback_message(self):
372 self.setUp_loopback_message()
375 def test_json_with_loopback_reply(self):
376 self.setUp_loopback_reply()
379 def test_json_with_link_trace_message(self):
380 self.setUp_link_trace_message()
383 def test_json_with_link_trace_reply(self):
384 self.setUp_link_trace_reply()
388 class Test_cc_message(unittest.TestCase):
393 self.opcode = cfm.CFM_CC_MESSAGE
396 self.first_tlv_offset = cfm.cc_message._TLV_OFFSET
399 self.md_name_format = cfm.cc_message._MD_FMT_CHARACTER_STRING
400 self.md_name_length = 3
401 self.md_name = b"foo"
402 self.short_ma_name_format = 2
403 self.short_ma_name_length = 8
404 self.short_ma_name = b"hogehoge"
408 self.ins = cfm.cc_message(
418 self.short_ma_name_format,
419 self.short_ma_name_length,
424 self.form = '!4BIH2B3s2B8s33x12x4xB'
425 self.buf = struct.pack(
427 (self.md_lv << 5) | self.version,
429 (self.rdi << 7) | self.interval,
430 self.first_tlv_offset,
436 self.short_ma_name_format,
437 self.short_ma_name_length,
446 eq_(self.md_lv, self.ins.md_lv)
447 eq_(self.version, self.ins.version)
448 eq_(self.rdi, self.ins.rdi)
449 eq_(self.interval, self.ins.interval)
450 eq_(self.seq_num, self.ins.seq_num)
451 eq_(self.mep_id, self.ins.mep_id)
452 eq_(self.md_name_format, self.ins.md_name_format)
453 eq_(self.md_name_length, self.ins.md_name_length)
454 eq_(self.md_name, self.ins.md_name)
455 eq_(self.short_ma_name_format, self.ins.short_ma_name_format)
456 eq_(self.short_ma_name_length, self.ins.short_ma_name_length)
457 eq_(self.short_ma_name, self.ins.short_ma_name)
458 eq_(self.tlvs, self.ins.tlvs)
460 def test_parser(self):
461 _res = cfm.cc_message.parser(self.buf)
462 if type(_res) is tuple:
466 eq_(self.md_lv, res.md_lv)
467 eq_(self.version, res.version)
468 eq_(self.rdi, res.rdi)
469 eq_(self.interval, res.interval)
470 eq_(self.seq_num, res.seq_num)
471 eq_(self.mep_id, res.mep_id)
472 eq_(self.md_name_format, res.md_name_format)
473 eq_(self.md_name_length, res.md_name_length)
474 eq_(self.md_name, res.md_name)
475 eq_(self.short_ma_name_format, res.short_ma_name_format)
476 eq_(self.short_ma_name_length, res.short_ma_name_length)
477 eq_(self.short_ma_name, res.short_ma_name)
478 eq_(self.tlvs, res.tlvs)
480 def test_parser_with_no_maintenance_domain_name_present(self):
481 form = '!4BIH3B8s37x12x4xB'
484 (self.md_lv << 5) | self.version,
486 (self.rdi << 7) | self.interval,
487 self.first_tlv_offset,
490 cfm.cc_message._MD_FMT_NO_MD_NAME_PRESENT,
491 self.short_ma_name_format,
492 self.short_ma_name_length,
496 _res = cfm.cc_message.parser(buf)
497 if type(_res) is tuple:
501 eq_(self.md_lv, res.md_lv)
502 eq_(self.version, res.version)
503 eq_(self.rdi, res.rdi)
504 eq_(self.interval, res.interval)
505 eq_(self.seq_num, res.seq_num)
506 eq_(self.mep_id, res.mep_id)
507 eq_(cfm.cc_message._MD_FMT_NO_MD_NAME_PRESENT, res.md_name_format)
508 eq_(self.short_ma_name_format, res.short_ma_name_format)
509 eq_(self.short_ma_name_length, res.short_ma_name_length)
510 eq_(self.short_ma_name, res.short_ma_name)
511 eq_(self.tlvs, res.tlvs)
513 def test_serialize(self):
514 buf = self.ins.serialize()
515 res = struct.unpack_from(self.form, six.binary_type(buf))
516 eq_(self.md_lv, res[0] >> 5)
517 eq_(self.version, res[0] & 0x1f)
518 eq_(self.opcode, res[1])
519 eq_(self.rdi, res[2] >> 7)
520 eq_(self.interval, res[2] & 0x07)
521 eq_(self.first_tlv_offset, res[3])
522 eq_(self.seq_num, res[4])
523 eq_(self.mep_id, res[5])
524 eq_(self.md_name_format, res[6])
525 eq_(self.md_name_length, res[7])
526 eq_(self.md_name, res[8])
527 eq_(self.short_ma_name_format, res[9])
528 eq_(self.short_ma_name_length, res[10])
529 eq_(self.short_ma_name, res[11])
530 eq_(self.end_tlv, res[12])
532 def test_serialize_with_md_name_length_zero(self):
533 ins = cfm.cc_message(
543 self.short_ma_name_format,
548 buf = ins.serialize()
549 res = struct.unpack_from(self.form, six.binary_type(buf))
550 eq_(self.md_lv, res[0] >> 5)
551 eq_(self.version, res[0] & 0x1f)
552 eq_(self.opcode, res[1])
553 eq_(self.rdi, res[2] >> 7)
554 eq_(self.interval, res[2] & 0x07)
555 eq_(self.first_tlv_offset, res[3])
556 eq_(self.seq_num, res[4])
557 eq_(self.mep_id, res[5])
558 eq_(self.md_name_format, res[6])
559 eq_(self.md_name_length, res[7])
560 eq_(self.md_name, res[8])
561 eq_(self.short_ma_name_format, res[9])
562 eq_(self.short_ma_name_length, res[10])
563 eq_(self.short_ma_name, res[11])
564 eq_(self.end_tlv, res[12])
566 def test_serialize_with_no_maintenance_domain_name_present(self):
567 form = '!4BIH3B8s37x12x4xB'
568 ins = cfm.cc_message(
575 cfm.cc_message._MD_FMT_NO_MD_NAME_PRESENT,
578 self.short_ma_name_format,
583 buf = ins.serialize()
584 res = struct.unpack_from(form, six.binary_type(buf))
585 eq_(self.md_lv, res[0] >> 5)
586 eq_(self.version, res[0] & 0x1f)
587 eq_(self.opcode, res[1])
588 eq_(self.rdi, res[2] >> 7)
589 eq_(self.interval, res[2] & 0x07)
590 eq_(self.first_tlv_offset, res[3])
591 eq_(self.seq_num, res[4])
592 eq_(self.mep_id, res[5])
593 eq_(cfm.cc_message._MD_FMT_NO_MD_NAME_PRESENT, res[6])
594 eq_(self.short_ma_name_format, res[7])
595 eq_(self.short_ma_name_length, res[8])
596 eq_(self.short_ma_name, res[9])
597 eq_(self.end_tlv, res[10])
600 # 75 octet (If tlv does not exist)
601 eq_(75, len(self.ins))
603 def test_default_args(self):
604 ins = cfm.cc_message()
605 buf = ins.serialize()
606 res = struct.unpack_from(cfm.cc_message._PACK_STR, six.binary_type(buf))
608 eq_(res[0] & 0x1f, 0)
611 eq_(res[2] & 0x07, 4)
618 class Test_loopback_message(unittest.TestCase):
623 self.opcode = cfm.CFM_LOOPBACK_MESSAGE
625 self.first_tlv_offset = cfm.loopback_message._TLV_OFFSET
626 self.transaction_id = 12345
631 self.ins = cfm.loopback_message(
638 self.buf = struct.pack(
640 (self.md_lv << 5) | self.version,
643 self.first_tlv_offset,
652 eq_(self.md_lv, self.ins.md_lv)
653 eq_(self.version, self.ins.version)
654 eq_(self.transaction_id, self.ins.transaction_id)
655 eq_(self.tlvs, self.ins.tlvs)
657 def test_parser(self):
658 _res = cfm.loopback_message.parser(self.buf)
659 if type(_res) is tuple:
663 eq_(self.md_lv, res.md_lv)
664 eq_(self.version, res.version)
665 eq_(self.transaction_id, res.transaction_id)
666 eq_(self.tlvs, res.tlvs)
668 def test_serialize(self):
669 buf = self.ins.serialize()
670 res = struct.unpack_from(self.form, six.binary_type(buf))
671 eq_(self.md_lv, res[0] >> 5)
672 eq_(self.version, res[0] & 0x1f)
673 eq_(self.opcode, res[1])
674 eq_(self.flags, res[2])
675 eq_(self.first_tlv_offset, res[3])
676 eq_(self.transaction_id, res[4])
677 eq_(self.end_tlv, res[5])
680 # 9 octet (If tlv does not exist)
681 eq_(9, len(self.ins))
683 def test_default_args(self):
684 ins = cfm.loopback_message()
685 buf = ins.serialize()
686 res = struct.unpack_from(cfm.loopback_message._PACK_STR,
687 six.binary_type(buf))
689 eq_(res[0] & 0x1f, 0)
696 class Test_loopback_reply(unittest.TestCase):
701 self.opcode = cfm.CFM_LOOPBACK_REPLY
703 self.first_tlv_offset = cfm.loopback_reply._TLV_OFFSET
704 self.transaction_id = 12345
708 self.ins = cfm.loopback_reply(
715 self.buf = struct.pack(
717 (self.md_lv << 5) | self.version,
720 self.first_tlv_offset,
729 eq_(self.md_lv, self.ins.md_lv)
730 eq_(self.version, self.ins.version)
731 eq_(self.transaction_id, self.ins.transaction_id)
732 eq_(self.tlvs, self.ins.tlvs)
734 def test_parser(self):
735 _res = cfm.loopback_reply.parser(self.buf)
736 if type(_res) is tuple:
740 eq_(self.md_lv, res.md_lv)
741 eq_(self.version, res.version)
742 eq_(self.transaction_id, res.transaction_id)
743 eq_(self.tlvs, res.tlvs)
745 def test_serialize(self):
746 buf = self.ins.serialize()
747 res = struct.unpack_from(self.form, six.binary_type(buf))
748 eq_(self.md_lv, res[0] >> 5)
749 eq_(self.version, res[0] & 0x1f)
750 eq_(self.opcode, res[1])
751 eq_(self.flags, res[2])
752 eq_(self.first_tlv_offset, res[3])
753 eq_(self.transaction_id, res[4])
754 eq_(self.end_tlv, res[5])
757 # 9 octet (If tlv does not exist)
758 eq_(9, len(self.ins))
760 def test_default_args(self):
761 ins = cfm.loopback_reply()
762 buf = ins.serialize()
763 res = struct.unpack_from(cfm.loopback_reply._PACK_STR, six.binary_type(buf))
765 eq_(res[0] & 0x1f, 0)
772 class Test_link_trace_message(unittest.TestCase):
777 self.opcode = cfm.CFM_LINK_TRACE_MESSAGE
778 self.use_fdb_only = 1
779 self.first_tlv_offset = cfm.link_trace_message._TLV_OFFSET
780 self.transaction_id = 12345
782 self.ltm_orig_addr = "00:11:22:44:55:66"
783 self.ltm_targ_addr = "ab:cd:ef:23:12:65"
788 self.ins = cfm.link_trace_message(
798 self.form = '!4BIB6s6sB'
799 self.buf = struct.pack(
801 (self.md_lv << 5) | self.version,
803 self.use_fdb_only << 7,
804 self.first_tlv_offset,
807 addrconv.mac.text_to_bin(self.ltm_orig_addr),
808 addrconv.mac.text_to_bin(self.ltm_targ_addr),
816 eq_(self.md_lv, self.ins.md_lv)
817 eq_(self.version, self.ins.version)
818 eq_(self.use_fdb_only, self.ins.use_fdb_only)
819 eq_(self.transaction_id, self.ins.transaction_id)
820 eq_(self.ttl, self.ins.ttl)
821 eq_(self.ltm_orig_addr, self.ins.ltm_orig_addr)
822 eq_(self.ltm_targ_addr, self.ins.ltm_targ_addr)
823 eq_(self.tlvs, self.ins.tlvs)
825 def test_parser(self):
826 _res = cfm.link_trace_message.parser(self.buf)
827 if type(_res) is tuple:
831 eq_(self.md_lv, res.md_lv)
832 eq_(self.version, res.version)
833 eq_(self.use_fdb_only, res.use_fdb_only)
834 eq_(self.transaction_id, res.transaction_id)
835 eq_(self.ttl, res.ttl)
836 eq_(self.ltm_orig_addr, res.ltm_orig_addr)
837 eq_(self.ltm_targ_addr, res.ltm_targ_addr)
838 eq_(self.tlvs, res.tlvs)
840 def test_serialize(self):
841 buf = self.ins.serialize()
842 res = struct.unpack_from(self.form, six.binary_type(buf))
843 eq_(self.md_lv, res[0] >> 5)
844 eq_(self.version, res[0] & 0x1f)
845 eq_(self.opcode, res[1])
846 eq_(self.use_fdb_only, res[2] >> 7)
847 eq_(self.first_tlv_offset, res[3])
848 eq_(self.transaction_id, res[4])
849 eq_(self.ttl, res[5])
850 eq_(addrconv.mac.text_to_bin(self.ltm_orig_addr), res[6])
851 eq_(addrconv.mac.text_to_bin(self.ltm_targ_addr), res[7])
852 eq_(self.end_tlv, res[8])
855 # 22 octet (If tlv does not exist)
856 eq_(22, len(self.ins))
858 def test_default_args(self):
859 ins = cfm.link_trace_message()
860 buf = ins.serialize()
861 res = struct.unpack_from(cfm.link_trace_message._PACK_STR, six.binary_type(buf))
863 eq_(res[0] & 0x1f, 0)
869 eq_(res[6], addrconv.mac.text_to_bin('00:00:00:00:00:00'))
870 eq_(res[7], addrconv.mac.text_to_bin('00:00:00:00:00:00'))
873 class Test_link_trace_reply(unittest.TestCase):
878 self.opcode = cfm.CFM_LINK_TRACE_REPLY
879 self.use_fdb_only = 1
881 self.terminal_mep = 1
882 self.first_tlv_offset = cfm.link_trace_reply._TLV_OFFSET
883 self.transaction_id = 12345
885 self.relay_action = 2
886 self.ltm_orig_addr = "00:11:22:aa:bb:cc"
887 self.ltm_targ_addr = "53:45:24:64:ac:ff"
891 self.ins = cfm.link_trace_reply(
902 self.form = '!4BIBBB'
903 self.buf = struct.pack(
905 (self.md_lv << 5) | self.version,
907 (self.use_fdb_only << 7) | (self.fwd_yes << 6) |
908 (self.terminal_mep << 5),
909 self.first_tlv_offset,
920 eq_(self.md_lv, self.ins.md_lv)
921 eq_(self.version, self.ins.version)
922 eq_(self.use_fdb_only, self.ins.use_fdb_only)
923 eq_(self.fwd_yes, self.ins.fwd_yes)
924 eq_(self.terminal_mep, self.ins.terminal_mep)
925 eq_(self.transaction_id, self.ins.transaction_id)
926 eq_(self.ttl, self.ins.ttl)
927 eq_(self.relay_action, self.ins.relay_action)
928 eq_(self.tlvs, self.ins.tlvs)
930 def test_parser(self):
931 _res = cfm.link_trace_reply.parser(self.buf)
932 if type(_res) is tuple:
936 eq_(self.md_lv, res.md_lv)
937 eq_(self.version, res.version)
938 eq_(self.use_fdb_only, self.ins.use_fdb_only)
939 eq_(self.fwd_yes, self.ins.fwd_yes)
940 eq_(self.terminal_mep, self.ins.terminal_mep)
941 eq_(self.transaction_id, res.transaction_id)
942 eq_(self.ttl, res.ttl)
943 eq_(self.relay_action, res.relay_action)
944 eq_(self.tlvs, res.tlvs)
946 def test_serialize(self):
947 buf = self.ins.serialize()
948 res = struct.unpack_from(self.form, six.binary_type(buf))
949 eq_(self.md_lv, res[0] >> 5)
950 eq_(self.version, res[0] & 0x1f)
951 eq_(self.opcode, res[1])
952 eq_(self.use_fdb_only, res[2] >> 7 & 0x01)
953 eq_(self.fwd_yes, res[2] >> 6 & 0x01)
954 eq_(self.terminal_mep, res[2] >> 5 & 0x01)
955 eq_(self.first_tlv_offset, res[3])
956 eq_(self.transaction_id, res[4])
957 eq_(self.ttl, res[5])
958 eq_(self.relay_action, res[6])
959 eq_(self.end_tlv, res[7])
962 # 11 octet (If tlv does not exist)
963 eq_(11, len(self.ins))
965 def test_default_args(self):
966 ins = cfm.link_trace_reply()
967 buf = ins.serialize()
968 res = struct.unpack_from(cfm.link_trace_reply._PACK_STR, six.binary_type(buf))
970 eq_(res[0] & 0x1f, 0)
973 eq_(res[2] >> 6 & 0x01, 0)
974 eq_(res[2] >> 5 & 0x01, 1)
981 class Test_sender_id_tlv(unittest.TestCase):
984 self._type = cfm.CFM_SENDER_ID_TLV
986 self.chassis_id_length = 1
987 self.chassis_id_subtype = 3
988 self.chassis_id = b"\x0a"
989 self.ma_domain_length = 2
990 self.ma_domain = b"\x04\x05"
992 self.ma = b"\x01\x02\x03"
993 self.ins = cfm.sender_id_tlv(
995 self.chassis_id_length,
996 self.chassis_id_subtype,
998 self.ma_domain_length,
1003 self.form = '!BHBB1sB2sB3s'
1004 self.buf = struct.pack(
1008 self.chassis_id_length,
1009 self.chassis_id_subtype,
1011 self.ma_domain_length,
1020 def test_init(self):
1021 eq_(self.length, self.ins.length)
1022 eq_(self.chassis_id_length, self.ins.chassis_id_length)
1023 eq_(self.chassis_id_subtype, self.ins.chassis_id_subtype)
1024 eq_(self.chassis_id, self.ins.chassis_id)
1025 eq_(self.ma_domain_length, self.ins.ma_domain_length)
1026 eq_(self.ma_domain, self.ins.ma_domain)
1027 eq_(self.ma_length, self.ins.ma_length)
1028 eq_(self.ma, self.ins.ma)
1030 def test_parser(self):
1031 _res = cfm.sender_id_tlv.parser(self.buf)
1032 if type(_res) is tuple:
1036 eq_(self.length, res.length)
1037 eq_(self.chassis_id_length, res.chassis_id_length)
1038 eq_(self.chassis_id_subtype, res.chassis_id_subtype)
1039 eq_(self.chassis_id, res.chassis_id)
1040 eq_(self.ma_domain_length, res.ma_domain_length)
1041 eq_(self.ma_domain, res.ma_domain)
1042 eq_(self.ma_length, res.ma_length)
1043 eq_(self.ma, res.ma)
1045 def test_serialize(self):
1046 buf = self.ins.serialize()
1047 res = struct.unpack_from(self.form, six.binary_type(buf))
1048 eq_(self._type, res[0])
1049 eq_(self.length, res[1])
1050 eq_(self.chassis_id_length, res[2])
1051 eq_(self.chassis_id_subtype, res[3])
1052 eq_(self.chassis_id, res[4])
1053 eq_(self.ma_domain_length, res[5])
1054 eq_(self.ma_domain, res[6])
1055 eq_(self.ma_length, res[7])
1056 eq_(self.ma, res[8])
1058 def test_serialize_semi_normal_ptn1(self):
1059 ins = cfm.sender_id_tlv(
1060 chassis_id_subtype=self.chassis_id_subtype,
1061 chassis_id=self.chassis_id,
1062 ma_domain=self.ma_domain,
1064 buf = ins.serialize()
1065 form = '!BHBB1sB2sB'
1066 res = struct.unpack_from(form, six.binary_type(buf))
1067 eq_(self._type, res[0])
1069 eq_(self.chassis_id_length, res[2])
1070 eq_(self.chassis_id_subtype, res[3])
1071 eq_(self.chassis_id, res[4])
1072 eq_(self.ma_domain_length, res[5])
1073 eq_(self.ma_domain, res[6])
1076 def test_serialize_semi_normal_ptn2(self):
1077 ins = cfm.sender_id_tlv(
1078 ma_domain=self.ma_domain,
1081 buf = ins.serialize()
1083 res = struct.unpack_from(form, six.binary_type(buf))
1084 eq_(self._type, res[0])
1087 eq_(self.ma_domain_length, res[3])
1088 eq_(self.ma_domain, res[4])
1089 eq_(self.ma_length, res[5])
1090 eq_(self.ma, res[6])
1092 def test_serialize_semi_normal_ptn3(self):
1093 ins = cfm.sender_id_tlv(
1094 chassis_id_subtype=self.chassis_id_subtype,
1095 chassis_id=self.chassis_id,
1097 buf = ins.serialize()
1099 res = struct.unpack_from(form, six.binary_type(buf))
1100 eq_(self._type, res[0])
1102 eq_(self.chassis_id_length, res[2])
1103 eq_(self.chassis_id_subtype, res[3])
1104 eq_(self.chassis_id, res[4])
1107 def test_serialize_semi_normal_ptn4(self):
1108 ins = cfm.sender_id_tlv(
1109 ma_domain=self.ma_domain,
1111 buf = ins.serialize()
1113 res = struct.unpack_from(form, six.binary_type(buf))
1114 eq_(self._type, res[0])
1117 eq_(self.ma_domain_length, res[3])
1118 eq_(self.ma_domain, res[4])
1121 def test_serialize_with_length_zero(self):
1122 ins = cfm.sender_id_tlv(
1125 self.chassis_id_subtype,
1132 buf = ins.serialize()
1133 res = struct.unpack_from(self.form, six.binary_type(buf))
1134 eq_(self._type, res[0])
1135 eq_(self.length, res[1])
1136 eq_(self.chassis_id_length, res[2])
1137 eq_(self.chassis_id_subtype, res[3])
1138 eq_(self.chassis_id, res[4])
1139 eq_(self.ma_domain_length, res[5])
1140 eq_(self.ma_domain, res[6])
1141 eq_(self.ma_length, res[7])
1142 eq_(self.ma, res[8])
1145 # tlv_length = type_len + length_len + value_len
1146 eq_(1 + 2 + 10, len(self.ins))
1148 def test_default_args(self):
1149 ins = cfm.sender_id_tlv()
1150 buf = ins.serialize()
1151 res = struct.unpack_from(cfm.sender_id_tlv._PACK_STR, six.binary_type(buf))
1152 eq_(res[0], cfm.CFM_SENDER_ID_TLV)
1157 class Test_port_status_tlv(unittest.TestCase):
1160 self._type = cfm.CFM_PORT_STATUS_TLV
1162 self.port_status = 1
1163 self.ins = cfm.port_status_tlv(
1168 self.buf = struct.pack(
1178 def test_init(self):
1179 eq_(self.length, self.ins.length)
1180 eq_(self.port_status, self.ins.port_status)
1182 def test_parser(self):
1183 _res = cfm.port_status_tlv.parser(self.buf)
1184 if type(_res) is tuple:
1188 eq_(self.length, res.length)
1189 eq_(self.port_status, res.port_status)
1191 def test_serialize(self):
1192 buf = self.ins.serialize()
1193 res = struct.unpack_from(self.form, six.binary_type(buf))
1194 eq_(self._type, res[0])
1195 eq_(self.length, res[1])
1196 eq_(self.port_status, res[2])
1199 # tlv_length = type_len + length_len + value_len
1200 eq_(1 + 2 + 1, len(self.ins))
1202 def test_default_args(self):
1203 ins = cfm.port_status_tlv()
1204 buf = ins.serialize()
1205 res = struct.unpack_from(cfm.port_status_tlv._PACK_STR, six.binary_type(buf))
1206 eq_(res[0], cfm.CFM_PORT_STATUS_TLV)
1211 class Test_data_tlv(unittest.TestCase):
1214 self._type = cfm.CFM_DATA_TLV
1216 self.data_value = b"\x01\x02\x03"
1217 self.ins = cfm.data_tlv(
1222 self.buf = struct.pack(
1232 def test_init(self):
1233 eq_(self.length, self.ins.length)
1234 eq_(self.data_value, self.ins.data_value)
1236 def test_parser(self):
1237 _res = cfm.data_tlv.parser(self.buf)
1238 if type(_res) is tuple:
1242 eq_(self.length, res.length)
1243 eq_(self.data_value, res.data_value)
1245 def test_serialize(self):
1246 buf = self.ins.serialize()
1247 res = struct.unpack_from(self.form, six.binary_type(buf))
1248 eq_(self._type, res[0])
1249 eq_(self.length, res[1])
1250 eq_(self.data_value, res[2])
1252 def test_serialize_with_length_zero(self):
1257 buf = ins.serialize()
1258 res = struct.unpack_from(self.form, six.binary_type(buf))
1259 eq_(self._type, res[0])
1260 eq_(self.length, res[1])
1261 eq_(self.data_value, res[2])
1264 # tlv_length = type_len + length_len + value_len
1265 eq_(1 + 2 + 3, len(self.ins))
1267 def test_default_args(self):
1268 ins = cfm.data_tlv()
1269 buf = ins.serialize()
1270 res = struct.unpack_from(cfm.data_tlv._PACK_STR, six.binary_type(buf))
1271 eq_(res[0], cfm.CFM_DATA_TLV)
1275 class Test_interface_status_tlv(unittest.TestCase):
1278 self._type = cfm.CFM_INTERFACE_STATUS_TLV
1280 self.interface_status = 4
1281 self.ins = cfm.interface_status_tlv(
1283 self.interface_status
1286 self.buf = struct.pack(
1290 self.interface_status
1296 def test_init(self):
1297 eq_(self.length, self.ins.length)
1298 eq_(self.interface_status, self.ins.interface_status)
1300 def test_parser(self):
1301 _res = cfm.interface_status_tlv.parser(self.buf)
1302 if type(_res) is tuple:
1306 eq_(self.length, res.length)
1307 eq_(self.interface_status, res.interface_status)
1309 def test_serialize(self):
1310 buf = self.ins.serialize()
1311 res = struct.unpack_from(self.form, six.binary_type(buf))
1312 eq_(self._type, res[0])
1313 eq_(self.length, res[1])
1314 eq_(self.interface_status, res[2])
1317 # tlv_length = type_len + length_len + value_len
1318 eq_(1 + 2 + 1, len(self.ins))
1320 def test_default_args(self):
1321 ins = cfm.interface_status_tlv()
1322 buf = ins.serialize()
1323 res = struct.unpack_from(cfm.interface_status_tlv._PACK_STR, six.binary_type(buf))
1324 eq_(res[0], cfm.CFM_INTERFACE_STATUS_TLV)
1329 class Test_ltm_egress_identifier_tlv(unittest.TestCase):
1332 self._type = cfm.CFM_LTM_EGRESS_IDENTIFIER_TLV
1334 self.egress_id_ui = 7
1335 self.egress_id_mac = "11:22:33:44:55:66"
1336 self.ins = cfm.ltm_egress_identifier_tlv(
1341 self.form = '!BHH6s'
1342 self.buf = struct.pack(
1347 addrconv.mac.text_to_bin(self.egress_id_mac)
1353 def test_init(self):
1354 eq_(self.length, self.ins.length)
1355 eq_(self.egress_id_ui, self.ins.egress_id_ui)
1356 eq_(self.egress_id_mac, self.ins.egress_id_mac)
1358 def test_parser(self):
1359 _res = cfm.ltm_egress_identifier_tlv.parser(self.buf)
1360 if type(_res) is tuple:
1364 eq_(self.length, res.length)
1365 eq_(self.egress_id_ui, res.egress_id_ui)
1366 eq_(self.egress_id_mac, res.egress_id_mac)
1368 def test_serialize(self):
1369 buf = self.ins.serialize()
1370 res = struct.unpack_from(self.form, six.binary_type(buf))
1371 eq_(self._type, res[0])
1372 eq_(self.length, res[1])
1373 eq_(self.egress_id_ui, res[2])
1374 eq_(addrconv.mac.text_to_bin(self.egress_id_mac), res[3])
1376 def test_serialize_with_length_zero(self):
1377 ins = cfm.ltm_egress_identifier_tlv(
1382 buf = ins.serialize()
1383 res = struct.unpack_from(self.form, six.binary_type(buf))
1384 eq_(self._type, res[0])
1385 eq_(self.length, res[1])
1386 eq_(self.egress_id_ui, res[2])
1387 eq_(addrconv.mac.text_to_bin(self.egress_id_mac), res[3])
1390 # tlv_length = type_len + length_len + value_len
1391 eq_(1 + 2 + 8, len(self.ins))
1393 def test_default_args(self):
1394 ins = cfm.ltm_egress_identifier_tlv()
1395 buf = ins.serialize()
1396 res = struct.unpack_from(
1397 cfm.ltm_egress_identifier_tlv._PACK_STR, six.binary_type(buf))
1398 eq_(res[0], cfm.CFM_LTM_EGRESS_IDENTIFIER_TLV)
1401 eq_(res[3], addrconv.mac.text_to_bin('00:00:00:00:00:00'))
1404 class Test_ltr_egress_identifier_tlv(unittest.TestCase):
1407 self._type = cfm.CFM_LTR_EGRESS_IDENTIFIER_TLV
1409 self.last_egress_id_ui = 7
1410 self.last_egress_id_mac = "11:22:33:44:55:66"
1411 self.next_egress_id_ui = 5
1412 self.next_egress_id_mac = "33:11:33:aa:bb:cc"
1413 self.ins = cfm.ltr_egress_identifier_tlv(self.length,
1414 self.last_egress_id_ui,
1415 self.last_egress_id_mac,
1416 self.next_egress_id_ui,
1417 self.next_egress_id_mac
1419 self.form = '!BHH6sH6s'
1420 self.buf = struct.pack(
1424 self.last_egress_id_ui,
1425 addrconv.mac.text_to_bin(self.last_egress_id_mac),
1426 self.next_egress_id_ui,
1427 addrconv.mac.text_to_bin(self.next_egress_id_mac))
1432 def test_init(self):
1433 eq_(self.length, self.ins.length)
1434 eq_(self.last_egress_id_ui, self.ins.last_egress_id_ui)
1435 eq_(self.last_egress_id_mac, self.ins.last_egress_id_mac)
1436 eq_(self.next_egress_id_ui, self.ins.next_egress_id_ui)
1437 eq_(self.next_egress_id_mac, self.ins.next_egress_id_mac)
1439 def test_parser(self):
1440 _res = cfm.ltr_egress_identifier_tlv.parser(self.buf)
1441 if type(_res) is tuple:
1445 eq_(self.length, res.length)
1446 eq_(self.last_egress_id_ui, res.last_egress_id_ui)
1447 eq_(self.last_egress_id_mac, res.last_egress_id_mac)
1448 eq_(self.next_egress_id_ui, res.next_egress_id_ui)
1449 eq_(self.next_egress_id_mac, res.next_egress_id_mac)
1451 def test_serialize(self):
1452 buf = self.ins.serialize()
1453 res = struct.unpack_from(self.form, six.binary_type(buf))
1454 eq_(self._type, res[0])
1455 eq_(self.length, res[1])
1456 eq_(self.last_egress_id_ui, res[2])
1457 eq_(addrconv.mac.text_to_bin(self.last_egress_id_mac), res[3])
1458 eq_(self.next_egress_id_ui, res[4])
1459 eq_(addrconv.mac.text_to_bin(self.next_egress_id_mac), res[5])
1461 def test_serialize_with_length_zero(self):
1462 ins = cfm.ltr_egress_identifier_tlv(0,
1463 self.last_egress_id_ui,
1464 self.last_egress_id_mac,
1465 self.next_egress_id_ui,
1466 self.next_egress_id_mac
1468 buf = ins.serialize()
1469 res = struct.unpack_from(self.form, six.binary_type(buf))
1470 eq_(self._type, res[0])
1471 eq_(self.length, res[1])
1472 eq_(self.last_egress_id_ui, res[2])
1473 eq_(addrconv.mac.text_to_bin(self.last_egress_id_mac), res[3])
1474 eq_(self.next_egress_id_ui, res[4])
1475 eq_(addrconv.mac.text_to_bin(self.next_egress_id_mac), res[5])
1478 # tlv_length = type_len + length_len + value_len
1479 eq_(1 + 2 + 16, len(self.ins))
1481 def test_default_args(self):
1482 ins = cfm.ltr_egress_identifier_tlv()
1483 buf = ins.serialize()
1484 res = struct.unpack_from(cfm.ltr_egress_identifier_tlv._PACK_STR,
1485 six.binary_type(buf))
1486 eq_(res[0], cfm.CFM_LTR_EGRESS_IDENTIFIER_TLV)
1489 eq_(res[3], addrconv.mac.text_to_bin('00:00:00:00:00:00'))
1491 eq_(res[5], addrconv.mac.text_to_bin('00:00:00:00:00:00'))
1494 class Test_organization_specific_tlv(unittest.TestCase):
1497 self._type = cfm.CFM_ORGANIZATION_SPECIFIC_TLV
1499 self.oui = b"\xff\x12\x34"
1501 self.value = b"\x01\x02\x0f\x0e\x0d\x0c"
1502 self.ins = cfm.organization_specific_tlv(self.length,
1507 self.form = '!BH3sB6s'
1508 self.buf = struct.pack(self.form,
1519 def test_init(self):
1520 eq_(self.length, self.ins.length)
1521 eq_(self.oui, self.ins.oui)
1522 eq_(self.subtype, self.ins.subtype)
1523 eq_(self.value, self.ins.value)
1525 def test_parser(self):
1526 _res = cfm.organization_specific_tlv.parser(self.buf)
1527 if type(_res) is tuple:
1531 eq_(self.length, res.length)
1532 eq_(self.oui, res.oui)
1533 eq_(self.subtype, res.subtype)
1534 eq_(self.value, res.value)
1536 def test_serialize(self):
1537 buf = self.ins.serialize()
1538 res = struct.unpack_from(self.form, six.binary_type(buf))
1539 eq_(self._type, res[0])
1540 eq_(self.length, res[1])
1541 eq_(self.oui, res[2])
1542 eq_(self.subtype, res[3])
1543 eq_(self.value, res[4])
1545 def test_serialize_with_zero(self):
1546 ins = cfm.organization_specific_tlv(0,
1551 buf = ins.serialize()
1552 res = struct.unpack_from(self.form, six.binary_type(buf))
1553 eq_(self._type, res[0])
1554 eq_(self.length, res[1])
1555 eq_(self.oui, res[2])
1556 eq_(self.subtype, res[3])
1557 eq_(self.value, res[4])
1560 # tlv_length = type_len + length_len + value_len
1561 eq_(1 + 2 + 10, len(self.ins))
1563 def test_default_args(self):
1564 ins = cfm.organization_specific_tlv()
1565 buf = ins.serialize()
1566 res = struct.unpack_from(cfm.organization_specific_tlv._PACK_STR,
1567 six.binary_type(buf))
1568 eq_(res[0], cfm.CFM_ORGANIZATION_SPECIFIC_TLV)
1570 eq_(res[2], b"\x00\x00\x00")
1574 class Test_reply_ingress_tlv(unittest.TestCase):
1577 self._type = cfm.CFM_REPLY_INGRESS_TLV
1580 self.mac_address = 'aa:bb:cc:56:34:12'
1581 self.port_id_length = 3
1582 self.port_id_subtype = 2
1583 self.port_id = b"\x01\x04\x09"
1584 self.ins = cfm.reply_ingress_tlv(self.length, self.action,
1586 self.port_id_length,
1587 self.port_id_subtype,
1590 self.form = '!BHB6sBB3s'
1591 self.buf = struct.pack(self.form,
1595 addrconv.mac.text_to_bin(self.mac_address),
1596 self.port_id_length,
1597 self.port_id_subtype,
1604 def test_init(self):
1605 eq_(self.length, self.ins.length)
1606 eq_(self.action, self.ins.action)
1607 eq_(self.mac_address, self.ins.mac_address)
1608 eq_(self.port_id_length, self.ins.port_id_length)
1609 eq_(self.port_id_subtype, self.ins.port_id_subtype)
1610 eq_(self.port_id, self.ins.port_id)
1612 def test_parser(self):
1613 _res = cfm.reply_ingress_tlv.parser(self.buf)
1614 if type(_res) is tuple:
1618 eq_(self.length, res.length)
1619 eq_(self.action, res.action)
1620 eq_(self.mac_address, res.mac_address)
1621 eq_(self.port_id_length, res.port_id_length)
1622 eq_(self.port_id_subtype, res.port_id_subtype)
1623 eq_(self.port_id, res.port_id)
1625 def test_serialize(self):
1626 buf = self.ins.serialize()
1627 res = struct.unpack_from(self.form, six.binary_type(buf))
1628 eq_(self._type, res[0])
1629 eq_(self.length, res[1])
1630 eq_(self.action, res[2])
1631 eq_(addrconv.mac.text_to_bin(self.mac_address), res[3])
1632 eq_(self.port_id_length, res[4])
1633 eq_(self.port_id_subtype, res[5])
1634 eq_(self.port_id, res[6])
1636 def test_serialize_with_zero(self):
1637 ins = cfm.reply_ingress_tlv(0,
1641 self.port_id_subtype,
1644 buf = ins.serialize()
1645 res = struct.unpack_from(self.form, six.binary_type(buf))
1646 eq_(self._type, res[0])
1647 eq_(self.length, res[1])
1648 eq_(self.action, res[2])
1649 eq_(addrconv.mac.text_to_bin(self.mac_address), res[3])
1650 eq_(self.port_id_length, res[4])
1651 eq_(self.port_id_subtype, res[5])
1652 eq_(self.port_id, res[6])
1655 # tlv_length = type_len + length_len + value_len
1656 eq_(1 + 2 + 12, len(self.ins))
1658 def test_default_args(self):
1659 ins = cfm.reply_ingress_tlv()
1660 buf = ins.serialize()
1661 res = struct.unpack_from(cfm.reply_ingress_tlv._PACK_STR, six.binary_type(buf))
1662 eq_(res[0], cfm.CFM_REPLY_INGRESS_TLV)
1665 eq_(res[3], addrconv.mac.text_to_bin('00:00:00:00:00:00'))
1668 class Test_reply_egress_tlv(unittest.TestCase):
1671 self._type = cfm.CFM_REPLY_EGRESS_TLV
1674 self.mac_address = 'aa:bb:cc:56:34:12'
1675 self.port_id_length = 3
1676 self.port_id_subtype = 2
1677 self.port_id = b"\x01\x04\x09"
1678 self.ins = cfm.reply_egress_tlv(self.length,
1681 self.port_id_length,
1682 self.port_id_subtype,
1685 self.form = '!BHB6sBB3s'
1686 self.buf = struct.pack(self.form,
1690 addrconv.mac.text_to_bin(self.mac_address),
1691 self.port_id_length,
1692 self.port_id_subtype,
1699 def test_init(self):
1700 eq_(self.length, self.ins.length)
1701 eq_(self.action, self.ins.action)
1702 eq_(self.mac_address, self.ins.mac_address)
1703 eq_(self.port_id_length, self.ins.port_id_length)
1704 eq_(self.port_id_subtype, self.ins.port_id_subtype)
1705 eq_(self.port_id, self.ins.port_id)
1707 def test_parser(self):
1708 _res = cfm.reply_ingress_tlv.parser(self.buf)
1709 if type(_res) is tuple:
1713 eq_(self.length, res.length)
1714 eq_(self.action, res.action)
1715 eq_(self.mac_address, res.mac_address)
1716 eq_(self.port_id_length, res.port_id_length)
1717 eq_(self.port_id_subtype, res.port_id_subtype)
1718 eq_(self.port_id, res.port_id)
1720 def test_serialize(self):
1721 buf = self.ins.serialize()
1722 res = struct.unpack_from(self.form, six.binary_type(buf))
1723 eq_(self._type, res[0])
1724 eq_(self.length, res[1])
1725 eq_(self.action, res[2])
1726 eq_(addrconv.mac.text_to_bin(self.mac_address), res[3])
1727 eq_(self.port_id_length, res[4])
1728 eq_(self.port_id_subtype, res[5])
1729 eq_(self.port_id, res[6])
1731 def test_serialize_with_zero(self):
1732 ins = cfm.reply_egress_tlv(0,
1736 self.port_id_subtype,
1739 buf = ins.serialize()
1740 res = struct.unpack_from(self.form, six.binary_type(buf))
1741 eq_(self._type, res[0])
1742 eq_(self.length, res[1])
1743 eq_(self.action, res[2])
1744 eq_(addrconv.mac.text_to_bin(self.mac_address), res[3])
1745 eq_(self.port_id_length, res[4])
1746 eq_(self.port_id_subtype, res[5])
1747 eq_(self.port_id, res[6])
1750 # tlv_length = type_len + length_len + value_len
1751 eq_(1 + 2 + 12, len(self.ins))
1753 def test_default_args(self):
1754 ins = cfm.reply_egress_tlv()
1755 buf = ins.serialize()
1756 res = struct.unpack_from(cfm.reply_egress_tlv._PACK_STR,
1757 six.binary_type(buf))
1758 eq_(res[0], cfm.CFM_REPLY_EGRESS_TLV)
1761 eq_(res[3], addrconv.mac.text_to_bin('00:00:00:00:00:00'))