backing up
[vsorcdistro/.git] / ryu / build / lib.linux-armv7l-2.7 / ryu / tests / unit / packet / test_ipv4.py
1 # Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #    http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 # vim: tabstop=4 shiftwidth=4 softtabstop=4
17
18 import unittest
19 import logging
20 import six
21 import struct
22 from struct import *
23 from nose.tools import *
24 from ryu.ofproto import ether, inet
25 from ryu.lib.packet import packet_utils
26 from ryu.lib.packet.ethernet import ethernet
27 from ryu.lib.packet.packet import Packet
28 from ryu.lib.packet.ipv4 import ipv4
29 from ryu.lib.packet.tcp import tcp
30 from ryu.lib import addrconv
31
32
33 LOG = logging.getLogger('test_ipv4')
34
35
36 class Test_ipv4(unittest.TestCase):
37     """ Test case for ipv4
38     """
39
40     version = 4
41     header_length = 5 + 10
42     ver_hlen = version << 4 | header_length
43     tos = 0
44     total_length = header_length + 64
45     identification = 30774
46     flags = 4
47     offset = 1480
48     flg_off = flags << 13 | offset
49     ttl = 64
50     proto = inet.IPPROTO_TCP
51     csum = 0xadc6
52     src = '131.151.32.21'
53     dst = '131.151.32.129'
54     length = header_length * 4
55     option = b'\x86\x28\x00\x00\x00\x01\x01\x22' \
56         + b'\x00\x01\xae\x00\x00\x00\x00\x00' \
57         + b'\x00\x00\x00\x00\x00\x00\x00\x00' \
58         + b'\x00\x00\x00\x00\x00\x00\x00\x00' \
59         + b'\x00\x00\x00\x00\x00\x00\x00\x01'
60
61     buf = pack(ipv4._PACK_STR, ver_hlen, tos, total_length, identification,
62                flg_off, ttl, proto, csum,
63                addrconv.ipv4.text_to_bin(src),
64                addrconv.ipv4.text_to_bin(dst)) \
65         + option
66
67     ip = ipv4(version, header_length, tos, total_length, identification,
68               flags, offset, ttl, proto, csum, src, dst, option)
69
70     def setUp(self):
71         pass
72
73     def tearDown(self):
74         pass
75
76     def test_init(self):
77         eq_(self.version, self.ip.version)
78         eq_(self.header_length, self.ip.header_length)
79         eq_(self.tos, self.ip.tos)
80         eq_(self.total_length, self.ip.total_length)
81         eq_(self.identification, self.ip.identification)
82         eq_(self.flags, self.ip.flags)
83         eq_(self.offset, self.ip.offset)
84         eq_(self.ttl, self.ip.ttl)
85         eq_(self.proto, self.ip.proto)
86         eq_(self.csum, self.ip.csum)
87         eq_(self.src, self.ip.src)
88         eq_(self.dst, self.ip.dst)
89         eq_(self.length, len(self.ip))
90         eq_(self.option, self.ip.option)
91
92     def test_parser(self):
93         res, ptype, _ = self.ip.parser(self.buf)
94
95         eq_(res.version, self.version)
96         eq_(res.header_length, self.header_length)
97         eq_(res.tos, self.tos)
98         eq_(res.total_length, self.total_length)
99         eq_(res.identification, self.identification)
100         eq_(res.flags, self.flags)
101         eq_(res.offset, self.offset)
102         eq_(res.ttl, self.ttl)
103         eq_(res.proto, self.proto)
104         eq_(res.csum, self.csum)
105         eq_(res.src, self.src)
106         eq_(res.dst, self.dst)
107         eq_(ptype, tcp)
108
109     def test_serialize(self):
110         buf = self.ip.serialize(bytearray(), None)
111         res = struct.unpack_from(ipv4._PACK_STR, six.binary_type(buf))
112         option = buf[ipv4._MIN_LEN:ipv4._MIN_LEN + len(self.option)]
113
114         eq_(res[0], self.ver_hlen)
115         eq_(res[1], self.tos)
116         eq_(res[2], self.total_length)
117         eq_(res[3], self.identification)
118         eq_(res[4], self.flg_off)
119         eq_(res[5], self.ttl)
120         eq_(res[6], self.proto)
121         eq_(res[8], addrconv.ipv4.text_to_bin(self.src))
122         eq_(res[9], addrconv.ipv4.text_to_bin(self.dst))
123         eq_(option, self.option)
124
125         # checksum
126         csum = packet_utils.checksum(buf)
127         eq_(csum, 0)
128
129     @raises(Exception)
130     def test_malformed_ipv4(self):
131         m_short_buf = self.buf[1:ipv4._MIN_LEN]
132         ipv4.parser(m_short_buf)
133
134     def test_json(self):
135         jsondict = self.ip.to_jsondict()
136         ip = ipv4.from_jsondict(jsondict['ipv4'])
137         eq_(str(self.ip), str(ip))