1#!/usr/bin/env python3
2
3import unittest
4
5from framework import VppTestCase, VppTestRunner, running_extended_tests
6from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
7
8from scapy.contrib.geneve import GENEVE
9from scapy.packet import Raw
10from scapy.layers.l2 import Ether
11from scapy.layers.inet import IP, UDP
12from scapy.layers.vxlan import VXLAN
13from scapy.compat import raw
14
15
class TestTracefilter(VppTestCase):
    """ Packet Tracer Filter Test """

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def setUp(self):
        """Create one pg interface, bring it up and configure IPv4 on it."""
        super().setUp()
        self.create_pg_interfaces(range(1))
        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip4()

    def tearDown(self):
        """Unconfigure and bring down every pg interface created in setUp."""
        super().tearDown()
        for i in self.pg_interfaces:
            i.unconfig()
            i.admin_down()

    def cli(self, cmd):
        """Run a CLI command and return its response object.

        A non-zero ``retval`` is only logged, never raised, so callers that
        care about success must check ``retval`` on the returned object.
        """
        r = self.vapi.cli_return_response(cmd)
        if r.retval != 0:
            if hasattr(r, 'reply'):
                self.logger.info(cmd + " FAIL reply " + r.reply)
            else:
                self.logger.info(cmd + " FAIL retval " + str(r.retval))
        return r

    @staticmethod
    def _has_hit_count(reply, n):
        """Return True if ``reply`` contains ``hits <n>`` as a whole number.

        A plain substring search would accept "hits 90" when looking for
        "hits 9"; here an occurrence only counts when the character right
        after the number is not another digit (or the text ends there).
        """
        needle = "hits %i" % n
        start = reply.find(needle)
        while start != -1:
            end = start + len(needle)
            # ''.isdigit() is False, so a match at end-of-text is accepted
            if not reply[end:end + 1].isdigit():
                return True
            start = reply.find(needle, start + 1)
        return False

    def assert_hits(self, n):
        """Assert that the classifier table dump reports exactly ``n`` hits."""
        r = self.cli("show classify table verbose 2")
        self.assertEqual(r.retval, 0)
        self.assertTrue(hasattr(r, 'reply'))
        self.assertTrue(
            self._has_hit_count(r.reply, n),
            "expected 'hits %i' in:\n%s" % (n, r.reply))

    def test_mactime_unitTest(self):
        """ Packet Tracer Filter Test """
        cmds = ["loopback create",
                "set int ip address loop0 192.168.1.1/24",
                "set int state loop0 up",
                "packet-generator new {\n"
                " name classifyme\n"
                " limit 100\n"
                " size 300-300\n"
                " interface loop0\n"
                " node ethernet-input\n"
                " data { \n"
                "      IP4: 1.2.3 -> 4.5.6\n"
                "      UDP: 192.168.1.10 - 192.168.1.20 -> 192.168.2.10\n"
                "      UDP: 1234 -> 2345\n"
                "      incrementing 286\n"
                "     }\n"
                "}\n",
                "classify filter trace mask l3 ip4 src\n"
                " match l3 ip4 src 192.168.1.15",
                "trace add pg-input 100 filter",
                "pa en classifyme"]

        try:
            for cmd in cmds:
                self.cli(cmd)

            # Check for 9 classifier hits, which is the right answer
            self.assert_hits(9)
        finally:
            # cleanup, also on failure, so that the stream and the filter
            # are not left behind for whatever runs next
            self.cli("pa de classifyme")
            self.cli("classify filter trace del mask l3 ip4 src "
                     "match l3 ip4 src 192.168.1.15")

    def assert_classify(self, mask, match, packets, n=None):
        """Install a classify rule, inject traffic and check for hits.

        :param mask: hex string passed as ``mask hex`` to the filter
        :param match: hex string passed as ``match hex`` to the filter
        :param packets: list of packets sent on pg0
        :param n: expected number of hits; defaults to ``len(packets)``
        """
        rule = "mask hex %s match hex %s" % (mask, match)
        r = self.cli("classify filter trace " + rule)
        self.assertEqual(r.retval, 0)
        try:
            r = self.cli("trace add pg-input %i filter" % len(packets))
            self.assertEqual(r.retval, 0)
            self.pg0.add_stream(packets)
            self.cli("pa en")
            self.assert_hits(n if n is not None else len(packets))
        finally:
            # always remove the trace and the filter installed above, so a
            # failed assertion does not leak them into the next check
            self.cli("clear trace")
            self.cli("classify filter trace del " + rule)

    @staticmethod
    def _xor_hex(ori, new):
        """Return the byte-wise XOR of two byte strings as a hex string.

        The result is as long as the shorter of the two inputs.
        """
        return "".join("{:02x}".format(o ^ n) for o, n in zip(ori, new))

    def test_encap(self):
        """ Packet Tracer Filter Test with encap """

        # the packet we are trying to match
        p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
             IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
             UDP() /
             VXLAN() /
             Ether() /
             IP() /
             UDP() /
             GENEVE(vni=1234) /
             Ether() /
             IP(src='192.168.4.167') /
             UDP() /
             Raw('\xa5' * 100))

        #
        # compute filter mask & value
        # we compute it by XOR'ing a template packet with a modified packet
        # we need to set checksums to 0 to make sure scapy will not recompute
        # them
        #
        tmpl = (Ether() /
                IP(chksum=0) /
                UDP(chksum=0) /
                VXLAN() /
                Ether() /
                IP(chksum=0) /
                UDP(chksum=0) /
                GENEVE(vni=0) /
                Ether() /
                IP(src='0.0.0.0', chksum=0))
        ori = raw(tmpl)

        # innermost headers (everything carried by the GENEVE layer)
        inner = tmpl[GENEVE].payload

        # the mask: all-ones in the fields we want to match on
        tmpl[GENEVE].vni = 0xffffff
        inner[IP].src = '255.255.255.255'
        mask = self._xor_hex(ori, raw(tmpl))

        # this does not match (wrong vni)
        tmpl[GENEVE].vni = 1
        inner[IP].src = '192.168.4.167'
        match = self._xor_hex(ori, raw(tmpl))
        self.assert_classify(mask, match, [p] * 11, 0)

        # this must match
        tmpl[GENEVE].vni = 1234
        match = self._xor_hex(ori, raw(tmpl))
        self.assert_classify(mask, match, [p] * 17)
161
162
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)
165