test_abf.py revision 2f8cd914
1#!/usr/bin/env python3
2
3from socket import inet_pton, inet_ntop, AF_INET, AF_INET6
4import unittest
5
6from framework import VppTestCase, VppTestRunner
7from vpp_ip import DpoProto
8from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsLabel, \
9    VppIpTable, FibPathProto
10from vpp_acl import AclRule, VppAcl
11
12from scapy.packet import Raw
13from scapy.layers.l2 import Ether
14from scapy.layers.inet import IP, UDP
15from scapy.layers.inet6 import IPv6
16from ipaddress import IPv4Network, IPv6Network
17
18from vpp_object import VppObject
19
20NUM_PKTS = 67
21
22
23def find_abf_policy(test, id):
24    policies = test.vapi.abf_policy_dump()
25    for p in policies:
26        if id == p.policy.policy_id:
27            return True
28    return False
29
30
31def find_abf_itf_attach(test, id, sw_if_index):
32    attachs = test.vapi.abf_itf_attach_dump()
33    for a in attachs:
34        if id == a.attach.policy_id and \
35           sw_if_index == a.attach.sw_if_index:
36            return True
37    return False
38
39
40class VppAbfPolicy(VppObject):
41
42    def __init__(self,
43                 test,
44                 policy_id,
45                 acl,
46                 paths):
47        self._test = test
48        self.policy_id = policy_id
49        self.acl = acl
50        self.paths = paths
51        self.encoded_paths = []
52        for path in self.paths:
53            self.encoded_paths.append(path.encode())
54
55    def add_vpp_config(self):
56        self._test.vapi.abf_policy_add_del(
57            1,
58            {'policy_id': self.policy_id,
59             'acl_index': self.acl.acl_index,
60             'n_paths': len(self.paths),
61             'paths': self.encoded_paths})
62        self._test.registry.register(self, self._test.logger)
63
64    def remove_vpp_config(self):
65        self._test.vapi.abf_policy_add_del(
66            0,
67            {'policy_id': self.policy_id,
68             'acl_index': self.acl.acl_index,
69             'n_paths': len(self.paths),
70             'paths': self.encoded_paths})
71
72    def query_vpp_config(self):
73        return find_abf_policy(self._test, self.policy_id)
74
75    def object_id(self):
76        return ("abf-policy-%d" % self.policy_id)
77
78
79class VppAbfAttach(VppObject):
80
81    def __init__(self,
82                 test,
83                 policy_id,
84                 sw_if_index,
85                 priority,
86                 is_ipv6=0):
87        self._test = test
88        self.policy_id = policy_id
89        self.sw_if_index = sw_if_index
90        self.priority = priority
91        self.is_ipv6 = is_ipv6
92
93    def add_vpp_config(self):
94        self._test.vapi.abf_itf_attach_add_del(
95            1,
96            {'policy_id': self.policy_id,
97             'sw_if_index': self.sw_if_index,
98             'priority': self.priority,
99             'is_ipv6': self.is_ipv6})
100        self._test.registry.register(self, self._test.logger)
101
102    def remove_vpp_config(self):
103        self._test.vapi.abf_itf_attach_add_del(
104            0,
105            {'policy_id': self.policy_id,
106             'sw_if_index': self.sw_if_index,
107             'priority': self.priority,
108             'is_ipv6': self.is_ipv6})
109
110    def query_vpp_config(self):
111        return find_abf_itf_attach(self._test,
112                                   self.policy_id,
113                                   self.sw_if_index)
114
115    def object_id(self):
116        return ("abf-attach-%d-%d" % (self.policy_id, self.sw_if_index))
117
118
119class TestAbf(VppTestCase):
120    """ ABF Test Case """
121
122    @classmethod
123    def setUpClass(cls):
124        super(TestAbf, cls).setUpClass()
125
126    @classmethod
127    def tearDownClass(cls):
128        super(TestAbf, cls).tearDownClass()
129
130    def setUp(self):
131        super(TestAbf, self).setUp()
132
133        self.create_pg_interfaces(range(5))
134
135        for i in self.pg_interfaces[:4]:
136            i.admin_up()
137            i.config_ip4()
138            i.resolve_arp()
139            i.config_ip6()
140            i.resolve_ndp()
141
142    def tearDown(self):
143        for i in self.pg_interfaces:
144            i.unconfig_ip4()
145            i.unconfig_ip6()
146            i.admin_down()
147        super(TestAbf, self).tearDown()
148
149    def test_abf4(self):
150        """ IPv4 ACL Based Forwarding
151        """
152
153        #
154        # We are not testing the various matching capabilities
155        # of ACLs, that's done elsewhere. Here ware are testing
156        # the application of ACLs to a forwarding path to achieve
157        # ABF
158        # So we construct just a few ACLs to ensure the ABF policies
159        # are correctly constructed and used. And a few path types
160        # to test the API path decoding.
161        #
162
163        #
164        # Rule 1
165        #
166        rule_1 = AclRule(is_permit=1, proto=17, ports=1234,
167                         src_prefix=IPv4Network("1.1.1.1/32"),
168                         dst_prefix=IPv4Network("1.1.1.2/32"))
169        acl_1 = VppAcl(self, rules=[rule_1])
170        acl_1.add_vpp_config()
171
172        #
173        # ABF policy for ACL 1 - path via interface 1
174        #
175        abf_1 = VppAbfPolicy(self, 10, acl_1,
176                             [VppRoutePath(self.pg1.remote_ip4,
177                                           self.pg1.sw_if_index)])
178        abf_1.add_vpp_config()
179
180        #
181        # Attach the policy to input interface Pg0
182        #
183        attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index, 50)
184        attach_1.add_vpp_config()
185
186        #
187        # fire in packet matching the ACL src,dst. If it's forwarded
188        # then the ABF was successful, since default routing will drop it
189        #
190        p_1 = (Ether(src=self.pg0.remote_mac,
191                     dst=self.pg0.local_mac) /
192               IP(src="1.1.1.1", dst="1.1.1.2") /
193               UDP(sport=1234, dport=1234) /
194               Raw(b'\xa5' * 100))
195        self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg1)
196
197        #
198        # Attach a 'better' priority policy to the same interface
199        #
200        abf_2 = VppAbfPolicy(self, 11, acl_1,
201                             [VppRoutePath(self.pg2.remote_ip4,
202                                           self.pg2.sw_if_index)])
203        abf_2.add_vpp_config()
204        attach_2 = VppAbfAttach(self, 11, self.pg0.sw_if_index, 40)
205        attach_2.add_vpp_config()
206
207        self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg2)
208
209        #
210        # Attach a policy with priority in the middle
211        #
212        abf_3 = VppAbfPolicy(self, 12, acl_1,
213                             [VppRoutePath(self.pg3.remote_ip4,
214                                           self.pg3.sw_if_index)])
215        abf_3.add_vpp_config()
216        attach_3 = VppAbfAttach(self, 12, self.pg0.sw_if_index, 45)
217        attach_3.add_vpp_config()
218
219        self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg2)
220
221        #
222        # remove the best priority
223        #
224        attach_2.remove_vpp_config()
225        self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg3)
226
227        #
228        # Attach one of the same policies to Pg1
229        #
230        attach_4 = VppAbfAttach(self, 12, self.pg1.sw_if_index, 45)
231        attach_4.add_vpp_config()
232
233        p_2 = (Ether(src=self.pg1.remote_mac,
234                     dst=self.pg1.local_mac) /
235               IP(src="1.1.1.1", dst="1.1.1.2") /
236               UDP(sport=1234, dport=1234) /
237               Raw(b'\xa5' * 100))
238        self.send_and_expect(self.pg1, p_2 * NUM_PKTS, self.pg3)
239
240        #
241        # detach the policy from PG1, now expect traffic to be dropped
242        #
243        attach_4.remove_vpp_config()
244
245        self.send_and_assert_no_replies(self.pg1, p_2 * NUM_PKTS, "Detached")
246
247        #
248        # Swap to route via a next-hop in the non-default table
249        #
250        table_20 = VppIpTable(self, 20)
251        table_20.add_vpp_config()
252
253        self.pg4.set_table_ip4(table_20.table_id)
254        self.pg4.admin_up()
255        self.pg4.config_ip4()
256        self.pg4.resolve_arp()
257
258        abf_13 = VppAbfPolicy(self, 13, acl_1,
259                              [VppRoutePath(self.pg4.remote_ip4,
260                                            0xffffffff,
261                                            nh_table_id=table_20.table_id)])
262        abf_13.add_vpp_config()
263        attach_5 = VppAbfAttach(self, 13, self.pg0.sw_if_index, 30)
264        attach_5.add_vpp_config()
265
266        self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg4)
267
268        self.pg4.unconfig_ip4()
269        self.pg4.set_table_ip4(0)
270
271    def test_abf6(self):
272        """ IPv6 ACL Based Forwarding
273        """
274
275        #
276        # Simple test for matching IPv6 packets
277        #
278
279        #
280        # Rule 1
281        #
282        rule_1 = AclRule(is_permit=1, proto=17, ports=1234,
283                         src_prefix=IPv6Network("2001::2/128"),
284                         dst_prefix=IPv6Network("2001::1/128"))
285        acl_1 = VppAcl(self, rules=[rule_1])
286        acl_1.add_vpp_config()
287
288        #
289        # ABF policy for ACL 1 - path via interface 1
290        #
291        abf_1 = VppAbfPolicy(self, 10, acl_1,
292                             [VppRoutePath("3001::1",
293                                           0xffffffff)])
294        abf_1.add_vpp_config()
295
296        attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index,
297                                45, is_ipv6=True)
298        attach_1.add_vpp_config()
299
300        #
301        # a packet matching the rule
302        #
303        p = (Ether(src=self.pg0.remote_mac,
304                   dst=self.pg0.local_mac) /
305             IPv6(src="2001::2", dst="2001::1") /
306             UDP(sport=1234, dport=1234) /
307             Raw(b'\xa5' * 100))
308
309        #
310        # packets are dropped because there is no route to the policy's
311        # next hop
312        #
313        self.send_and_assert_no_replies(self.pg1, p * NUM_PKTS, "no route")
314
315        #
316        # add a route resolving the next-hop
317        #
318        route = VppIpRoute(self, "3001::1", 32,
319                           [VppRoutePath(self.pg1.remote_ip6,
320                                         self.pg1.sw_if_index)])
321        route.add_vpp_config()
322
323        #
324        # now expect packets forwarded.
325        #
326        self.send_and_expect(self.pg0, p * NUM_PKTS, self.pg1)
327
328
329if __name__ == '__main__':
330    unittest.main(testRunner=VppTestRunner)
331