test_cdp.py revision ead1e536
1#!/usr/bin/env python3
2""" CDP tests """
3
4from scapy.packet import Packet
5from scapy.all import ShortField, StrField
6from scapy.layers.l2 import Dot3, LLC, SNAP
7from scapy.contrib.cdp import CDPMsgDeviceID, CDPMsgSoftwareVersion, \
8        CDPMsgPlatform, CDPMsgPortID, CDPv2_HDR
9
10from framework import VppTestCase
11from scapy.all import raw
12from re import compile
13from time import sleep
14from util import ppp
15import platform
16import sys
17import unittest
18
19
20""" TestCDP is a subclass of  VPPTestCase classes.
21
22CDP test.
23
24"""
25
26
27class CustomTLV(Packet):
28    """ Custom TLV protocol layer for scapy """
29
30    fields_desc = [
31        ShortField("type", 0),
32        ShortField("length", 4),
33        StrField("value", "")
34
35    ]
36
37
38class TestCDP(VppTestCase):
39    """ CDP Test Case """
40
41    nen_ptr = compile(r"not enabled")
42    cdp_ptr = compile(r"^([-\.\w]+)\s+([-\.\w]+)\s+([-\.\w]+)\s+([-\.\w]+)$")
43    err_ptr = compile(r"^([\d]+)\s+([-\w]+)\s+([ -\.\w)(]+)$")
44
45    @property
46    def device_id(self):
47        return platform.node()
48
49    @property
50    def version(self):
51        return platform.release()
52
53    @property
54    def port_id(self):
55        return self.interface.name
56
57    @property
58    def platform(self):
59        return platform.system()
60
61    @classmethod
62    def setUpClass(cls):
63        super(TestCDP, cls).setUpClass()
64        try:
65            cls.create_pg_interfaces(range(1))
66            cls.interface = cls.pg_interfaces[0]
67
68            cls.interface.admin_up()
69            cls.interface.config_ip4()
70            cls.interface.resolve_arp()
71
72        except Exception:
73            super(TestCDP, cls).tearDownClass()
74            raise
75
76    @classmethod
77    def tearDownClass(cls):
78        super(TestCDP, cls).tearDownClass()
79
80    def test_enable_cdp(self):
81        self.logger.info(self.vapi.cdp_enable_disable(enable_disable=1))
82        ret = self.vapi.cli("show cdp")
83        self.logger.info(ret)
84        not_enabled = self.nen_ptr.search(ret)
85        self.assertFalse(not_enabled, "CDP isn't enabled")
86
87    def test_send_cdp_packet(self):
88        self.logger.info(self.vapi.cdp_enable_disable(enable_disable=1))
89        self.send_packet(self.create_packet())
90
91        neighbors = list(self.show_cdp())
92        self.assertTrue(neighbors, "CDP didn't register neighbor")
93
94        port, system = neighbors[0]
95        length = min(len(system), len(self.device_id))
96
97        self.assert_equal(port, self.port_id, "CDP received invalid port id")
98        self.assert_equal(system[:length], self.device_id[:length],
99                          "CDP received invalid device id")
100
101    def test_cdp_underflow_tlv(self):
102        self.send_bad_packet(3, ".")
103
104    def test_cdp_overflow_tlv(self):
105        self.send_bad_packet(8, ".")
106
107    def send_bad_packet(self, l, v):
108        self.logger.info(self.vapi.cdp_enable_disable(enable_disable=1))
109        self.send_packet(self.create_bad_packet(l, v))
110
111        errors = list(self.show_errors())
112        self.assertTrue(errors)
113
114        expected_errors = False
115        for count, node, reason in errors:
116            if (node == u'cdp-input' and
117                    reason == u'cdp packets with bad TLVs' and
118                    int(count) >= 1):
119
120                expected_errors = True
121                break
122        self.assertTrue(expected_errors, "CDP didn't drop bad packet")
123
124    def send_packet(self, packet):
125        self.logger.debug(ppp("Sending packet:", packet))
126        self.interface.add_stream(packet)
127        self.pg_start()
128
129    def create_base_packet(self):
130        packet = (Dot3(src=self.interface.remote_mac,
131                       dst="01:00:0c:cc:cc:cc") /
132                  LLC(dsap=0xaa, ssap=0xaa, ctrl=0x03) /
133                  SNAP()/CDPv2_HDR())
134        return packet
135
136    def create_packet(self):
137        packet = (self.create_base_packet() /
138                  CDPMsgDeviceID(val=self.device_id) /
139                  CDPMsgSoftwareVersion(val=self.version) /
140                  CDPMsgPortID(iface=self.port_id) /
141                  CDPMsgPlatform(val=self.platform))
142        return packet
143
144    def create_bad_packet(self, tl=4, tv=""):
145        packet = (self.create_base_packet() /
146                  CustomTLV(type=1,
147                            length=tl,
148                            value=tv))
149        return packet
150
151    def process_cli(self, exp, ptr):
152        for line in self.vapi.cli(exp).split('\n')[1:]:
153            m = ptr.match(line.strip())
154            if m:
155                yield m.groups()
156
157    def show_cdp(self):
158        for pack in self.process_cli("show cdp", self.cdp_ptr):
159            try:
160                port, system, _, _ = pack
161            except ValueError:
162                pass
163            else:
164                yield port, system
165
166    def show_errors(self):
167        for pack in self.process_cli("show errors", self.err_ptr):
168            try:
169                count, node, reason = pack
170            except ValueError:
171                pass
172            else:
173                yield count, node, reason
174