test_abf.py revision 770a0dea
1#!/usr/bin/env python3
2
3from socket import inet_pton, inet_ntop, AF_INET, AF_INET6
4import unittest
5
6from framework import VppTestCase, VppTestRunner
7from vpp_ip import DpoProto
8from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsLabel, \
9    VppIpTable, FibPathProto
10
11from scapy.packet import Raw
12from scapy.layers.l2 import Ether
13from scapy.layers.inet import IP, UDP
14from scapy.layers.inet6 import IPv6
15
16from vpp_object import VppObject
17
18NUM_PKTS = 67
19
20
21def find_abf_policy(test, id):
22    policies = test.vapi.abf_policy_dump()
23    for p in policies:
24        if id == p.policy.policy_id:
25            return True
26    return False
27
28
29def find_abf_itf_attach(test, id, sw_if_index):
30    attachs = test.vapi.abf_itf_attach_dump()
31    for a in attachs:
32        if id == a.attach.policy_id and \
33           sw_if_index == a.attach.sw_if_index:
34            return True
35    return False
36
37
38class VppAbfPolicy(VppObject):
39
40    def __init__(self,
41                 test,
42                 policy_id,
43                 acl,
44                 paths):
45        self._test = test
46        self.policy_id = policy_id
47        self.acl = acl
48        self.paths = paths
49        self.encoded_paths = []
50        for path in self.paths:
51            self.encoded_paths.append(path.encode())
52
53    def add_vpp_config(self):
54        self._test.vapi.abf_policy_add_del(
55            1,
56            {'policy_id': self.policy_id,
57             'acl_index': self.acl.acl_index,
58             'n_paths': len(self.paths),
59             'paths': self.encoded_paths})
60        self._test.registry.register(self, self._test.logger)
61
62    def remove_vpp_config(self):
63        self._test.vapi.abf_policy_add_del(
64            0,
65            {'policy_id': self.policy_id,
66             'acl_index': self.acl.acl_index,
67             'n_paths': len(self.paths),
68             'paths': self.encoded_paths})
69
70    def query_vpp_config(self):
71        return find_abf_policy(self._test, self.policy_id)
72
73    def object_id(self):
74        return ("abf-policy-%d" % self.policy_id)
75
76
77class VppAbfAttach(VppObject):
78
79    def __init__(self,
80                 test,
81                 policy_id,
82                 sw_if_index,
83                 priority,
84                 is_ipv6=0):
85        self._test = test
86        self.policy_id = policy_id
87        self.sw_if_index = sw_if_index
88        self.priority = priority
89        self.is_ipv6 = is_ipv6
90
91    def add_vpp_config(self):
92        self._test.vapi.abf_itf_attach_add_del(
93            1,
94            {'policy_id': self.policy_id,
95             'sw_if_index': self.sw_if_index,
96             'priority': self.priority,
97             'is_ipv6': self.is_ipv6})
98        self._test.registry.register(self, self._test.logger)
99
100    def remove_vpp_config(self):
101        self._test.vapi.abf_itf_attach_add_del(
102            0,
103            {'policy_id': self.policy_id,
104             'sw_if_index': self.sw_if_index,
105             'priority': self.priority,
106             'is_ipv6': self.is_ipv6})
107
108    def query_vpp_config(self):
109        return find_abf_itf_attach(self._test,
110                                   self.policy_id,
111                                   self.sw_if_index)
112
113    def object_id(self):
114        return ("abf-attach-%d-%d" % (self.policy_id, self.sw_if_index))
115
116
117class TestAbf(VppTestCase):
118    """ ABF Test Case """
119
120    @classmethod
121    def setUpClass(cls):
122        super(TestAbf, cls).setUpClass()
123
124    @classmethod
125    def tearDownClass(cls):
126        super(TestAbf, cls).tearDownClass()
127
128    def setUp(self):
129        super(TestAbf, self).setUp()
130
131        self.create_pg_interfaces(range(5))
132
133        for i in self.pg_interfaces[:4]:
134            i.admin_up()
135            i.config_ip4()
136            i.resolve_arp()
137            i.config_ip6()
138            i.resolve_ndp()
139
140    def tearDown(self):
141        for i in self.pg_interfaces:
142            i.unconfig_ip4()
143            i.unconfig_ip6()
144            i.ip6_disable()
145            i.admin_down()
146        super(TestAbf, self).tearDown()
147
148    def test_abf4(self):
149        """ IPv4 ACL Based Forwarding
150        """
151
152        #
153        # We are not testing the various matching capabilities
154        # of ACLs, that's done elsewhere. Here ware are testing
155        # the application of ACLs to a forwarding path to achieve
156        # ABF
157        # So we construct just a few ACLs to ensure the ABF policies
158        # are correctly constructed and used. And a few path types
159        # to test the API path decoding.
160        #
161
162        #
163        # Rule 1
164        #
165        rule_1 = ({'is_permit': 1,
166                   'is_ipv6': 0,
167                   'proto': 17,
168                   'srcport_or_icmptype_first': 1234,
169                   'srcport_or_icmptype_last': 1234,
170                   'src_ip_prefix_len': 32,
171                   'src_ip_addr': inet_pton(AF_INET, "1.1.1.1"),
172                   'dstport_or_icmpcode_first': 1234,
173                   'dstport_or_icmpcode_last': 1234,
174                   'dst_ip_prefix_len': 32,
175                   'dst_ip_addr': inet_pton(AF_INET, "1.1.1.2")})
176        acl_1 = self.vapi.acl_add_replace(acl_index=4294967295, r=[rule_1])
177
178        #
179        # ABF policy for ACL 1 - path via interface 1
180        #
181        abf_1 = VppAbfPolicy(self, 10, acl_1,
182                             [VppRoutePath(self.pg1.remote_ip4,
183                                           self.pg1.sw_if_index)])
184        abf_1.add_vpp_config()
185
186        #
187        # Attach the policy to input interface Pg0
188        #
189        attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index, 50)
190        attach_1.add_vpp_config()
191
192        #
193        # fire in packet matching the ACL src,dst. If it's forwarded
194        # then the ABF was successful, since default routing will drop it
195        #
196        p_1 = (Ether(src=self.pg0.remote_mac,
197                     dst=self.pg0.local_mac) /
198               IP(src="1.1.1.1", dst="1.1.1.2") /
199               UDP(sport=1234, dport=1234) /
200               Raw(b'\xa5' * 100))
201        self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg1)
202
203        #
204        # Attach a 'better' priority policy to the same interface
205        #
206        abf_2 = VppAbfPolicy(self, 11, acl_1,
207                             [VppRoutePath(self.pg2.remote_ip4,
208                                           self.pg2.sw_if_index)])
209        abf_2.add_vpp_config()
210        attach_2 = VppAbfAttach(self, 11, self.pg0.sw_if_index, 40)
211        attach_2.add_vpp_config()
212
213        self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg2)
214
215        #
216        # Attach a policy with priority in the middle
217        #
218        abf_3 = VppAbfPolicy(self, 12, acl_1,
219                             [VppRoutePath(self.pg3.remote_ip4,
220                                           self.pg3.sw_if_index)])
221        abf_3.add_vpp_config()
222        attach_3 = VppAbfAttach(self, 12, self.pg0.sw_if_index, 45)
223        attach_3.add_vpp_config()
224
225        self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg2)
226
227        #
228        # remove the best priority
229        #
230        attach_2.remove_vpp_config()
231        self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg3)
232
233        #
234        # Attach one of the same policies to Pg1
235        #
236        attach_4 = VppAbfAttach(self, 12, self.pg1.sw_if_index, 45)
237        attach_4.add_vpp_config()
238
239        p_2 = (Ether(src=self.pg1.remote_mac,
240                     dst=self.pg1.local_mac) /
241               IP(src="1.1.1.1", dst="1.1.1.2") /
242               UDP(sport=1234, dport=1234) /
243               Raw(b'\xa5' * 100))
244        self.send_and_expect(self.pg1, p_2 * NUM_PKTS, self.pg3)
245
246        #
247        # detach the policy from PG1, now expect traffic to be dropped
248        #
249        attach_4.remove_vpp_config()
250
251        self.send_and_assert_no_replies(self.pg1, p_2 * NUM_PKTS, "Detached")
252
253        #
254        # Swap to route via a next-hop in the non-default table
255        #
256        table_20 = VppIpTable(self, 20)
257        table_20.add_vpp_config()
258
259        self.pg4.set_table_ip4(table_20.table_id)
260        self.pg4.admin_up()
261        self.pg4.config_ip4()
262        self.pg4.resolve_arp()
263
264        abf_13 = VppAbfPolicy(self, 13, acl_1,
265                              [VppRoutePath(self.pg4.remote_ip4,
266                                            0xffffffff,
267                                            nh_table_id=table_20.table_id)])
268        abf_13.add_vpp_config()
269        attach_5 = VppAbfAttach(self, 13, self.pg0.sw_if_index, 30)
270        attach_5.add_vpp_config()
271
272        self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg4)
273
274        self.pg4.unconfig_ip4()
275        self.pg4.set_table_ip4(0)
276
277    def test_abf6(self):
278        """ IPv6 ACL Based Forwarding
279        """
280
281        #
282        # Simple test for matching IPv6 packets
283        #
284
285        #
286        # Rule 1
287        #
288        rule_1 = ({'is_permit': 1,
289                   'is_ipv6': 1,
290                   'proto': 17,
291                   'srcport_or_icmptype_first': 1234,
292                   'srcport_or_icmptype_last': 1234,
293                   'src_ip_prefix_len': 128,
294                   'src_ip_addr': inet_pton(AF_INET6, "2001::2"),
295                   'dstport_or_icmpcode_first': 1234,
296                   'dstport_or_icmpcode_last': 1234,
297                   'dst_ip_prefix_len': 128,
298                   'dst_ip_addr': inet_pton(AF_INET6, "2001::1")})
299        acl_1 = self.vapi.acl_add_replace(acl_index=4294967295,
300                                          r=[rule_1])
301
302        #
303        # ABF policy for ACL 1 - path via interface 1
304        #
305        abf_1 = VppAbfPolicy(self, 10, acl_1,
306                             [VppRoutePath("3001::1",
307                                           0xffffffff)])
308        abf_1.add_vpp_config()
309
310        attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index,
311                                45, is_ipv6=True)
312        attach_1.add_vpp_config()
313
314        #
315        # a packet matching the rule
316        #
317        p = (Ether(src=self.pg0.remote_mac,
318                   dst=self.pg0.local_mac) /
319             IPv6(src="2001::2", dst="2001::1") /
320             UDP(sport=1234, dport=1234) /
321             Raw(b'\xa5' * 100))
322
323        #
324        # packets are dropped because there is no route to the policy's
325        # next hop
326        #
327        self.send_and_assert_no_replies(self.pg1, p * NUM_PKTS, "no route")
328
329        #
330        # add a route resolving the next-hop
331        #
332        route = VppIpRoute(self, "3001::1", 32,
333                           [VppRoutePath(self.pg1.remote_ip6,
334                                         self.pg1.sw_if_index)])
335        route.add_vpp_config()
336
337        #
338        # now expect packets forwarded.
339        #
340        self.send_and_expect(self.pg0, p * NUM_PKTS, self.pg1)
341
342
343if __name__ == '__main__':
344    unittest.main(testRunner=VppTestRunner)
345