test_abf.py revision 492a5d0b
1#!/usr/bin/env python3
2
3from socket import inet_pton, inet_ntop, AF_INET, AF_INET6
4import unittest
5
6from framework import VppTestCase, VppTestRunner
7from vpp_ip import DpoProto
8from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsLabel, \
9    VppIpTable, FibPathProto
10
11from scapy.packet import Raw
12from scapy.layers.l2 import Ether
13from scapy.layers.inet import IP, UDP
14from scapy.layers.inet6 import IPv6
15
16from vpp_object import VppObject
17
18NUM_PKTS = 67
19
20
21def find_abf_policy(test, id):
22    policies = test.vapi.abf_policy_dump()
23    for p in policies:
24        if id == p.policy.policy_id:
25            return True
26    return False
27
28
29def find_abf_itf_attach(test, id, sw_if_index):
30    attachs = test.vapi.abf_itf_attach_dump()
31    for a in attachs:
32        if id == a.attach.policy_id and \
33           sw_if_index == a.attach.sw_if_index:
34            return True
35    return False
36
37
38class VppAbfPolicy(VppObject):
39
40    def __init__(self,
41                 test,
42                 policy_id,
43                 acl,
44                 paths):
45        self._test = test
46        self.policy_id = policy_id
47        self.acl = acl
48        self.paths = paths
49        self.encoded_paths = []
50        for path in self.paths:
51            self.encoded_paths.append(path.encode())
52
53    def add_vpp_config(self):
54        self._test.vapi.abf_policy_add_del(
55            1,
56            {'policy_id': self.policy_id,
57             'acl_index': self.acl.acl_index,
58             'n_paths': len(self.paths),
59             'paths': self.encoded_paths})
60        self._test.registry.register(self, self._test.logger)
61
62    def remove_vpp_config(self):
63        self._test.vapi.abf_policy_add_del(
64            0,
65            {'policy_id': self.policy_id,
66             'acl_index': self.acl.acl_index,
67             'n_paths': len(self.paths),
68             'paths': self.encoded_paths})
69
70    def query_vpp_config(self):
71        return find_abf_policy(self._test, self.policy_id)
72
73    def object_id(self):
74        return ("abf-policy-%d" % self.policy_id)
75
76
77class VppAbfAttach(VppObject):
78
79    def __init__(self,
80                 test,
81                 policy_id,
82                 sw_if_index,
83                 priority,
84                 is_ipv6=0):
85        self._test = test
86        self.policy_id = policy_id
87        self.sw_if_index = sw_if_index
88        self.priority = priority
89        self.is_ipv6 = is_ipv6
90
91    def add_vpp_config(self):
92        self._test.vapi.abf_itf_attach_add_del(
93            1,
94            {'policy_id': self.policy_id,
95             'sw_if_index': self.sw_if_index,
96             'priority': self.priority,
97             'is_ipv6': self.is_ipv6})
98        self._test.registry.register(self, self._test.logger)
99
100    def remove_vpp_config(self):
101        self._test.vapi.abf_itf_attach_add_del(
102            0,
103            {'policy_id': self.policy_id,
104             'sw_if_index': self.sw_if_index,
105             'priority': self.priority,
106             'is_ipv6': self.is_ipv6})
107
108    def query_vpp_config(self):
109        return find_abf_itf_attach(self._test,
110                                   self.policy_id,
111                                   self.sw_if_index)
112
113    def object_id(self):
114        return ("abf-attach-%d-%d" % (self.policy_id, self.sw_if_index))
115
116
117class TestAbf(VppTestCase):
118    """ ABF Test Case """
119
120    @classmethod
121    def setUpClass(cls):
122        super(TestAbf, cls).setUpClass()
123
124    @classmethod
125    def tearDownClass(cls):
126        super(TestAbf, cls).tearDownClass()
127
128    def setUp(self):
129        super(TestAbf, self).setUp()
130
131        self.create_pg_interfaces(range(5))
132
133        for i in self.pg_interfaces[:4]:
134            i.admin_up()
135            i.config_ip4()
136            i.resolve_arp()
137            i.config_ip6()
138            i.resolve_ndp()
139
140    def tearDown(self):
141        for i in self.pg_interfaces:
142            i.unconfig_ip4()
143            i.unconfig_ip6()
144            i.admin_down()
145        super(TestAbf, self).tearDown()
146
147    def test_abf4(self):
148        """ IPv4 ACL Based Forwarding
149        """
150
151        #
152        # We are not testing the various matching capabilities
153        # of ACLs, that's done elsewhere. Here ware are testing
154        # the application of ACLs to a forwarding path to achieve
155        # ABF
156        # So we construct just a few ACLs to ensure the ABF policies
157        # are correctly constructed and used. And a few path types
158        # to test the API path decoding.
159        #
160
161        #
162        # Rule 1
163        #
164        rule_1 = ({'is_permit': 1,
165                   'is_ipv6': 0,
166                   'proto': 17,
167                   'srcport_or_icmptype_first': 1234,
168                   'srcport_or_icmptype_last': 1234,
169                   'src_ip_prefix_len': 32,
170                   'src_ip_addr': inet_pton(AF_INET, "1.1.1.1"),
171                   'dstport_or_icmpcode_first': 1234,
172                   'dstport_or_icmpcode_last': 1234,
173                   'dst_ip_prefix_len': 32,
174                   'dst_ip_addr': inet_pton(AF_INET, "1.1.1.2")})
175        acl_1 = self.vapi.acl_add_replace(acl_index=4294967295, r=[rule_1])
176
177        #
178        # ABF policy for ACL 1 - path via interface 1
179        #
180        abf_1 = VppAbfPolicy(self, 10, acl_1,
181                             [VppRoutePath(self.pg1.remote_ip4,
182                                           self.pg1.sw_if_index)])
183        abf_1.add_vpp_config()
184
185        #
186        # Attach the policy to input interface Pg0
187        #
188        attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index, 50)
189        attach_1.add_vpp_config()
190
191        #
192        # fire in packet matching the ACL src,dst. If it's forwarded
193        # then the ABF was successful, since default routing will drop it
194        #
195        p_1 = (Ether(src=self.pg0.remote_mac,
196                     dst=self.pg0.local_mac) /
197               IP(src="1.1.1.1", dst="1.1.1.2") /
198               UDP(sport=1234, dport=1234) /
199               Raw(b'\xa5' * 100))
200        self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg1)
201
202        #
203        # Attach a 'better' priority policy to the same interface
204        #
205        abf_2 = VppAbfPolicy(self, 11, acl_1,
206                             [VppRoutePath(self.pg2.remote_ip4,
207                                           self.pg2.sw_if_index)])
208        abf_2.add_vpp_config()
209        attach_2 = VppAbfAttach(self, 11, self.pg0.sw_if_index, 40)
210        attach_2.add_vpp_config()
211
212        self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg2)
213
214        #
215        # Attach a policy with priority in the middle
216        #
217        abf_3 = VppAbfPolicy(self, 12, acl_1,
218                             [VppRoutePath(self.pg3.remote_ip4,
219                                           self.pg3.sw_if_index)])
220        abf_3.add_vpp_config()
221        attach_3 = VppAbfAttach(self, 12, self.pg0.sw_if_index, 45)
222        attach_3.add_vpp_config()
223
224        self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg2)
225
226        #
227        # remove the best priority
228        #
229        attach_2.remove_vpp_config()
230        self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg3)
231
232        #
233        # Attach one of the same policies to Pg1
234        #
235        attach_4 = VppAbfAttach(self, 12, self.pg1.sw_if_index, 45)
236        attach_4.add_vpp_config()
237
238        p_2 = (Ether(src=self.pg1.remote_mac,
239                     dst=self.pg1.local_mac) /
240               IP(src="1.1.1.1", dst="1.1.1.2") /
241               UDP(sport=1234, dport=1234) /
242               Raw(b'\xa5' * 100))
243        self.send_and_expect(self.pg1, p_2 * NUM_PKTS, self.pg3)
244
245        #
246        # detach the policy from PG1, now expect traffic to be dropped
247        #
248        attach_4.remove_vpp_config()
249
250        self.send_and_assert_no_replies(self.pg1, p_2 * NUM_PKTS, "Detached")
251
252        #
253        # Swap to route via a next-hop in the non-default table
254        #
255        table_20 = VppIpTable(self, 20)
256        table_20.add_vpp_config()
257
258        self.pg4.set_table_ip4(table_20.table_id)
259        self.pg4.admin_up()
260        self.pg4.config_ip4()
261        self.pg4.resolve_arp()
262
263        abf_13 = VppAbfPolicy(self, 13, acl_1,
264                              [VppRoutePath(self.pg4.remote_ip4,
265                                            0xffffffff,
266                                            nh_table_id=table_20.table_id)])
267        abf_13.add_vpp_config()
268        attach_5 = VppAbfAttach(self, 13, self.pg0.sw_if_index, 30)
269        attach_5.add_vpp_config()
270
271        self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg4)
272
273        self.pg4.unconfig_ip4()
274        self.pg4.set_table_ip4(0)
275
276    def test_abf6(self):
277        """ IPv6 ACL Based Forwarding
278        """
279
280        #
281        # Simple test for matching IPv6 packets
282        #
283
284        #
285        # Rule 1
286        #
287        rule_1 = ({'is_permit': 1,
288                   'is_ipv6': 1,
289                   'proto': 17,
290                   'srcport_or_icmptype_first': 1234,
291                   'srcport_or_icmptype_last': 1234,
292                   'src_ip_prefix_len': 128,
293                   'src_ip_addr': inet_pton(AF_INET6, "2001::2"),
294                   'dstport_or_icmpcode_first': 1234,
295                   'dstport_or_icmpcode_last': 1234,
296                   'dst_ip_prefix_len': 128,
297                   'dst_ip_addr': inet_pton(AF_INET6, "2001::1")})
298        acl_1 = self.vapi.acl_add_replace(acl_index=4294967295,
299                                          r=[rule_1])
300
301        #
302        # ABF policy for ACL 1 - path via interface 1
303        #
304        abf_1 = VppAbfPolicy(self, 10, acl_1,
305                             [VppRoutePath("3001::1",
306                                           0xffffffff)])
307        abf_1.add_vpp_config()
308
309        attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index,
310                                45, is_ipv6=True)
311        attach_1.add_vpp_config()
312
313        #
314        # a packet matching the rule
315        #
316        p = (Ether(src=self.pg0.remote_mac,
317                   dst=self.pg0.local_mac) /
318             IPv6(src="2001::2", dst="2001::1") /
319             UDP(sport=1234, dport=1234) /
320             Raw(b'\xa5' * 100))
321
322        #
323        # packets are dropped because there is no route to the policy's
324        # next hop
325        #
326        self.send_and_assert_no_replies(self.pg1, p * NUM_PKTS, "no route")
327
328        #
329        # add a route resolving the next-hop
330        #
331        route = VppIpRoute(self, "3001::1", 32,
332                           [VppRoutePath(self.pg1.remote_ip6,
333                                         self.pg1.sw_if_index)])
334        route.add_vpp_config()
335
336        #
337        # now expect packets forwarded.
338        #
339        self.send_and_expect(self.pg0, p * NUM_PKTS, self.pg1)
340
341
342if __name__ == '__main__':
343    unittest.main(testRunner=VppTestRunner)
344