test_ipsec_tun_if_esp.py revision abde62fb
1import unittest
2import socket
3import copy
4
5from scapy.layers.ipsec import SecurityAssociation, ESP
6from scapy.layers.l2 import Ether, Raw, GRE
7from scapy.layers.inet import IP, UDP
8from scapy.layers.inet6 import IPv6
9from framework import VppTestRunner
10from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
11    IpsecTun4, IpsecTun6,  IpsecTcpTests, mk_scapy_crypt_key
12from vpp_ipsec_tun_interface import VppIpsecTunInterface
13from vpp_gre_interface import VppGreInterface
14from vpp_ipip_tun_interface import VppIpIpTunInterface
15from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
16from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect
17from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
18from util import ppp
19from vpp_papi import VppEnum
20
21
22def config_tun_params(p, encryption_type, tun_if):
23    ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
24    use_esn = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
25                              IPSEC_API_SAD_FLAG_USE_ESN))
26    crypt_key = mk_scapy_crypt_key(p)
27    p.scapy_tun_sa = SecurityAssociation(
28        encryption_type, spi=p.vpp_tun_spi,
29        crypt_algo=p.crypt_algo,
30        crypt_key=crypt_key,
31        auth_algo=p.auth_algo, auth_key=p.auth_key,
32        tunnel_header=ip_class_by_addr_type[p.addr_type](
33            src=tun_if.remote_ip,
34            dst=tun_if.local_ip),
35        nat_t_header=p.nat_header,
36        use_esn=use_esn)
37    p.vpp_tun_sa = SecurityAssociation(
38        encryption_type, spi=p.scapy_tun_spi,
39        crypt_algo=p.crypt_algo,
40        crypt_key=crypt_key,
41        auth_algo=p.auth_algo, auth_key=p.auth_key,
42        tunnel_header=ip_class_by_addr_type[p.addr_type](
43            dst=tun_if.remote_ip,
44            src=tun_if.local_ip),
45        nat_t_header=p.nat_header,
46        use_esn=use_esn)
47
48
49class TemplateIpsec4TunIfEsp(TemplateIpsec):
50    """ IPsec tunnel interface tests """
51
52    encryption_type = ESP
53
54    @classmethod
55    def setUpClass(cls):
56        super(TemplateIpsec4TunIfEsp, cls).setUpClass()
57
58    @classmethod
59    def tearDownClass(cls):
60        super(TemplateIpsec4TunIfEsp, cls).tearDownClass()
61
62    def setUp(self):
63        super(TemplateIpsec4TunIfEsp, self).setUp()
64
65        self.tun_if = self.pg0
66
67        p = self.ipv4_params
68
69        p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
70                                        p.scapy_tun_spi, p.crypt_algo_vpp_id,
71                                        p.crypt_key, p.crypt_key,
72                                        p.auth_algo_vpp_id, p.auth_key,
73                                        p.auth_key)
74        p.tun_if.add_vpp_config()
75        p.tun_if.admin_up()
76        p.tun_if.config_ip4()
77        p.tun_if.config_ip6()
78        config_tun_params(p, self.encryption_type, p.tun_if)
79
80        r = VppIpRoute(self, p.remote_tun_if_host, 32,
81                       [VppRoutePath(p.tun_if.remote_ip4,
82                                     0xffffffff)])
83        r.add_vpp_config()
84        r = VppIpRoute(self, p.remote_tun_if_host6, 128,
85                       [VppRoutePath(p.tun_if.remote_ip6,
86                                     0xffffffff,
87                                     proto=DpoProto.DPO_PROTO_IP6)])
88        r.add_vpp_config()
89
90    def tearDown(self):
91        super(TemplateIpsec4TunIfEsp, self).tearDown()
92
93
94class TemplateIpsec4TunIfEspUdp(TemplateIpsec):
95    """ IPsec UDP tunnel interface tests """
96
97    tun4_encrypt_node_name = "esp4-encrypt-tun"
98    tun4_decrypt_node_name = "esp4-decrypt-tun"
99    encryption_type = ESP
100
101    @classmethod
102    def setUpClass(cls):
103        super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()
104
105    @classmethod
106    def tearDownClass(cls):
107        super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()
108
109    def setUp(self):
110        super(TemplateIpsec4TunIfEspUdp, self).setUp()
111
112        self.tun_if = self.pg0
113
114        p = self.ipv4_params
115        p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
116                   IPSEC_API_SAD_FLAG_UDP_ENCAP)
117        p.nat_header = UDP(sport=5454, dport=4500)
118
119        p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
120                                        p.scapy_tun_spi, p.crypt_algo_vpp_id,
121                                        p.crypt_key, p.crypt_key,
122                                        p.auth_algo_vpp_id, p.auth_key,
123                                        p.auth_key, udp_encap=True)
124        p.tun_if.add_vpp_config()
125        p.tun_if.admin_up()
126        p.tun_if.config_ip4()
127        p.tun_if.config_ip6()
128        config_tun_params(p, self.encryption_type, p.tun_if)
129
130        r = VppIpRoute(self, p.remote_tun_if_host, 32,
131                       [VppRoutePath(p.tun_if.remote_ip4,
132                                     0xffffffff)])
133        r.add_vpp_config()
134        r = VppIpRoute(self, p.remote_tun_if_host6, 128,
135                       [VppRoutePath(p.tun_if.remote_ip6,
136                                     0xffffffff,
137                                     proto=DpoProto.DPO_PROTO_IP6)])
138        r.add_vpp_config()
139
140    def tearDown(self):
141        super(TemplateIpsec4TunIfEspUdp, self).tearDown()
142
143
144class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
145    """ Ipsec ESP - TUN tests """
146    tun4_encrypt_node_name = "esp4-encrypt-tun"
147    tun4_decrypt_node_name = "esp4-decrypt-tun"
148
149    def test_tun_basic64(self):
150        """ ipsec 6o4 tunnel basic test """
151        self.tun4_encrypt_node_name = "esp4-encrypt-tun"
152
153        self.verify_tun_64(self.params[socket.AF_INET], count=1)
154
155    def test_tun_burst64(self):
156        """ ipsec 6o4 tunnel basic test """
157        self.tun4_encrypt_node_name = "esp4-encrypt-tun"
158
159        self.verify_tun_64(self.params[socket.AF_INET], count=257)
160
161    def test_tun_basic_frag44(self):
162        """ ipsec 4o4 tunnel frag basic test """
163        self.tun4_encrypt_node_name = "esp4-encrypt-tun"
164
165        p = self.ipv4_params
166
167        self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
168                                       [1500, 0, 0, 0])
169        self.verify_tun_44(self.params[socket.AF_INET],
170                           count=1, payload_size=1800, n_rx=2)
171        self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
172                                       [9000, 0, 0, 0])
173
174
175class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
176    """ Ipsec ESP UDP tests """
177
178    tun4_input_node = "ipsec4-tun-input"
179
180    def test_keepalive(self):
181        """ IPSEC NAT Keepalive """
182        self.verify_keepalive(self.ipv4_params)
183
184
185class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
186    """ Ipsec ESP - TCP tests """
187    pass
188
189
190class TemplateIpsec6TunIfEsp(TemplateIpsec):
191    """ IPsec tunnel interface tests """
192
193    encryption_type = ESP
194
195    def setUp(self):
196        super(TemplateIpsec6TunIfEsp, self).setUp()
197
198        self.tun_if = self.pg0
199
200        p = self.ipv6_params
201        p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
202                                        p.scapy_tun_spi, p.crypt_algo_vpp_id,
203                                        p.crypt_key, p.crypt_key,
204                                        p.auth_algo_vpp_id, p.auth_key,
205                                        p.auth_key, is_ip6=True)
206        p.tun_if.add_vpp_config()
207        p.tun_if.admin_up()
208        p.tun_if.config_ip6()
209        p.tun_if.config_ip4()
210        config_tun_params(p, self.encryption_type, p.tun_if)
211
212        r = VppIpRoute(self, p.remote_tun_if_host, 128,
213                       [VppRoutePath(p.tun_if.remote_ip6,
214                                     0xffffffff,
215                                     proto=DpoProto.DPO_PROTO_IP6)])
216        r.add_vpp_config()
217        r = VppIpRoute(self, p.remote_tun_if_host4, 32,
218                       [VppRoutePath(p.tun_if.remote_ip4,
219                                     0xffffffff)])
220        r.add_vpp_config()
221
222    def tearDown(self):
223        super(TemplateIpsec6TunIfEsp, self).tearDown()
224
225
226class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp, IpsecTun6Tests):
227    """ Ipsec ESP - TUN tests """
228    tun6_encrypt_node_name = "esp6-encrypt-tun"
229    tun6_decrypt_node_name = "esp6-decrypt-tun"
230
231    def test_tun_basic46(self):
232        """ ipsec 4o6 tunnel basic test """
233        self.tun6_encrypt_node_name = "esp6-encrypt-tun"
234        self.verify_tun_46(self.params[socket.AF_INET6], count=1)
235
236    def test_tun_burst46(self):
237        """ ipsec 4o6 tunnel burst test """
238        self.tun6_encrypt_node_name = "esp6-encrypt-tun"
239        self.verify_tun_46(self.params[socket.AF_INET6], count=257)
240
241
242class TestIpsec4MultiTunIfEsp(TemplateIpsec, IpsecTun4):
243    """ IPsec IPv4 Multi Tunnel interface """
244
245    encryption_type = ESP
246    tun4_encrypt_node_name = "esp4-encrypt-tun"
247    tun4_decrypt_node_name = "esp4-decrypt-tun"
248
249    def setUp(self):
250        super(TestIpsec4MultiTunIfEsp, self).setUp()
251
252        self.tun_if = self.pg0
253
254        self.multi_params = []
255        self.pg0.generate_remote_hosts(10)
256        self.pg0.configure_ipv4_neighbors()
257
258        for ii in range(10):
259            p = copy.copy(self.ipv4_params)
260
261            p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
262            p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
263            p.scapy_tun_spi = p.scapy_tun_spi + ii
264            p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
265            p.vpp_tun_spi = p.vpp_tun_spi + ii
266
267            p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
268            p.scapy_tra_spi = p.scapy_tra_spi + ii
269            p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
270            p.vpp_tra_spi = p.vpp_tra_spi + ii
271
272            p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
273                                            p.scapy_tun_spi,
274                                            p.crypt_algo_vpp_id,
275                                            p.crypt_key, p.crypt_key,
276                                            p.auth_algo_vpp_id, p.auth_key,
277                                            p.auth_key,
278                                            dst=self.pg0.remote_hosts[ii].ip4)
279            p.tun_if.add_vpp_config()
280            p.tun_if.admin_up()
281            p.tun_if.config_ip4()
282            config_tun_params(p, self.encryption_type, p.tun_if)
283            self.multi_params.append(p)
284
285            VppIpRoute(self, p.remote_tun_if_host, 32,
286                       [VppRoutePath(p.tun_if.remote_ip4,
287                                     0xffffffff)]).add_vpp_config()
288
289    def tearDown(self):
290        super(TestIpsec4MultiTunIfEsp, self).tearDown()
291
292    def test_tun_44(self):
293        """Multiple IPSEC tunnel interfaces """
294        for p in self.multi_params:
295            self.verify_tun_44(p, count=127)
296            c = p.tun_if.get_rx_stats()
297            self.assertEqual(c['packets'], 127)
298            c = p.tun_if.get_tx_stats()
299            self.assertEqual(c['packets'], 127)
300
301
302class TestIpsec4TunIfEspAll(TemplateIpsec, IpsecTun4):
303    """ IPsec IPv4 Tunnel interface all Algos """
304
305    encryption_type = ESP
306    tun4_encrypt_node_name = "esp4-encrypt-tun"
307    tun4_decrypt_node_name = "esp4-decrypt-tun"
308
309    def config_network(self, p):
310
311        p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
312                                        p.scapy_tun_spi,
313                                        p.crypt_algo_vpp_id,
314                                        p.crypt_key, p.crypt_key,
315                                        p.auth_algo_vpp_id, p.auth_key,
316                                        p.auth_key,
317                                        salt=p.salt)
318        p.tun_if.add_vpp_config()
319        p.tun_if.admin_up()
320        p.tun_if.config_ip4()
321        config_tun_params(p, self.encryption_type, p.tun_if)
322        self.logger.info(self.vapi.cli("sh ipsec sa 0"))
323        self.logger.info(self.vapi.cli("sh ipsec sa 1"))
324
325        p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
326                             [VppRoutePath(p.tun_if.remote_ip4,
327                                           0xffffffff)])
328        p.route.add_vpp_config()
329
330    def unconfig_network(self, p):
331        p.tun_if.unconfig_ip4()
332        p.tun_if.remove_vpp_config()
333        p.route.remove_vpp_config()
334
335    def setUp(self):
336        super(TestIpsec4TunIfEspAll, self).setUp()
337
338        self.tun_if = self.pg0
339
340    def tearDown(self):
341        super(TestIpsec4TunIfEspAll, self).tearDown()
342
343    def rekey(self, p):
344        #
345        # change the key and the SPI
346        #
347        p.crypt_key = b'X' + p.crypt_key[1:]
348        p.scapy_tun_spi += 1
349        p.scapy_tun_sa_id += 1
350        p.vpp_tun_spi += 1
351        p.vpp_tun_sa_id += 1
352        p.tun_if.local_spi = p.vpp_tun_spi
353        p.tun_if.remote_spi = p.scapy_tun_spi
354
355        config_tun_params(p, self.encryption_type, p.tun_if)
356
357        p.tun_sa_in = VppIpsecSA(self,
358                                 p.scapy_tun_sa_id,
359                                 p.scapy_tun_spi,
360                                 p.auth_algo_vpp_id,
361                                 p.auth_key,
362                                 p.crypt_algo_vpp_id,
363                                 p.crypt_key,
364                                 self.vpp_esp_protocol,
365                                 flags=p.flags,
366                                 salt=p.salt)
367        p.tun_sa_out = VppIpsecSA(self,
368                                  p.vpp_tun_sa_id,
369                                  p.vpp_tun_spi,
370                                  p.auth_algo_vpp_id,
371                                  p.auth_key,
372                                  p.crypt_algo_vpp_id,
373                                  p.crypt_key,
374                                  self.vpp_esp_protocol,
375                                  flags=p.flags,
376                                  salt=p.salt)
377        p.tun_sa_in.add_vpp_config()
378        p.tun_sa_out.add_vpp_config()
379
380        self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=p.tun_if.sw_if_index,
381                                         sa_id=p.tun_sa_in.id,
382                                         is_outbound=1)
383        self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=p.tun_if.sw_if_index,
384                                         sa_id=p.tun_sa_out.id,
385                                         is_outbound=0)
386        self.logger.info(self.vapi.cli("sh ipsec sa"))
387
388    def test_tun_44(self):
389        """IPSEC tunnel all algos """
390
391        # foreach VPP crypto engine
392        engines = ["ia32", "ipsecmb", "openssl"]
393
394        # foreach crypto algorithm
395        algos = [{'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
396                                 IPSEC_API_CRYPTO_ALG_AES_GCM_128),
397                  'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
398                                IPSEC_API_INTEG_ALG_NONE),
399                  'scapy-crypto': "AES-GCM",
400                  'scapy-integ': "NULL",
401                  'key': b"JPjyOWBeVEQiMe7h",
402                  'salt': 3333},
403                 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
404                                 IPSEC_API_CRYPTO_ALG_AES_GCM_192),
405                  'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
406                                IPSEC_API_INTEG_ALG_NONE),
407                  'scapy-crypto': "AES-GCM",
408                  'scapy-integ': "NULL",
409                  'key': b"JPjyOWBeVEQiMe7hJPjyOWBe",
410                  'salt': 0},
411                 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
412                                 IPSEC_API_CRYPTO_ALG_AES_GCM_256),
413                  'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
414                                IPSEC_API_INTEG_ALG_NONE),
415                  'scapy-crypto': "AES-GCM",
416                  'scapy-integ': "NULL",
417                  'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
418                  'salt': 9999},
419                 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
420                                 IPSEC_API_CRYPTO_ALG_AES_CBC_128),
421                  'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
422                                IPSEC_API_INTEG_ALG_SHA1_96),
423                  'scapy-crypto': "AES-CBC",
424                  'scapy-integ': "HMAC-SHA1-96",
425                  'salt': 0,
426                  'key': b"JPjyOWBeVEQiMe7h"},
427                 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
428                                 IPSEC_API_CRYPTO_ALG_AES_CBC_192),
429                  'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
430                                IPSEC_API_INTEG_ALG_SHA1_96),
431                  'scapy-crypto': "AES-CBC",
432                  'scapy-integ': "HMAC-SHA1-96",
433                  'salt': 0,
434                  'key': b"JPjyOWBeVEQiMe7hJPjyOWBe"},
435                 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
436                                 IPSEC_API_CRYPTO_ALG_AES_CBC_256),
437                  'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
438                                IPSEC_API_INTEG_ALG_SHA1_96),
439                  'scapy-crypto': "AES-CBC",
440                  'scapy-integ': "HMAC-SHA1-96",
441                  'salt': 0,
442                  'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"},
443                 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
444                                 IPSEC_API_CRYPTO_ALG_NONE),
445                  'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
446                                IPSEC_API_INTEG_ALG_SHA1_96),
447                  'scapy-crypto': "NULL",
448                  'scapy-integ': "HMAC-SHA1-96",
449                  'salt': 0,
450                  'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"}]
451
452        for engine in engines:
453            self.vapi.cli("set crypto handler all %s" % engine)
454
455            #
456            # loop through each of the algorithms
457            #
458            for algo in algos:
459                # with self.subTest(algo=algo['scapy']):
460
461                p = copy.copy(self.ipv4_params)
462                p.auth_algo_vpp_id = algo['vpp-integ']
463                p.crypt_algo_vpp_id = algo['vpp-crypto']
464                p.crypt_algo = algo['scapy-crypto']
465                p.auth_algo = algo['scapy-integ']
466                p.crypt_key = algo['key']
467                p.salt = algo['salt']
468
469                self.config_network(p)
470
471                self.verify_tun_44(p, count=127)
472                c = p.tun_if.get_rx_stats()
473                self.assertEqual(c['packets'], 127)
474                c = p.tun_if.get_tx_stats()
475                self.assertEqual(c['packets'], 127)
476
477                #
478                # rekey the tunnel
479                #
480                self.rekey(p)
481                self.verify_tun_44(p, count=127)
482
483                self.unconfig_network(p)
484                p.tun_sa_out.remove_vpp_config()
485                p.tun_sa_in.remove_vpp_config()
486
487
488class TestIpsec6MultiTunIfEsp(TemplateIpsec, IpsecTun6):
489    """ IPsec IPv6 Multi Tunnel interface """
490
491    encryption_type = ESP
492    tun6_encrypt_node_name = "esp6-encrypt-tun"
493    tun6_decrypt_node_name = "esp6-decrypt-tun"
494
495    def setUp(self):
496        super(TestIpsec6MultiTunIfEsp, self).setUp()
497
498        self.tun_if = self.pg0
499
500        self.multi_params = []
501        self.pg0.generate_remote_hosts(10)
502        self.pg0.configure_ipv6_neighbors()
503
504        for ii in range(10):
505            p = copy.copy(self.ipv6_params)
506
507            p.remote_tun_if_host = "1111::%d" % (ii + 1)
508            p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
509            p.scapy_tun_spi = p.scapy_tun_spi + ii
510            p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
511            p.vpp_tun_spi = p.vpp_tun_spi + ii
512
513            p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
514            p.scapy_tra_spi = p.scapy_tra_spi + ii
515            p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
516            p.vpp_tra_spi = p.vpp_tra_spi + ii
517
518            p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
519                                            p.scapy_tun_spi,
520                                            p.crypt_algo_vpp_id,
521                                            p.crypt_key, p.crypt_key,
522                                            p.auth_algo_vpp_id, p.auth_key,
523                                            p.auth_key, is_ip6=True,
524                                            dst=self.pg0.remote_hosts[ii].ip6)
525            p.tun_if.add_vpp_config()
526            p.tun_if.admin_up()
527            p.tun_if.config_ip6()
528            config_tun_params(p, self.encryption_type, p.tun_if)
529            self.multi_params.append(p)
530
531            r = VppIpRoute(self, p.remote_tun_if_host, 128,
532                           [VppRoutePath(p.tun_if.remote_ip6,
533                                         0xffffffff,
534                                         proto=DpoProto.DPO_PROTO_IP6)])
535            r.add_vpp_config()
536
537    def tearDown(self):
538        super(TestIpsec6MultiTunIfEsp, self).tearDown()
539
540    def test_tun_66(self):
541        """Multiple IPSEC tunnel interfaces """
542        for p in self.multi_params:
543            self.verify_tun_66(p, count=127)
544            c = p.tun_if.get_rx_stats()
545            self.assertEqual(c['packets'], 127)
546            c = p.tun_if.get_tx_stats()
547            self.assertEqual(c['packets'], 127)
548
549
550class TestIpsecGreTebIfEsp(TemplateIpsec,
551                           IpsecTun4Tests):
552    """ Ipsec GRE TEB ESP - TUN tests """
553    tun4_encrypt_node_name = "esp4-encrypt-tun"
554    tun4_decrypt_node_name = "esp4-decrypt-tun"
555    encryption_type = ESP
556    omac = "00:11:22:33:44:55"
557
558    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
559                         payload_size=100):
560        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
561                sa.encrypt(IP(src=self.pg0.remote_ip4,
562                              dst=self.pg0.local_ip4) /
563                           GRE() /
564                           Ether(dst=self.omac) /
565                           IP(src="1.1.1.1", dst="1.1.1.2") /
566                           UDP(sport=1144, dport=2233) /
567                           Raw(b'X' * payload_size))
568                for i in range(count)]
569
570    def gen_pkts(self, sw_intf, src, dst, count=1,
571                 payload_size=100):
572        return [Ether(dst=self.omac) /
573                IP(src="1.1.1.1", dst="1.1.1.2") /
574                UDP(sport=1144, dport=2233) /
575                Raw(b'X' * payload_size)
576                for i in range(count)]
577
578    def verify_decrypted(self, p, rxs):
579        for rx in rxs:
580            self.assert_equal(rx[Ether].dst, self.omac)
581            self.assert_equal(rx[IP].dst, "1.1.1.2")
582
583    def verify_encrypted(self, p, sa, rxs):
584        for rx in rxs:
585            try:
586                pkt = sa.decrypt(rx[IP])
587                if not pkt.haslayer(IP):
588                    pkt = IP(pkt[Raw].load)
589                self.assert_packet_checksums_valid(pkt)
590                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
591                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
592                self.assertTrue(pkt.haslayer(GRE))
593                e = pkt[Ether]
594                self.assertEqual(e[Ether].dst, self.omac)
595                self.assertEqual(e[IP].dst, "1.1.1.2")
596            except (IndexError, AssertionError):
597                self.logger.debug(ppp("Unexpected packet:", rx))
598                try:
599                    self.logger.debug(ppp("Decrypted packet:", pkt))
600                except:
601                    pass
602                raise
603
604    def setUp(self):
605        super(TestIpsecGreTebIfEsp, self).setUp()
606
607        self.tun_if = self.pg0
608
609        p = self.ipv4_params
610
611        bd1 = VppBridgeDomain(self, 1)
612        bd1.add_vpp_config()
613
614        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
615                                  p.auth_algo_vpp_id, p.auth_key,
616                                  p.crypt_algo_vpp_id, p.crypt_key,
617                                  self.vpp_esp_protocol,
618                                  self.pg0.local_ip4,
619                                  self.pg0.remote_ip4)
620        p.tun_sa_out.add_vpp_config()
621
622        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
623                                 p.auth_algo_vpp_id, p.auth_key,
624                                 p.crypt_algo_vpp_id, p.crypt_key,
625                                 self.vpp_esp_protocol,
626                                 self.pg0.remote_ip4,
627                                 self.pg0.local_ip4)
628        p.tun_sa_in.add_vpp_config()
629
630        p.tun_if = VppGreInterface(self,
631                                   self.pg0.local_ip4,
632                                   self.pg0.remote_ip4,
633                                   type=(VppEnum.vl_api_gre_tunnel_type_t.
634                                         GRE_API_TUNNEL_TYPE_TEB))
635        p.tun_if.add_vpp_config()
636
637        p.tun_protect = VppIpsecTunProtect(self,
638                                           p.tun_if,
639                                           p.tun_sa_out,
640                                           [p.tun_sa_in])
641
642        p.tun_protect.add_vpp_config()
643
644        p.tun_if.admin_up()
645        p.tun_if.config_ip4()
646        config_tun_params(p, self.encryption_type, p.tun_if)
647
648        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
649        VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
650
651        self.vapi.cli("clear ipsec sa")
652
653    def tearDown(self):
654        p = self.ipv4_params
655        p.tun_if.unconfig_ip4()
656        super(TestIpsecGreTebIfEsp, self).tearDown()
657
658
659class TestIpsecGreIfEsp(TemplateIpsec,
660                        IpsecTun4Tests):
661    """ Ipsec GRE ESP - TUN tests """
662    tun4_encrypt_node_name = "esp4-encrypt-tun"
663    tun4_decrypt_node_name = "esp4-decrypt-tun"
664    encryption_type = ESP
665
666    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
667                         payload_size=100):
668        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
669                sa.encrypt(IP(src=self.pg0.remote_ip4,
670                              dst=self.pg0.local_ip4) /
671                           GRE() /
672                           IP(src=self.pg1.local_ip4,
673                              dst=self.pg1.remote_ip4) /
674                           UDP(sport=1144, dport=2233) /
675                           Raw(b'X' * payload_size))
676                for i in range(count)]
677
678    def gen_pkts(self, sw_intf, src, dst, count=1,
679                 payload_size=100):
680        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
681                IP(src="1.1.1.1", dst="1.1.1.2") /
682                UDP(sport=1144, dport=2233) /
683                Raw(b'X' * payload_size)
684                for i in range(count)]
685
686    def verify_decrypted(self, p, rxs):
687        for rx in rxs:
688            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
689            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
690
691    def verify_encrypted(self, p, sa, rxs):
692        for rx in rxs:
693            try:
694                pkt = sa.decrypt(rx[IP])
695                if not pkt.haslayer(IP):
696                    pkt = IP(pkt[Raw].load)
697                self.assert_packet_checksums_valid(pkt)
698                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
699                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
700                self.assertTrue(pkt.haslayer(GRE))
701                e = pkt[GRE]
702                self.assertEqual(e[IP].dst, "1.1.1.2")
703            except (IndexError, AssertionError):
704                self.logger.debug(ppp("Unexpected packet:", rx))
705                try:
706                    self.logger.debug(ppp("Decrypted packet:", pkt))
707                except:
708                    pass
709                raise
710
711    def setUp(self):
712        super(TestIpsecGreIfEsp, self).setUp()
713
714        self.tun_if = self.pg0
715
716        p = self.ipv4_params
717
718        bd1 = VppBridgeDomain(self, 1)
719        bd1.add_vpp_config()
720
721        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
722                                  p.auth_algo_vpp_id, p.auth_key,
723                                  p.crypt_algo_vpp_id, p.crypt_key,
724                                  self.vpp_esp_protocol,
725                                  self.pg0.local_ip4,
726                                  self.pg0.remote_ip4)
727        p.tun_sa_out.add_vpp_config()
728
729        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
730                                 p.auth_algo_vpp_id, p.auth_key,
731                                 p.crypt_algo_vpp_id, p.crypt_key,
732                                 self.vpp_esp_protocol,
733                                 self.pg0.remote_ip4,
734                                 self.pg0.local_ip4)
735        p.tun_sa_in.add_vpp_config()
736
737        p.tun_if = VppGreInterface(self,
738                                   self.pg0.local_ip4,
739                                   self.pg0.remote_ip4)
740        p.tun_if.add_vpp_config()
741
742        p.tun_protect = VppIpsecTunProtect(self,
743                                           p.tun_if,
744                                           p.tun_sa_out,
745                                           [p.tun_sa_in])
746        p.tun_protect.add_vpp_config()
747
748        p.tun_if.admin_up()
749        p.tun_if.config_ip4()
750        config_tun_params(p, self.encryption_type, p.tun_if)
751
752        VppIpRoute(self, "1.1.1.2", 32,
753                   [VppRoutePath(p.tun_if.remote_ip4,
754                                 0xffffffff)]).add_vpp_config()
755
756    def tearDown(self):
757        p = self.ipv4_params
758        p.tun_if.unconfig_ip4()
759        super(TestIpsecGreIfEsp, self).tearDown()
760
761
762class TestIpsecGreIfEspTra(TemplateIpsec,
763                           IpsecTun4Tests):
764    """ Ipsec GRE ESP - TRA tests """
765    tun4_encrypt_node_name = "esp4-encrypt-tun"
766    tun4_decrypt_node_name = "esp4-decrypt-tun"
767    encryption_type = ESP
768
769    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
770                         payload_size=100):
771        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
772                sa.encrypt(IP(src=self.pg0.remote_ip4,
773                              dst=self.pg0.local_ip4) /
774                           GRE() /
775                           IP(src=self.pg1.local_ip4,
776                              dst=self.pg1.remote_ip4) /
777                           UDP(sport=1144, dport=2233) /
778                           Raw(b'X' * payload_size))
779                for i in range(count)]
780
781    def gen_pkts(self, sw_intf, src, dst, count=1,
782                 payload_size=100):
783        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
784                IP(src="1.1.1.1", dst="1.1.1.2") /
785                UDP(sport=1144, dport=2233) /
786                Raw(b'X' * payload_size)
787                for i in range(count)]
788
789    def verify_decrypted(self, p, rxs):
790        for rx in rxs:
791            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
792            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
793
794    def verify_encrypted(self, p, sa, rxs):
795        for rx in rxs:
796            try:
797                pkt = sa.decrypt(rx[IP])
798                if not pkt.haslayer(IP):
799                    pkt = IP(pkt[Raw].load)
800                self.assert_packet_checksums_valid(pkt)
801                self.assertTrue(pkt.haslayer(GRE))
802                e = pkt[GRE]
803                self.assertEqual(e[IP].dst, "1.1.1.2")
804            except (IndexError, AssertionError):
805                self.logger.debug(ppp("Unexpected packet:", rx))
806                try:
807                    self.logger.debug(ppp("Decrypted packet:", pkt))
808                except:
809                    pass
810                raise
811
812    def setUp(self):
813        super(TestIpsecGreIfEspTra, self).setUp()
814
815        self.tun_if = self.pg0
816
817        p = self.ipv4_params
818
819        bd1 = VppBridgeDomain(self, 1)
820        bd1.add_vpp_config()
821
822        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
823                                  p.auth_algo_vpp_id, p.auth_key,
824                                  p.crypt_algo_vpp_id, p.crypt_key,
825                                  self.vpp_esp_protocol)
826        p.tun_sa_out.add_vpp_config()
827
828        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
829                                 p.auth_algo_vpp_id, p.auth_key,
830                                 p.crypt_algo_vpp_id, p.crypt_key,
831                                 self.vpp_esp_protocol)
832        p.tun_sa_in.add_vpp_config()
833
834        p.tun_if = VppGreInterface(self,
835                                   self.pg0.local_ip4,
836                                   self.pg0.remote_ip4)
837        p.tun_if.add_vpp_config()
838
839        p.tun_protect = VppIpsecTunProtect(self,
840                                           p.tun_if,
841                                           p.tun_sa_out,
842                                           [p.tun_sa_in])
843        p.tun_protect.add_vpp_config()
844
845        p.tun_if.admin_up()
846        p.tun_if.config_ip4()
847        config_tun_params(p, self.encryption_type, p.tun_if)
848
849        VppIpRoute(self, "1.1.1.2", 32,
850                   [VppRoutePath(p.tun_if.remote_ip4,
851                                 0xffffffff)]).add_vpp_config()
852
853    def tearDown(self):
854        p = self.ipv4_params
855        p.tun_if.unconfig_ip4()
856        super(TestIpsecGreIfEspTra, self).tearDown()
857
858
859class TemplateIpsec4TunProtect(object):
860    """ IPsec IPv4 Tunnel protect """
861
862    encryption_type = ESP
863    tun4_encrypt_node_name = "esp4-encrypt-tun"
864    tun4_decrypt_node_name = "esp4-decrypt-tun"
865    tun4_input_node = "ipsec4-tun-input"
866
867    def config_sa_tra(self, p):
868        config_tun_params(p, self.encryption_type, p.tun_if)
869
870        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
871                                  p.auth_algo_vpp_id, p.auth_key,
872                                  p.crypt_algo_vpp_id, p.crypt_key,
873                                  self.vpp_esp_protocol,
874                                  flags=p.flags)
875        p.tun_sa_out.add_vpp_config()
876
877        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
878                                 p.auth_algo_vpp_id, p.auth_key,
879                                 p.crypt_algo_vpp_id, p.crypt_key,
880                                 self.vpp_esp_protocol,
881                                 flags=p.flags)
882        p.tun_sa_in.add_vpp_config()
883
884    def config_sa_tun(self, p):
885        config_tun_params(p, self.encryption_type, p.tun_if)
886
887        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
888                                  p.auth_algo_vpp_id, p.auth_key,
889                                  p.crypt_algo_vpp_id, p.crypt_key,
890                                  self.vpp_esp_protocol,
891                                  self.tun_if.remote_addr[p.addr_type],
892                                  self.tun_if.local_addr[p.addr_type],
893                                  flags=p.flags)
894        p.tun_sa_out.add_vpp_config()
895
896        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
897                                 p.auth_algo_vpp_id, p.auth_key,
898                                 p.crypt_algo_vpp_id, p.crypt_key,
899                                 self.vpp_esp_protocol,
900                                 self.tun_if.remote_addr[p.addr_type],
901                                 self.tun_if.local_addr[p.addr_type],
902                                 flags=p.flags)
903        p.tun_sa_in.add_vpp_config()
904
905    def config_protect(self, p):
906        p.tun_protect = VppIpsecTunProtect(self,
907                                           p.tun_if,
908                                           p.tun_sa_out,
909                                           [p.tun_sa_in])
910        p.tun_protect.add_vpp_config()
911
912    def config_network(self, p):
913        p.tun_if = VppIpIpTunInterface(self, self.pg0,
914                                       self.pg0.local_ip4,
915                                       self.pg0.remote_ip4)
916        p.tun_if.add_vpp_config()
917        p.tun_if.admin_up()
918        p.tun_if.config_ip4()
919        p.tun_if.config_ip6()
920
921        p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
922                             [VppRoutePath(p.tun_if.remote_ip4,
923                                           0xffffffff)])
924        p.route.add_vpp_config()
925        r = VppIpRoute(self, p.remote_tun_if_host6, 128,
926                       [VppRoutePath(p.tun_if.remote_ip6,
927                                     0xffffffff,
928                                     proto=DpoProto.DPO_PROTO_IP6)])
929        r.add_vpp_config()
930
931    def unconfig_network(self, p):
932        p.route.remove_vpp_config()
933        p.tun_if.remove_vpp_config()
934
935    def unconfig_protect(self, p):
936        p.tun_protect.remove_vpp_config()
937
938    def unconfig_sa(self, p):
939        p.tun_sa_out.remove_vpp_config()
940        p.tun_sa_in.remove_vpp_config()
941
942
943class TestIpsec4TunProtect(TemplateIpsec,
944                           TemplateIpsec4TunProtect,
945                           IpsecTun4):
946    """ IPsec IPv4 Tunnel protect - transport mode"""
947
948    def setUp(self):
949        super(TestIpsec4TunProtect, self).setUp()
950
951        self.tun_if = self.pg0
952
953    def tearDown(self):
954        super(TestIpsec4TunProtect, self).tearDown()
955
956    def test_tun_44(self):
957        """IPSEC tunnel protect"""
958
959        p = self.ipv4_params
960
961        self.config_network(p)
962        self.config_sa_tra(p)
963        self.config_protect(p)
964
965        self.verify_tun_44(p, count=127)
966        c = p.tun_if.get_rx_stats()
967        self.assertEqual(c['packets'], 127)
968        c = p.tun_if.get_tx_stats()
969        self.assertEqual(c['packets'], 127)
970
971        self.vapi.cli("clear ipsec sa")
972        self.verify_tun_64(p, count=127)
973        c = p.tun_if.get_rx_stats()
974        self.assertEqual(c['packets'], 254)
975        c = p.tun_if.get_tx_stats()
976        self.assertEqual(c['packets'], 254)
977
978        # rekey - create new SAs and update the tunnel protection
979        np = copy.copy(p)
980        np.crypt_key = b'X' + p.crypt_key[1:]
981        np.scapy_tun_spi += 100
982        np.scapy_tun_sa_id += 1
983        np.vpp_tun_spi += 100
984        np.vpp_tun_sa_id += 1
985        np.tun_if.local_spi = p.vpp_tun_spi
986        np.tun_if.remote_spi = p.scapy_tun_spi
987
988        self.config_sa_tra(np)
989        self.config_protect(np)
990        self.unconfig_sa(p)
991
992        self.verify_tun_44(np, count=127)
993        c = p.tun_if.get_rx_stats()
994        self.assertEqual(c['packets'], 381)
995        c = p.tun_if.get_tx_stats()
996        self.assertEqual(c['packets'], 381)
997
998        # teardown
999        self.unconfig_protect(np)
1000        self.unconfig_sa(np)
1001        self.unconfig_network(p)
1002
1003
1004class TestIpsec4TunProtectUdp(TemplateIpsec,
1005                              TemplateIpsec4TunProtect,
1006                              IpsecTun4):
1007    """ IPsec IPv4 Tunnel protect - transport mode"""
1008
1009    def setUp(self):
1010        super(TestIpsec4TunProtectUdp, self).setUp()
1011
1012        self.tun_if = self.pg0
1013
1014        p = self.ipv4_params
1015        p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
1016                   IPSEC_API_SAD_FLAG_UDP_ENCAP)
1017        p.nat_header = UDP(sport=5454, dport=4500)
1018        self.config_network(p)
1019        self.config_sa_tra(p)
1020        self.config_protect(p)
1021
1022    def tearDown(self):
1023        p = self.ipv4_params
1024        self.unconfig_protect(p)
1025        self.unconfig_sa(p)
1026        self.unconfig_network(p)
1027        super(TestIpsec4TunProtectUdp, self).tearDown()
1028
1029    def test_tun_44(self):
1030        """IPSEC UDP tunnel protect"""
1031
1032        p = self.ipv4_params
1033
1034        self.verify_tun_44(p, count=127)
1035        c = p.tun_if.get_rx_stats()
1036        self.assertEqual(c['packets'], 127)
1037        c = p.tun_if.get_tx_stats()
1038        self.assertEqual(c['packets'], 127)
1039
1040    def test_keepalive(self):
1041        """ IPSEC NAT Keepalive """
1042        self.verify_keepalive(self.ipv4_params)
1043
1044
1045class TestIpsec4TunProtectTun(TemplateIpsec,
1046                              TemplateIpsec4TunProtect,
1047                              IpsecTun4):
1048    """ IPsec IPv4 Tunnel protect - tunnel mode"""
1049
1050    encryption_type = ESP
1051    tun4_encrypt_node_name = "esp4-encrypt-tun"
1052    tun4_decrypt_node_name = "esp4-decrypt-tun"
1053
1054    def setUp(self):
1055        super(TestIpsec4TunProtectTun, self).setUp()
1056
1057        self.tun_if = self.pg0
1058
1059    def tearDown(self):
1060        super(TestIpsec4TunProtectTun, self).tearDown()
1061
1062    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
1063                         payload_size=100):
1064        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1065                sa.encrypt(IP(src=sw_intf.remote_ip4,
1066                              dst=sw_intf.local_ip4) /
1067                           IP(src=src, dst=dst) /
1068                           UDP(sport=1144, dport=2233) /
1069                           Raw(b'X' * payload_size))
1070                for i in range(count)]
1071
1072    def gen_pkts(self, sw_intf, src, dst, count=1,
1073                 payload_size=100):
1074        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1075                IP(src=src, dst=dst) /
1076                UDP(sport=1144, dport=2233) /
1077                Raw(b'X' * payload_size)
1078                for i in range(count)]
1079
1080    def verify_decrypted(self, p, rxs):
1081        for rx in rxs:
1082            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1083            self.assert_equal(rx[IP].src, p.remote_tun_if_host)
1084            self.assert_packet_checksums_valid(rx)
1085
1086    def verify_encrypted(self, p, sa, rxs):
1087        for rx in rxs:
1088            try:
1089                pkt = sa.decrypt(rx[IP])
1090                if not pkt.haslayer(IP):
1091                    pkt = IP(pkt[Raw].load)
1092                self.assert_packet_checksums_valid(pkt)
1093                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1094                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1095                inner = pkt[IP].payload
1096                self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)
1097
1098            except (IndexError, AssertionError):
1099                self.logger.debug(ppp("Unexpected packet:", rx))
1100                try:
1101                    self.logger.debug(ppp("Decrypted packet:", pkt))
1102                except:
1103                    pass
1104                raise
1105
1106    def test_tun_44(self):
1107        """IPSEC tunnel protect """
1108
1109        p = self.ipv4_params
1110
1111        self.config_network(p)
1112        self.config_sa_tun(p)
1113        self.config_protect(p)
1114
1115        self.verify_tun_44(p, count=127)
1116
1117        c = p.tun_if.get_rx_stats()
1118        self.assertEqual(c['packets'], 127)
1119        c = p.tun_if.get_tx_stats()
1120        self.assertEqual(c['packets'], 127)
1121
1122        # rekey - create new SAs and update the tunnel protection
1123        np = copy.copy(p)
1124        np.crypt_key = b'X' + p.crypt_key[1:]
1125        np.scapy_tun_spi += 100
1126        np.scapy_tun_sa_id += 1
1127        np.vpp_tun_spi += 100
1128        np.vpp_tun_sa_id += 1
1129        np.tun_if.local_spi = p.vpp_tun_spi
1130        np.tun_if.remote_spi = p.scapy_tun_spi
1131
1132        self.config_sa_tun(np)
1133        self.config_protect(np)
1134        self.unconfig_sa(p)
1135
1136        self.verify_tun_44(np, count=127)
1137        c = p.tun_if.get_rx_stats()
1138        self.assertEqual(c['packets'], 254)
1139        c = p.tun_if.get_tx_stats()
1140        self.assertEqual(c['packets'], 254)
1141
1142        # teardown
1143        self.unconfig_protect(np)
1144        self.unconfig_sa(np)
1145        self.unconfig_network(p)
1146
1147
1148class TemplateIpsec6TunProtect(object):
1149    """ IPsec IPv6 Tunnel protect """
1150
1151    def config_sa_tra(self, p):
1152        config_tun_params(p, self.encryption_type, p.tun_if)
1153
1154        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1155                                  p.auth_algo_vpp_id, p.auth_key,
1156                                  p.crypt_algo_vpp_id, p.crypt_key,
1157                                  self.vpp_esp_protocol)
1158        p.tun_sa_out.add_vpp_config()
1159
1160        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1161                                 p.auth_algo_vpp_id, p.auth_key,
1162                                 p.crypt_algo_vpp_id, p.crypt_key,
1163                                 self.vpp_esp_protocol)
1164        p.tun_sa_in.add_vpp_config()
1165
1166    def config_sa_tun(self, p):
1167        config_tun_params(p, self.encryption_type, p.tun_if)
1168
1169        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1170                                  p.auth_algo_vpp_id, p.auth_key,
1171                                  p.crypt_algo_vpp_id, p.crypt_key,
1172                                  self.vpp_esp_protocol,
1173                                  self.tun_if.remote_addr[p.addr_type],
1174                                  self.tun_if.local_addr[p.addr_type])
1175        p.tun_sa_out.add_vpp_config()
1176
1177        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1178                                 p.auth_algo_vpp_id, p.auth_key,
1179                                 p.crypt_algo_vpp_id, p.crypt_key,
1180                                 self.vpp_esp_protocol,
1181                                 self.tun_if.remote_addr[p.addr_type],
1182                                 self.tun_if.local_addr[p.addr_type])
1183        p.tun_sa_in.add_vpp_config()
1184
1185    def config_protect(self, p):
1186        p.tun_protect = VppIpsecTunProtect(self,
1187                                           p.tun_if,
1188                                           p.tun_sa_out,
1189                                           [p.tun_sa_in])
1190        p.tun_protect.add_vpp_config()
1191
1192    def config_network(self, p):
1193        p.tun_if = VppIpIpTunInterface(self, self.pg0,
1194                                       self.pg0.local_ip6,
1195                                       self.pg0.remote_ip6)
1196        p.tun_if.add_vpp_config()
1197        p.tun_if.admin_up()
1198        p.tun_if.config_ip6()
1199        p.tun_if.config_ip4()
1200
1201        p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
1202                             [VppRoutePath(p.tun_if.remote_ip6,
1203                                           0xffffffff,
1204                                           proto=DpoProto.DPO_PROTO_IP6)])
1205        p.route.add_vpp_config()
1206        r = VppIpRoute(self, p.remote_tun_if_host4, 32,
1207                       [VppRoutePath(p.tun_if.remote_ip4,
1208                                     0xffffffff)])
1209        r.add_vpp_config()
1210
1211    def unconfig_network(self, p):
1212        p.route.remove_vpp_config()
1213        p.tun_if.remove_vpp_config()
1214
1215    def unconfig_protect(self, p):
1216        p.tun_protect.remove_vpp_config()
1217
1218    def unconfig_sa(self, p):
1219        p.tun_sa_out.remove_vpp_config()
1220        p.tun_sa_in.remove_vpp_config()
1221
1222
1223class TestIpsec6TunProtect(TemplateIpsec,
1224                           TemplateIpsec6TunProtect,
1225                           IpsecTun6):
1226    """ IPsec IPv6 Tunnel protect - transport mode"""
1227
1228    encryption_type = ESP
1229    tun6_encrypt_node_name = "esp6-encrypt-tun"
1230    tun6_decrypt_node_name = "esp6-decrypt-tun"
1231
1232    def setUp(self):
1233        super(TestIpsec6TunProtect, self).setUp()
1234
1235        self.tun_if = self.pg0
1236
1237    def tearDown(self):
1238        super(TestIpsec6TunProtect, self).tearDown()
1239
1240    def test_tun_66(self):
1241        """IPSEC tunnel protect"""
1242
1243        p = self.ipv6_params
1244
1245        self.config_network(p)
1246        self.config_sa_tra(p)
1247        self.config_protect(p)
1248
1249        self.verify_tun_66(p, count=127)
1250        c = p.tun_if.get_rx_stats()
1251        self.assertEqual(c['packets'], 127)
1252        c = p.tun_if.get_tx_stats()
1253        self.assertEqual(c['packets'], 127)
1254
1255        # rekey - create new SAs and update the tunnel protection
1256        np = copy.copy(p)
1257        np.crypt_key = b'X' + p.crypt_key[1:]
1258        np.scapy_tun_spi += 100
1259        np.scapy_tun_sa_id += 1
1260        np.vpp_tun_spi += 100
1261        np.vpp_tun_sa_id += 1
1262        np.tun_if.local_spi = p.vpp_tun_spi
1263        np.tun_if.remote_spi = p.scapy_tun_spi
1264
1265        self.config_sa_tra(np)
1266        self.config_protect(np)
1267        self.unconfig_sa(p)
1268
1269        self.verify_tun_66(np, count=127)
1270        c = p.tun_if.get_rx_stats()
1271        self.assertEqual(c['packets'], 254)
1272        c = p.tun_if.get_tx_stats()
1273        self.assertEqual(c['packets'], 254)
1274
1275        # 3 phase rekey
1276        #  1) add two input SAs [old, new]
1277        #  2) swap output SA to [new]
1278        #  3) use only [new] input SA
1279        np3 = copy.copy(np)
1280        np3.crypt_key = b'Z' + p.crypt_key[1:]
1281        np3.scapy_tun_spi += 100
1282        np3.scapy_tun_sa_id += 1
1283        np3.vpp_tun_spi += 100
1284        np3.vpp_tun_sa_id += 1
1285        np3.tun_if.local_spi = p.vpp_tun_spi
1286        np3.tun_if.remote_spi = p.scapy_tun_spi
1287
1288        self.config_sa_tra(np3)
1289
1290        # step 1;
1291        p.tun_protect.update_vpp_config(np.tun_sa_out,
1292                                        [np.tun_sa_in, np3.tun_sa_in])
1293        self.verify_tun_66(np, np, count=127)
1294        self.verify_tun_66(np3, np, count=127)
1295
1296        # step 2;
1297        p.tun_protect.update_vpp_config(np3.tun_sa_out,
1298                                        [np.tun_sa_in, np3.tun_sa_in])
1299        self.verify_tun_66(np, np3, count=127)
1300        self.verify_tun_66(np3, np3, count=127)
1301
1302        # step 1;
1303        p.tun_protect.update_vpp_config(np3.tun_sa_out,
1304                                        [np3.tun_sa_in])
1305        self.verify_tun_66(np3, np3, count=127)
1306        self.verify_drop_tun_66(np, count=127)
1307
1308        c = p.tun_if.get_rx_stats()
1309        self.assertEqual(c['packets'], 127*7)
1310        c = p.tun_if.get_tx_stats()
1311        self.assertEqual(c['packets'], 127*7)
1312        self.unconfig_sa(np)
1313
1314        # teardown
1315        self.unconfig_protect(np3)
1316        self.unconfig_sa(np3)
1317        self.unconfig_network(p)
1318
1319    def test_tun_46(self):
1320        """IPSEC tunnel protect"""
1321
1322        p = self.ipv6_params
1323
1324        self.config_network(p)
1325        self.config_sa_tra(p)
1326        self.config_protect(p)
1327
1328        self.verify_tun_46(p, count=127)
1329        c = p.tun_if.get_rx_stats()
1330        self.assertEqual(c['packets'], 127)
1331        c = p.tun_if.get_tx_stats()
1332        self.assertEqual(c['packets'], 127)
1333
1334        # teardown
1335        self.unconfig_protect(p)
1336        self.unconfig_sa(p)
1337        self.unconfig_network(p)
1338
1339
1340class TestIpsec6TunProtectTun(TemplateIpsec,
1341                              TemplateIpsec6TunProtect,
1342                              IpsecTun6):
1343    """ IPsec IPv6 Tunnel protect - tunnel mode"""
1344
1345    encryption_type = ESP
1346    tun6_encrypt_node_name = "esp6-encrypt-tun"
1347    tun6_decrypt_node_name = "esp6-decrypt-tun"
1348
1349    def setUp(self):
1350        super(TestIpsec6TunProtectTun, self).setUp()
1351
1352        self.tun_if = self.pg0
1353
1354    def tearDown(self):
1355        super(TestIpsec6TunProtectTun, self).tearDown()
1356
1357    def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
1358                          payload_size=100):
1359        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1360                sa.encrypt(IPv6(src=sw_intf.remote_ip6,
1361                                dst=sw_intf.local_ip6) /
1362                           IPv6(src=src, dst=dst) /
1363                           UDP(sport=1166, dport=2233) /
1364                           Raw(b'X' * payload_size))
1365                for i in range(count)]
1366
1367    def gen_pkts6(self, sw_intf, src, dst, count=1,
1368                  payload_size=100):
1369        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1370                IPv6(src=src, dst=dst) /
1371                UDP(sport=1166, dport=2233) /
1372                Raw(b'X' * payload_size)
1373                for i in range(count)]
1374
1375    def verify_decrypted6(self, p, rxs):
1376        for rx in rxs:
1377            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1378            self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
1379            self.assert_packet_checksums_valid(rx)
1380
1381    def verify_encrypted6(self, p, sa, rxs):
1382        for rx in rxs:
1383            try:
1384                pkt = sa.decrypt(rx[IPv6])
1385                if not pkt.haslayer(IPv6):
1386                    pkt = IPv6(pkt[Raw].load)
1387                self.assert_packet_checksums_valid(pkt)
1388                self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
1389                self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
1390                inner = pkt[IPv6].payload
1391                self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)
1392
1393            except (IndexError, AssertionError):
1394                self.logger.debug(ppp("Unexpected packet:", rx))
1395                try:
1396                    self.logger.debug(ppp("Decrypted packet:", pkt))
1397                except:
1398                    pass
1399                raise
1400
1401    def test_tun_66(self):
1402        """IPSEC tunnel protect """
1403
1404        p = self.ipv6_params
1405
1406        self.config_network(p)
1407        self.config_sa_tun(p)
1408        self.config_protect(p)
1409
1410        self.verify_tun_66(p, count=127)
1411
1412        c = p.tun_if.get_rx_stats()
1413        self.assertEqual(c['packets'], 127)
1414        c = p.tun_if.get_tx_stats()
1415        self.assertEqual(c['packets'], 127)
1416
1417        # rekey - create new SAs and update the tunnel protection
1418        np = copy.copy(p)
1419        np.crypt_key = b'X' + p.crypt_key[1:]
1420        np.scapy_tun_spi += 100
1421        np.scapy_tun_sa_id += 1
1422        np.vpp_tun_spi += 100
1423        np.vpp_tun_sa_id += 1
1424        np.tun_if.local_spi = p.vpp_tun_spi
1425        np.tun_if.remote_spi = p.scapy_tun_spi
1426
1427        self.config_sa_tun(np)
1428        self.config_protect(np)
1429        self.unconfig_sa(p)
1430
1431        self.verify_tun_66(np, count=127)
1432        c = p.tun_if.get_rx_stats()
1433        self.assertEqual(c['packets'], 254)
1434        c = p.tun_if.get_tx_stats()
1435        self.assertEqual(c['packets'], 254)
1436
1437        # teardown
1438        self.unconfig_protect(np)
1439        self.unconfig_sa(np)
1440        self.unconfig_network(p)
1441
1442
1443if __name__ == '__main__':
1444    unittest.main(testRunner=VppTestRunner)
1445