backing up
[vsorcdistro/.git] / ryu / build / lib.linux-armv7l-2.7 / ryu / tests / unit / packet / test_tcp.py
1 # Copyright (C) 2012-2015 Nippon Telegraph and Telephone Corporation.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #    http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 # vim: tabstop=4 shiftwidth=4 softtabstop=4
17
18 import unittest
19 import logging
20 import six
21 import struct
22 from struct import *
23 from nose.tools import *
24 from ryu.ofproto import inet
25 from ryu.lib.packet import tcp
26 from ryu.lib.packet.ipv4 import ipv4
27 from ryu.lib.packet import packet_utils
28 from ryu.lib import addrconv
29
30
31 LOG = logging.getLogger('test_tcp')
32
33
34 class Test_tcp(unittest.TestCase):
35     """ Test case for tcp
36     """
37     src_port = 6431
38     dst_port = 8080
39     seq = 5
40     ack = 1
41     offset = 6
42     bits = 0b101010
43     window_size = 2048
44     csum = 12345
45     urgent = 128
46     option = b'\x01\x02\x03\x04'
47
48     t = tcp.tcp(src_port, dst_port, seq, ack, offset, bits,
49                 window_size, csum, urgent, option)
50
51     buf = pack(tcp.tcp._PACK_STR, src_port, dst_port, seq, ack,
52                offset << 4, bits, window_size, csum, urgent)
53     buf += option
54
55     def setUp(self):
56         pass
57
58     def tearDown(self):
59         pass
60
61     def test_init(self):
62         eq_(self.src_port, self.t.src_port)
63         eq_(self.dst_port, self.t.dst_port)
64         eq_(self.seq, self.t.seq)
65         eq_(self.ack, self.t.ack)
66         eq_(self.offset, self.t.offset)
67         eq_(self.bits, self.t.bits)
68         eq_(self.window_size, self.t.window_size)
69         eq_(self.csum, self.t.csum)
70         eq_(self.urgent, self.t.urgent)
71         eq_(self.option, self.t.option)
72
73     def test_parser(self):
74         r1, r2, _ = self.t.parser(self.buf)
75
76         eq_(self.src_port, r1.src_port)
77         eq_(self.dst_port, r1.dst_port)
78         eq_(self.seq, r1.seq)
79         eq_(self.ack, r1.ack)
80         eq_(self.offset, r1.offset)
81         eq_(self.bits, r1.bits)
82         eq_(self.window_size, r1.window_size)
83         eq_(self.csum, r1.csum)
84         eq_(self.urgent, r1.urgent)
85         eq_(self.option, r1.option)
86         eq_(None, r2)
87
88     def test_serialize(self):
89         offset = 5
90         csum = 0
91
92         src_ip = '192.168.10.1'
93         dst_ip = '192.168.100.1'
94         prev = ipv4(4, 5, 0, 0, 0, 0, 0, 64,
95                     inet.IPPROTO_TCP, 0, src_ip, dst_ip)
96
97         t = tcp.tcp(self.src_port, self.dst_port, self.seq, self.ack,
98                     offset, self.bits, self.window_size, csum, self.urgent)
99         buf = t.serialize(bytearray(), prev)
100         res = struct.unpack(tcp.tcp._PACK_STR, six.binary_type(buf))
101
102         eq_(res[0], self.src_port)
103         eq_(res[1], self.dst_port)
104         eq_(res[2], self.seq)
105         eq_(res[3], self.ack)
106         eq_(res[4], offset << 4)
107         eq_(res[5], self.bits)
108         eq_(res[6], self.window_size)
109         eq_(res[8], self.urgent)
110
111         # test __len__
112         # offset indicates the number of 32 bit (= 4 bytes)
113         # words in the TCP Header.
114         # So, we compare len(tcp) with offset * 4, here.
115         eq_(offset * 4, len(t))
116
117         # checksum
118         ph = struct.pack('!4s4sBBH',
119                          addrconv.ipv4.text_to_bin(src_ip),
120                          addrconv.ipv4.text_to_bin(dst_ip), 0, 6, offset * 4)
121         d = ph + buf
122         s = packet_utils.checksum(d)
123         eq_(0, s)
124
125     def test_serialize_option(self):
126         # prepare test data
127         offset = 0
128         csum = 0
129         option = [
130             tcp.TCPOptionMaximumSegmentSize(max_seg_size=1460),
131             tcp.TCPOptionSACKPermitted(),
132             tcp.TCPOptionTimestamps(ts_val=287454020, ts_ecr=1432778632),
133             tcp.TCPOptionNoOperation(),
134             tcp.TCPOptionWindowScale(shift_cnt=9),
135         ]
136         option_buf = (
137             b'\x02\x04\x05\xb4'
138             b'\x04\x02'
139             b'\x08\x0a\x11\x22\x33\x44\x55\x66\x77\x88'
140             b'\x01'
141             b'\x03\x03\x09'
142         )
143         prev = ipv4(4, 5, 0, 0, 0, 0, 0, 64,
144                     inet.IPPROTO_TCP, 0, '192.168.10.1', '192.168.100.1')
145
146         # test serializer
147         t = tcp.tcp(self.src_port, self.dst_port, self.seq, self.ack,
148                     offset, self.bits, self.window_size, csum, self.urgent,
149                     option)
150         buf = t.serialize(bytearray(), prev)
151         r_option_buf = buf[tcp.tcp._MIN_LEN:tcp.tcp._MIN_LEN + len(option_buf)]
152         eq_(option_buf, r_option_buf)
153
154         # test parser
155         (r_tcp, _, _) = tcp.tcp.parser(buf)
156         eq_(str(option), str(r_tcp.option))
157
158     @raises(Exception)
159     def test_malformed_tcp(self):
160         m_short_buf = self.buf[1:tcp.tcp._MIN_LEN]
161         tcp.tcp.parser(m_short_buf)
162
163     def test_default_args(self):
164         prev = ipv4(proto=inet.IPPROTO_TCP)
165         t = tcp.tcp()
166         buf = t.serialize(bytearray(), prev)
167         res = struct.unpack(tcp.tcp._PACK_STR, buf)
168
169         eq_(res[0], 1)
170         eq_(res[1], 1)
171         eq_(res[2], 0)
172         eq_(res[3], 0)
173         eq_(res[4], 5 << 4)
174         eq_(res[5], 0)
175         eq_(res[6], 0)
176         eq_(res[8], 0)
177
178         # with option, without offset
179         t = tcp.tcp(option=[tcp.TCPOptionMaximumSegmentSize(1460)])
180         buf = t.serialize(bytearray(), prev)
181         res = struct.unpack(tcp.tcp._PACK_STR + '4s', buf)
182
183         eq_(res[0], 1)
184         eq_(res[1], 1)
185         eq_(res[2], 0)
186         eq_(res[3], 0)
187         eq_(res[4], 6 << 4)
188         eq_(res[5], 0)
189         eq_(res[6], 0)
190         eq_(res[8], 0)
191         eq_(res[9], b'\x02\x04\x05\xb4')
192
193         # with option, with long offset
194         t = tcp.tcp(offset=7, option=[tcp.TCPOptionWindowScale(shift_cnt=9)])
195         buf = t.serialize(bytearray(), prev)
196         res = struct.unpack(tcp.tcp._PACK_STR + '8s', buf)
197
198         eq_(res[0], 1)
199         eq_(res[1], 1)
200         eq_(res[2], 0)
201         eq_(res[3], 0)
202         eq_(res[4], 7 << 4)
203         eq_(res[5], 0)
204         eq_(res[6], 0)
205         eq_(res[8], 0)
206         eq_(res[9], b'\x03\x03\x09\x00\x00\x00\x00\x00')
207
208     def test_json(self):
209         jsondict = self.t.to_jsondict()
210         t = tcp.tcp.from_jsondict(jsondict['tcp'])
211         eq_(str(self.t), str(t))
212
213
214 class Test_TCPOption(unittest.TestCase):
215     # prepare test data
216     input_options = [
217         tcp.TCPOptionEndOfOptionList(),
218         tcp.TCPOptionNoOperation(),
219         tcp.TCPOptionMaximumSegmentSize(max_seg_size=1460),
220         tcp.TCPOptionWindowScale(shift_cnt=9),
221         tcp.TCPOptionSACKPermitted(),
222         tcp.TCPOptionSACK(blocks=[(1, 2), (3, 4)], length=18),
223         tcp.TCPOptionTimestamps(ts_val=287454020, ts_ecr=1432778632),
224         tcp.TCPOptionUserTimeout(granularity=1, user_timeout=564),
225         tcp.TCPOptionAuthentication(
226             key_id=1, r_next_key_id=2,
227             mac=b'abcdefghijkl', length=16),
228         tcp.TCPOptionUnknown(value=b'foobar', kind=255, length=8),
229         tcp.TCPOptionUnknown(value=b'', kind=255, length=2),
230     ]
231     input_buf = (
232         b'\x00'  # End of Option List
233         b'\x01'  # No-Operation
234         b'\x02\x04\x05\xb4'  # Maximum Segment Size
235         b'\x03\x03\x09'  # Window Scale
236         b'\x04\x02'  # SACK Permitted
237         b'\x05\x12'  # SACK
238         b'\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03\x00\x00\x00\x04'
239         b'\x08\x0a'  # Timestamps
240         b'\x11\x22\x33\x44\x55\x66\x77\x88'
241         b'\x1c\x04\x82\x34'  # User Timeout Option
242         b'\x1d\x10\x01\x02'  # TCP Authentication Option (TCP-AO)
243         b'abcdefghijkl'
244         b'\xff\x08'  # Unknown with body
245         b'foobar'
246         b'\xff\x02'  # Unknown
247     )
248
249     def test_serialize(self):
250         output_buf = bytearray()
251         for option in self.input_options:
252             output_buf += option.serialize()
253         eq_(self.input_buf, output_buf)
254
255     def test_parser(self):
256         buf = self.input_buf
257         output_options = []
258         while buf:
259             opt, buf = tcp.TCPOption.parser(buf)
260             output_options.append(opt)
261         eq_(str(self.input_options), str(output_options))
262
263     def test_json(self):
264         for option in self.input_options:
265             json_dict = option.to_jsondict()[option.__class__.__name__]
266             output_option = option.__class__.from_jsondict(json_dict)
267             eq_(str(option), str(output_option))