1 # Copyright (C) 2012-2015 Nippon Telegraph and Telephone Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
16 # vim: tabstop=4 shiftwidth=4 softtabstop=4
23 from nose.tools import *
24 from ryu.ofproto import inet
25 from ryu.lib.packet import tcp
26 from ryu.lib.packet.ipv4 import ipv4
27 from ryu.lib.packet import packet_utils
28 from ryu.lib import addrconv
31 LOG = logging.getLogger('test_tcp')
34 class Test_tcp(unittest.TestCase):
46 option = b'\x01\x02\x03\x04'
48 t = tcp.tcp(src_port, dst_port, seq, ack, offset, bits,
49 window_size, csum, urgent, option)
51 buf = pack(tcp.tcp._PACK_STR, src_port, dst_port, seq, ack,
52 offset << 4, bits, window_size, csum, urgent)
62 eq_(self.src_port, self.t.src_port)
63 eq_(self.dst_port, self.t.dst_port)
64 eq_(self.seq, self.t.seq)
65 eq_(self.ack, self.t.ack)
66 eq_(self.offset, self.t.offset)
67 eq_(self.bits, self.t.bits)
68 eq_(self.window_size, self.t.window_size)
69 eq_(self.csum, self.t.csum)
70 eq_(self.urgent, self.t.urgent)
71 eq_(self.option, self.t.option)
73 def test_parser(self):
74 r1, r2, _ = self.t.parser(self.buf)
76 eq_(self.src_port, r1.src_port)
77 eq_(self.dst_port, r1.dst_port)
80 eq_(self.offset, r1.offset)
81 eq_(self.bits, r1.bits)
82 eq_(self.window_size, r1.window_size)
83 eq_(self.csum, r1.csum)
84 eq_(self.urgent, r1.urgent)
85 eq_(self.option, r1.option)
88 def test_serialize(self):
92 src_ip = '192.168.10.1'
93 dst_ip = '192.168.100.1'
94 prev = ipv4(4, 5, 0, 0, 0, 0, 0, 64,
95 inet.IPPROTO_TCP, 0, src_ip, dst_ip)
97 t = tcp.tcp(self.src_port, self.dst_port, self.seq, self.ack,
98 offset, self.bits, self.window_size, csum, self.urgent)
99 buf = t.serialize(bytearray(), prev)
100 res = struct.unpack(tcp.tcp._PACK_STR, six.binary_type(buf))
102 eq_(res[0], self.src_port)
103 eq_(res[1], self.dst_port)
104 eq_(res[2], self.seq)
105 eq_(res[3], self.ack)
106 eq_(res[4], offset << 4)
107 eq_(res[5], self.bits)
108 eq_(res[6], self.window_size)
109 eq_(res[8], self.urgent)
112 # offset indicates the number of 32 bit (= 4 bytes)
113 # words in the TCP Header.
114 # So, we compare len(tcp) with offset * 4, here.
115 eq_(offset * 4, len(t))
118 ph = struct.pack('!4s4sBBH',
119 addrconv.ipv4.text_to_bin(src_ip),
120 addrconv.ipv4.text_to_bin(dst_ip), 0, 6, offset * 4)
122 s = packet_utils.checksum(d)
125 def test_serialize_option(self):
130 tcp.TCPOptionMaximumSegmentSize(max_seg_size=1460),
131 tcp.TCPOptionSACKPermitted(),
132 tcp.TCPOptionTimestamps(ts_val=287454020, ts_ecr=1432778632),
133 tcp.TCPOptionNoOperation(),
134 tcp.TCPOptionWindowScale(shift_cnt=9),
139 b'\x08\x0a\x11\x22\x33\x44\x55\x66\x77\x88'
143 prev = ipv4(4, 5, 0, 0, 0, 0, 0, 64,
144 inet.IPPROTO_TCP, 0, '192.168.10.1', '192.168.100.1')
147 t = tcp.tcp(self.src_port, self.dst_port, self.seq, self.ack,
148 offset, self.bits, self.window_size, csum, self.urgent,
150 buf = t.serialize(bytearray(), prev)
151 r_option_buf = buf[tcp.tcp._MIN_LEN:tcp.tcp._MIN_LEN + len(option_buf)]
152 eq_(option_buf, r_option_buf)
155 (r_tcp, _, _) = tcp.tcp.parser(buf)
156 eq_(str(option), str(r_tcp.option))
159 def test_malformed_tcp(self):
160 m_short_buf = self.buf[1:tcp.tcp._MIN_LEN]
161 tcp.tcp.parser(m_short_buf)
163 def test_default_args(self):
164 prev = ipv4(proto=inet.IPPROTO_TCP)
166 buf = t.serialize(bytearray(), prev)
167 res = struct.unpack(tcp.tcp._PACK_STR, buf)
178 # with option, without offset
179 t = tcp.tcp(option=[tcp.TCPOptionMaximumSegmentSize(1460)])
180 buf = t.serialize(bytearray(), prev)
181 res = struct.unpack(tcp.tcp._PACK_STR + '4s', buf)
191 eq_(res[9], b'\x02\x04\x05\xb4')
193 # with option, with long offset
194 t = tcp.tcp(offset=7, option=[tcp.TCPOptionWindowScale(shift_cnt=9)])
195 buf = t.serialize(bytearray(), prev)
196 res = struct.unpack(tcp.tcp._PACK_STR + '8s', buf)
206 eq_(res[9], b'\x03\x03\x09\x00\x00\x00\x00\x00')
209 jsondict = self.t.to_jsondict()
210 t = tcp.tcp.from_jsondict(jsondict['tcp'])
211 eq_(str(self.t), str(t))
214 class Test_TCPOption(unittest.TestCase):
217 tcp.TCPOptionEndOfOptionList(),
218 tcp.TCPOptionNoOperation(),
219 tcp.TCPOptionMaximumSegmentSize(max_seg_size=1460),
220 tcp.TCPOptionWindowScale(shift_cnt=9),
221 tcp.TCPOptionSACKPermitted(),
222 tcp.TCPOptionSACK(blocks=[(1, 2), (3, 4)], length=18),
223 tcp.TCPOptionTimestamps(ts_val=287454020, ts_ecr=1432778632),
224 tcp.TCPOptionUserTimeout(granularity=1, user_timeout=564),
225 tcp.TCPOptionAuthentication(
226 key_id=1, r_next_key_id=2,
227 mac=b'abcdefghijkl', length=16),
228 tcp.TCPOptionUnknown(value=b'foobar', kind=255, length=8),
229 tcp.TCPOptionUnknown(value=b'', kind=255, length=2),
232 b'\x00' # End of Option List
233 b'\x01' # No-Operation
234 b'\x02\x04\x05\xb4' # Maximum Segment Size
235 b'\x03\x03\x09' # Window Scale
236 b'\x04\x02' # SACK Permitted
238 b'\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03\x00\x00\x00\x04'
239 b'\x08\x0a' # Timestamps
240 b'\x11\x22\x33\x44\x55\x66\x77\x88'
241 b'\x1c\x04\x82\x34' # User Timeout Option
242 b'\x1d\x10\x01\x02' # TCP Authentication Option (TCP-AO)
244 b'\xff\x08' # Unknown with body
246 b'\xff\x02' # Unknown
249 def test_serialize(self):
250 output_buf = bytearray()
251 for option in self.input_options:
252 output_buf += option.serialize()
253 eq_(self.input_buf, output_buf)
255 def test_parser(self):
259 opt, buf = tcp.TCPOption.parser(buf)
260 output_options.append(opt)
261 eq_(str(self.input_options), str(output_options))
264 for option in self.input_options:
265 json_dict = option.to_jsondict()[option.__class__.__name__]
266 output_option = option.__class__.from_jsondict(json_dict)
267 eq_(str(option), str(output_option))