template_ipsec.py revision 12989b53
1import unittest
2import socket
3import struct
4
5from scapy.layers.inet import IP, ICMP, TCP, UDP
6from scapy.layers.ipsec import SecurityAssociation, ESP
7from scapy.layers.l2 import Ether, Raw
8from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
9
10from framework import VppTestCase, VppTestRunner
11from util import ppp, reassemble4, fragment_rfc791, fragment_rfc8200
12from vpp_papi import VppEnum
13
14
class IPsecIPv4Params(object):
    """Default parameter set for the IPv4 flavour of the IPsec templates.

    Class attributes describe the address family; each instance carries
    its own SA ids, SPIs, algorithm selections and keys so that a test
    can modify them without affecting other instances.
    """

    # address-family description
    is_ipv6 = 0
    addr_len = 32
    addr_bcast = "255.255.255.255"
    addr_any = "0.0.0.0"
    addr_type = socket.AF_INET

    def __init__(self):
        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t

        # hosts used as the remote tunnel endpoint's inner addresses
        self.remote_tun_if_host = '1.1.1.1'
        self.remote_tun_if_host6 = '1111::1'

        # tunnel-mode SA ids / SPIs
        self.scapy_tun_sa_id, self.scapy_tun_spi = 10, 1001
        self.vpp_tun_sa_id, self.vpp_tun_spi = 20, 1000

        # transport-mode SA ids / SPIs
        self.scapy_tra_sa_id, self.scapy_tra_spi = 30, 2001
        self.vpp_tra_sa_id, self.vpp_tra_spi = 40, 2000

        # integrity: VPP API enum value plus the matching scapy name
        self.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_SHA1_96
        self.auth_algo = 'HMAC-SHA1-96'
        self.auth_key = b'C91KUR9GYMm5GfkEvNjX'

        # encryption: VPP API enum value plus the matching scapy name
        self.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_AES_CBC_128
        self.crypt_algo = 'AES-CBC'
        self.crypt_key = b'JPjyOWBeVEQiMe7h'

        # salt is only appended to the key for AES-GCM
        # (see mk_scapy_crypt_key)
        self.salt = 0
        self.flags = 0
        self.nat_header = None
49
50
class IPsecIPv6Params(object):
    """Default parameter set for the IPv6 flavour of the IPsec templates.

    Class attributes describe the address family; each instance carries
    its own SA ids, SPIs, algorithm selections and keys so that a test
    can modify them without affecting other instances.
    """

    # address-family description
    is_ipv6 = 1
    addr_len = 128
    addr_bcast = "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"
    addr_any = "0::0"
    addr_type = socket.AF_INET6

    def __init__(self):
        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t

        # hosts used as the remote tunnel endpoint's inner addresses
        self.remote_tun_if_host = '1111:1111:1111:1111:1111:1111:1111:1111'
        self.remote_tun_if_host4 = '1.1.1.1'

        # tunnel-mode SA ids / SPIs
        self.scapy_tun_sa_id, self.scapy_tun_spi = 50, 3001
        self.vpp_tun_sa_id, self.vpp_tun_spi = 60, 3000

        # transport-mode SA ids / SPIs
        self.scapy_tra_sa_id, self.scapy_tra_spi = 70, 4001
        self.vpp_tra_sa_id, self.vpp_tra_spi = 80, 4000

        # integrity: VPP API enum value plus the matching scapy name
        self.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_SHA1_96
        self.auth_algo = 'HMAC-SHA1-96'
        self.auth_key = b'C91KUR9GYMm5GfkEvNjX'

        # encryption: VPP API enum value plus the matching scapy name
        self.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_AES_CBC_128
        self.crypt_algo = 'AES-CBC'
        self.crypt_key = b'JPjyOWBeVEQiMe7h'

        # salt is only appended to the key for AES-GCM
        # (see mk_scapy_crypt_key)
        self.salt = 0
        self.flags = 0
        self.nat_header = None
85
86
def mk_scapy_crypt_key(p):
    """Return the encryption key to hand to scapy for parameter set ``p``.

    For "AES-GCM" the salt is packed as a 32-bit big-endian integer and
    appended to the key; for every other algorithm the key is returned
    unchanged.
    """
    if p.crypt_algo != "AES-GCM":
        return p.crypt_key
    salt_bytes = struct.pack("!I", p.salt)
    return p.crypt_key + salt_bytes
92
93
def config_tun_params(p, encryption_type, tun_if):
    """Build the two scapy tunnel-mode SAs for ``p`` and attach them to it.

    ``p.scapy_tun_sa`` is keyed with ``p.vpp_tun_spi`` and carries an outer
    header from the interface's remote address to its local address;
    ``p.vpp_tun_sa`` is keyed with ``p.scapy_tun_spi`` and carries the
    reverse outer header.

    :param p: parameter object (algorithms, keys, SPIs, flags, addr_type)
    :param encryption_type: protocol class passed straight through to
        scapy's SecurityAssociation
    :param tun_if: interface whose ``remote_addr`` / ``local_addr`` maps
        (indexed by address family) supply the outer addresses
    """
    esn_flag = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN
    use_esn = bool(p.flags & esn_flag)
    crypt_key = mk_scapy_crypt_key(p)

    # everything the two directions have in common
    shared = dict(crypt_algo=p.crypt_algo,
                  crypt_key=crypt_key,
                  auth_algo=p.auth_algo,
                  auth_key=p.auth_key,
                  nat_t_header=p.nat_header,
                  use_esn=use_esn)

    outer_ip = {socket.AF_INET: IP, socket.AF_INET6: IPv6}[p.addr_type]
    remote = tun_if.remote_addr[p.addr_type]
    local = tun_if.local_addr[p.addr_type]

    p.scapy_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.vpp_tun_spi,
        tunnel_header=outer_ip(src=remote, dst=local),
        **shared)
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.scapy_tun_spi,
        tunnel_header=outer_ip(dst=remote, src=local),
        **shared)
119
120
def config_tra_params(p, encryption_type):
    """Build the two scapy transport-mode SAs for ``p`` and attach them.

    ``p.scapy_tra_sa`` is keyed with ``p.vpp_tra_spi`` and ``p.vpp_tra_sa``
    with ``p.scapy_tra_spi``; apart from the SPI both are created with the
    same algorithms, keys, NAT-T header and ESN setting.

    :param p: parameter object (algorithms, keys, SPIs, flags)
    :param encryption_type: protocol class passed straight through to
        scapy's SecurityAssociation
    """
    esn_flag = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN
    use_esn = bool(p.flags & esn_flag)
    crypt_key = mk_scapy_crypt_key(p)

    # everything the two directions have in common
    shared = dict(crypt_algo=p.crypt_algo,
                  crypt_key=crypt_key,
                  auth_algo=p.auth_algo,
                  auth_key=p.auth_key,
                  nat_t_header=p.nat_header,
                  use_esn=use_esn)

    p.scapy_tra_sa = SecurityAssociation(encryption_type,
                                         spi=p.vpp_tra_spi,
                                         **shared)
    p.vpp_tra_sa = SecurityAssociation(encryption_type,
                                       spi=p.scapy_tra_spi,
                                       **shared)
143
144
class TemplateIpsec(VppTestCase):
    """
    Common base for the IPsec test cases: parameter setup, interface
    bring-up/tear-down and packet generators.

    TRANSPORT MODE:

     ------   encrypt   ---
    |tra_if| <-------> |VPP|
     ------   decrypt   ---

    TUNNEL MODE:

     ------   encrypt   ---   plain   ---
    |tun_if| <-------  |VPP| <------ |pg1|
     ------             ---           ---

     ------   decrypt   ---   plain   ---
    |tun_if| ------->  |VPP| ------> |pg1|
     ------             ---           ---
    """
    tun_spd_id = 1
    tra_spd_id = 2

    def ipsec_select_backend(self):
        """ hook called at the end of setUp; does nothing by default """

    @classmethod
    def setUpClass(cls):
        super(TemplateIpsec, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TemplateIpsec, cls).tearDownClass()

    def setup_params(self):
        """ create fresh v4/v6 parameter sets, indexed by address family """
        self.ipv4_params = IPsecIPv4Params()
        self.ipv6_params = IPsecIPv6Params()
        self.params = {
            family.addr_type: family
            for family in (self.ipv4_params, self.ipv6_params)
        }

    def config_interfaces(self):
        """ create three pg interfaces and bring them up with v4 and v6 """
        self.create_pg_interfaces(range(3))
        self.interfaces = list(self.pg_interfaces)
        for intf in self.interfaces:
            intf.admin_up()
            intf.config_ip4()
            intf.resolve_arp()
            intf.config_ip6()
            intf.resolve_ndp()

    def setUp(self):
        super(TemplateIpsec, self).setUp()

        self.setup_params()

        protocols = VppEnum.vl_api_ipsec_proto_t
        self.vpp_esp_protocol = protocols.IPSEC_API_PROTO_ESP
        self.vpp_ah_protocol = protocols.IPSEC_API_PROTO_AH

        self.config_interfaces()

        # let subclasses pick a backend once the interfaces exist
        self.ipsec_select_backend()

    def unconfig_interfaces(self):
        """ undo config_interfaces: admin-down and remove v4/v6 config """
        for intf in self.interfaces:
            intf.admin_down()
            intf.unconfig_ip4()
            intf.unconfig_ip6()

    def tearDown(self):
        super(TemplateIpsec, self).tearDown()

        self.unconfig_interfaces()

    def show_commands_at_teardown(self):
        hardware = self.vapi.cli("show hardware")
        self.logger.info(hardware)

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=54):
        """ return `count` Ethernet frames, each an IPv4 ICMP packet with
        `payload_size` bytes of b'X', encrypted with `sa` (one encrypt
        call per frame) """
        frames = []
        for _ in range(count):
            plain = IP(src=src, dst=dst) / ICMP() / Raw(b'X' * payload_size)
            l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            frames.append(l2 / sa.encrypt(plain))
        return frames

    def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
                          payload_size=54):
        """ return `count` Ethernet frames, each an IPv6 ICMPv6 echo
        request carrying `payload_size` 'X' characters, encrypted with
        `sa` (one encrypt call per frame) """
        frames = []
        for _ in range(count):
            echo = ICMPv6EchoRequest(id=0, seq=1, data='X' * payload_size)
            plain = IPv6(src=src, dst=dst) / echo
            l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            frames.append(l2 / sa.encrypt(plain))
        return frames

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
        """ return `count` plain-text IPv4 ICMP frames with
        `payload_size` bytes of b'X' """
        frames = []
        for _ in range(count):
            l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            frames.append(l2 / IP(src=src, dst=dst) / ICMP() /
                          Raw(b'X' * payload_size))
        return frames

    def gen_pkts6(self, sw_intf, src, dst, count=1, payload_size=54):
        """ return `count` plain-text IPv6 ICMPv6 echo-request frames
        carrying `payload_size` 'X' characters """
        frames = []
        for _ in range(count):
            l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            echo = ICMPv6EchoRequest(id=0, seq=1, data='X' * payload_size)
            frames.append(l2 / IPv6(src=src, dst=dst) / echo)
        return frames
247
248
class IpsecTcp(object):
    """ verify methods for TCP traffic through the v4 tunnel """
    def verify_tcp_checksum(self):
        """ send an encrypted TCP SYN to port 80 of the tunnel interface's
        local address and check the checksums of the decrypted reply """
        # presumably starts VPP's built-in HTTP server so the SYN is
        # answered - confirm against the VPP CLI
        self.vapi.cli("test http server")
        p = self.params[socket.AF_INET]

        l2 = Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
        syn = (IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
               TCP(flags='S', dport=80))
        send = l2 / p.scapy_tun_sa.encrypt(syn)
        self.logger.debug(ppp("Sending packet:", send))

        replies = self.send_and_expect(self.tun_if, [send], self.tun_if)
        first_reply = replies[0]
        decrypted = p.vpp_tun_sa.decrypt(first_reply[IP])
        self.assert_packet_checksums_valid(decrypted)
262
263
class IpsecTcpTests(IpsecTcp):
    """ UT test methods for TCP; thin wrappers over the IpsecTcp verify
    methods """
    def test_tcp_checksum(self):
        """ verify checksum correctness for vpp generated packets """
        self.verify_tcp_checksum()
268
269
270class IpsecTra4(object):
271    """ verify methods for Transport v4 """
    def verify_tra_anti_replay(self):
        """ Drive the v4 transport SA through a fixed series of sequence
        numbers and check, after each step, which packets are answered and
        which error counter (replay / integrity / undersize / sequence
        cycled) has moved.

        The steps depend on each other: every packet sent changes the
        receive window state that the following step relies on, so the
        statement order must be preserved.

        Reads self.params, self.tra_if, self.encryption_type,
        self.tra4_encrypt_node_name and self.tra4_decrypt_node_name.
        """
        p = self.params[socket.AF_INET]
        use_esn = p.vpp_tra_sa.use_esn

        # names of the error counters inspected below
        seq_cycle_node_name = ('/err/%s/sequence number cycled' %
                               self.tra4_encrypt_node_name)
        replay_node_name = ('/err/%s/SA replayed packet' %
                            self.tra4_decrypt_node_name)
        # with ESP + AES-GCM a bad packet is counted as a decryption
        # failure, otherwise as an integrity check failure
        if ESP == self.encryption_type and p.crypt_algo == "AES-GCM":
            hash_failed_node_name = ('/err/%s/ESP decryption failed' %
                                     self.tra4_decrypt_node_name)
        else:
            hash_failed_node_name = ('/err/%s/Integrity check failed' %
                                     self.tra4_decrypt_node_name)
        # snapshot the current counter values; expectations below are
        # expressed as increments on top of these
        replay_count = self.statistics.get_err_counter(replay_node_name)
        hash_failed_count = self.statistics.get_err_counter(
            hash_failed_node_name)
        seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)

        # the undersize counter is only looked at for ESP
        if ESP == self.encryption_type:
            undersize_node_name = ('/err/%s/undersized packet' %
                                   self.tra4_decrypt_node_name)
            undersize_count = self.statistics.get_err_counter(
                undersize_node_name)

        #
        # send packets with seq numbers 1->33 (range(1, 34))
        # this means the window size is still in Case B (see RFC4303
        # Appendix A)
        #
        # for reasons i haven't investigated Scapy won't create a packet with
        # seq_num=0
        #
        pkts = [(Ether(src=self.tra_if.remote_mac,
                       dst=self.tra_if.local_mac) /
                 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                           dst=self.tra_if.local_ip4) /
                                        ICMP(),
                                        seq_num=seq))
                for seq in range(1, 34)]
        # NOTE(review): recv_pkts is assigned here and below but never read
        recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)

        # replayed packets are dropped
        self.send_and_assert_no_replies(self.tra_if, pkts)
        replay_count += len(pkts)
        self.assert_error_counter_equal(replay_node_name, replay_count)

        #
        # now send a batch of packets all with the same sequence number
        # the first packet in the batch is legitimate, the rest bogus
        #
        pkts = (Ether(src=self.tra_if.remote_mac,
                      dst=self.tra_if.local_mac) /
                p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                          dst=self.tra_if.local_ip4) /
                                       ICMP(),
                                       seq_num=35))
        # 8 copies sent, 1 reply expected, so 7 are counted as replays
        recv_pkts = self.send_and_expect(self.tra_if, pkts * 8,
                                         self.tra_if, n_rx=1)
        replay_count += 7
        self.assert_error_counter_equal(replay_node_name, replay_count)

        #
        # now move the window over to 257 (more than one byte) and into Case A
        #
        pkt = (Ether(src=self.tra_if.remote_mac,
                     dst=self.tra_if.local_mac) /
               p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                         dst=self.tra_if.local_ip4) /
                                      ICMP(),
                                      seq_num=257))
        recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        # replayed packets are dropped
        self.send_and_assert_no_replies(self.tra_if, pkt * 3)
        replay_count += 3
        self.assert_error_counter_equal(replay_node_name, replay_count)

        # the window size is 64 packets
        # packets still inside the window are accepted
        pkt = (Ether(src=self.tra_if.remote_mac,
                     dst=self.tra_if.local_mac) /
               p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                         dst=self.tra_if.local_ip4) /
                                      ICMP(),
                                      seq_num=200))
        recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        # a packet that does not decrypt does not move the window forward
        # (the bogus SA uses the right SPI but reversed keys)
        bogus_sa = SecurityAssociation(self.encryption_type,
                                       p.vpp_tra_spi,
                                       crypt_algo=p.crypt_algo,
                                       crypt_key=mk_scapy_crypt_key(p)[::-1],
                                       auth_algo=p.auth_algo,
                                       auth_key=p.auth_key[::-1])
        pkt = (Ether(src=self.tra_if.remote_mac,
                     dst=self.tra_if.local_mac) /
               bogus_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                   dst=self.tra_if.local_ip4) /
                                ICMP(),
                                seq_num=350))
        self.send_and_assert_no_replies(self.tra_if, pkt * 17)

        hash_failed_count += 17
        self.assert_error_counter_equal(hash_failed_node_name,
                                        hash_failed_count)

        # a malformed 'runt' packet
        #  created by a mis-constructed SA (right SPI, no algorithms or keys)
        if (ESP == self.encryption_type and p.crypt_algo != "NULL"):
            bogus_sa = SecurityAssociation(self.encryption_type,
                                           p.vpp_tra_spi)
            pkt = (Ether(src=self.tra_if.remote_mac,
                         dst=self.tra_if.local_mac) /
                   bogus_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                       dst=self.tra_if.local_ip4) /
                                    ICMP(),
                                    seq_num=350))
            self.send_and_assert_no_replies(self.tra_if, pkt * 17)

            undersize_count += 17
            self.assert_error_counter_equal(undersize_node_name,
                                            undersize_count)

        # that the window did not move can be determined since this packet
        # (seq 234) is still in the window
        pkt = (Ether(src=self.tra_if.remote_mac,
                     dst=self.tra_if.local_mac) /
               p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                         dst=self.tra_if.local_ip4) /
                                      ICMP(),
                                      seq_num=234))
        self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        #
        # out of window are dropped
        #  this is Case B. So VPP will consider this to be a high seq num wrap
        #  and so the decrypt attempt will fail
        #
        pkt = (Ether(src=self.tra_if.remote_mac,
                     dst=self.tra_if.local_mac) /
               p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                         dst=self.tra_if.local_ip4) /
                                      ICMP(),
                                      seq_num=17))
        self.send_and_assert_no_replies(self.tra_if, pkt * 17)

        if use_esn:
            # an out of window error with ESN looks like a high sequence
            # wrap. but since it isn't then the verify will fail.
            hash_failed_count += 17
            self.assert_error_counter_equal(hash_failed_node_name,
                                            hash_failed_count)

        else:
            replay_count += 17
            self.assert_error_counter_equal(replay_node_name,
                                            replay_count)

        # valid packet moves the window over to 258
        pkt = (Ether(src=self.tra_if.remote_mac,
                     dst=self.tra_if.local_mac) /
               p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                         dst=self.tra_if.local_ip4) /
                                      ICMP(),
                                      seq_num=258))
        rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
        # the decrypted result is not inspected; the call itself is the check
        decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])

        #
        # move VPP's SA TX seq-num to just before the seq-number wrap.
        # then fire in a packet that VPP should drop on TX because it
        # causes the TX seq number to wrap; unless we're using extended
        # sequence numbers.
        #
        self.vapi.cli("test ipsec sa %d seq 0xffffffff" % p.scapy_tra_sa_id)
        self.logger.info(self.vapi.ppcli("show ipsec sa 0"))
        self.logger.info(self.vapi.ppcli("show ipsec sa 1"))

        pkts = [(Ether(src=self.tra_if.remote_mac,
                       dst=self.tra_if.local_mac) /
                 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                           dst=self.tra_if.local_ip4) /
                                        ICMP(),
                                        seq_num=seq))
                for seq in range(259, 280)]

        if use_esn:
            rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)

            #
            # in order for scapy to decrypt its SA's high order number needs
            # to wrap
            #
            p.vpp_tra_sa.seq_num = 0x100000000
            # NOTE(review): rx is already a single packet here; rx[0]
            # presumably yields the packet itself - confirm
            for rx in rxs:
                decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])

            #
            # wrap scapy's TX high sequence number. VPP is in case B, so it
            # will consider this a high seq wrap also.
            # The low seq num we set it to will place VPP's RX window in Case A
            #
            p.scapy_tra_sa.seq_num = 0x100000005
            pkt = (Ether(src=self.tra_if.remote_mac,
                         dst=self.tra_if.local_mac) /
                   p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                             dst=self.tra_if.local_ip4) /
                                          ICMP(),
                                          seq_num=0x100000005))
            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
            decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])

            #
            # A packet that has seq num between (2^32-64) and 5 is within
            # the window
            #
            p.scapy_tra_sa.seq_num = 0xfffffffd
            pkt = (Ether(src=self.tra_if.remote_mac,
                         dst=self.tra_if.local_mac) /
                   p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                             dst=self.tra_if.local_ip4) /
                                          ICMP(),
                                          seq_num=0xfffffffd))
            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
            decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])

            #
            # While in case A we cannot wrap the high sequence number again
            # because VPP will consider this packet to be one that moves the
            # window forward
            #
            pkt = (Ether(src=self.tra_if.remote_mac,
                         dst=self.tra_if.local_mac) /
                   p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                             dst=self.tra_if.local_ip4) /
                                          ICMP(),
                                          seq_num=0x200000999))
            # NOTE(review): every other send_and_assert_no_replies call in
            # this method passes only (intf, pkts); the extra self.tra_if
            # looks unintended - confirm against the framework signature
            self.send_and_assert_no_replies(self.tra_if, [pkt], self.tra_if)

            hash_failed_count += 1
            self.assert_error_counter_equal(hash_failed_node_name,
                                            hash_failed_count)

            #
            # but if we move the window forward to case B, then we can wrap
            # again
            #
            p.scapy_tra_sa.seq_num = 0x100000555
            pkt = (Ether(src=self.tra_if.remote_mac,
                         dst=self.tra_if.local_mac) /
                   p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                             dst=self.tra_if.local_ip4) /
                                          ICMP(),
                                          seq_num=0x100000555))
            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
            decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])

            p.scapy_tra_sa.seq_num = 0x200000444
            pkt = (Ether(src=self.tra_if.remote_mac,
                         dst=self.tra_if.local_mac) /
                   p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                             dst=self.tra_if.local_ip4) /
                                          ICMP(),
                                          seq_num=0x200000444))
            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
            decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])

        else:
            #
            # without ESN TX sequence numbers can't wrap and packets are
            # dropped from here on out.
            #
            self.send_and_assert_no_replies(self.tra_if, pkts)
            seq_cycle_count += len(pkts)
            self.assert_error_counter_equal(seq_cycle_node_name,
                                            seq_cycle_count)

        # move the security-associations seq number on to the last we used
        # (0x15f == 351)
        self.vapi.cli("test ipsec sa %d seq 0x15f" % p.scapy_tra_sa_id)
        p.scapy_tra_sa.seq_num = 351
        p.vpp_tra_sa.seq_num = 351
553
554    def verify_tra_basic4(self, count=1):
555        """ ipsec v4 transport basic test """
556        self.vapi.cli("clear errors")
557        self.vapi.cli("clear ipsec sa")
558        try:
559            p = self.params[socket.AF_INET]
560            send_pkts = self.gen_encrypt_pkts(p.scapy_tra_sa, self.tra_if,
561                                              src=self.tra_if.remote_ip4,
562                                              dst=self.tra_if.local_ip4,
563                                              count=count)
564            recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
565                                             self.tra_if)
566            for rx in recv_pkts:
567                self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
568                self.assert_packet_checksums_valid(rx)
569                try:
570                    decrypted = p.vpp_tra_sa.decrypt(rx[IP])
571                    self.assert_packet_checksums_valid(decrypted)
572                except:
573                    self.logger.debug(ppp("Unexpected packet:", rx))
574                    raise
575        finally:
576            self.logger.info(self.vapi.ppcli("show error"))
577            self.logger.info(self.vapi.ppcli("show ipsec all"))
578
579        pkts = p.tra_sa_in.get_stats()['packets']
580        self.assertEqual(pkts, count,
581                         "incorrect SA in counts: expected %d != %d" %
582                         (count, pkts))
583        pkts = p.tra_sa_out.get_stats()['packets']
584        self.assertEqual(pkts, count,
585                         "incorrect SA out counts: expected %d != %d" %
586                         (count, pkts))
587
588        self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
589        self.assert_packet_counter_equal(self.tra4_decrypt_node_name, count)
590
591
class IpsecTra4Tests(IpsecTra4):
    """ UT test methods for Transport v4 """
    def test_tra_anti_replay(self):
        """ ipsec v4 transport anti-replay test """
        self.verify_tra_anti_replay()

    def test_tra_basic(self, count=1):
        """ ipsec v4 transport basic test """
        # honour the caller's count; the default keeps the unittest-driven
        # run (called with no arguments) at a single packet
        self.verify_tra_basic4(count=count)

    def test_tra_burst(self):
        """ ipsec v4 transport burst test """
        self.verify_tra_basic4(count=257)
605
606
class IpsecTra6(object):
    """ verify methods for Transport v6 """
    def verify_tra_basic6(self, count=1):
        """ Send `count` encrypted v6 packets to VPP on the transport
        interface, check each reply's payload length and the checksums of
        the decrypted packet, then check the SA and node packet counters.

        :param count: number of packets to send and expect back
        """
        self.vapi.cli("clear errors")
        # mirror verify_tra_basic4: the SA counters are asserted to equal
        # `count` exactly below, so start from cleared SA state
        self.vapi.cli("clear ipsec sa")
        try:
            p = self.params[socket.AF_INET6]
            send_pkts = self.gen_encrypt_pkts6(p.scapy_tra_sa, self.tra_if,
                                               src=self.tra_if.remote_ip6,
                                               dst=self.tra_if.local_ip6,
                                               count=count)
            recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
                                             self.tra_if)
            for rx in recv_pkts:
                # plen must cover everything after the Ethernet and IPv6
                # headers
                self.assertEqual(len(rx) - len(Ether()) - len(IPv6()),
                                 rx[IPv6].plen)
                try:
                    decrypted = p.vpp_tra_sa.decrypt(rx[IPv6])
                    self.assert_packet_checksums_valid(decrypted)
                except Exception:
                    # dump the offending packet, then re-raise unchanged
                    self.logger.debug(ppp("Unexpected packet:", rx))
                    raise
        finally:
            # always leave VPP's view of the run in the log
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))

        pkts = p.tra_sa_in.get_stats()['packets']
        self.assertEqual(pkts, count,
                         "incorrect SA in counts: expected %d != %d" %
                         (count, pkts))
        pkts = p.tra_sa_out.get_stats()['packets']
        self.assertEqual(pkts, count,
                         "incorrect SA out counts: expected %d != %d" %
                         (count, pkts))
        self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tra6_decrypt_node_name, count)
642
643
class IpsecTra6Tests(IpsecTra6):
    """ UT test methods for Transport v6 """
    # both tests run the same verification and differ only in packet count
    def test_tra_basic6(self):
        """ ipsec v6 transport basic test """
        self.verify_tra_basic6(count=1)

    def test_tra_burst6(self):
        """ ipsec v6 transport burst test """
        self.verify_tra_basic6(count=257)
653
654
class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
    """ UT test methods for Transport v6 and v4

    Adds nothing of its own; it only combines the v4 and v6 test classes.
    """
    pass
658
659
660class IpsecTun4(object):
661    """ verify methods for Tunnel v4 """
662    def verify_counters4(self, p, count, n_frags=None):
663        if not n_frags:
664            n_frags = count
665        if (hasattr(p, "spd_policy_in_any")):
666            pkts = p.spd_policy_in_any.get_stats()['packets']
667            self.assertEqual(pkts, count,
668                             "incorrect SPD any policy: expected %d != %d" %
669                             (count, pkts))
670
671        if (hasattr(p, "tun_sa_in")):
672            pkts = p.tun_sa_in.get_stats()['packets']
673            self.assertEqual(pkts, count,
674                             "incorrect SA in counts: expected %d != %d" %
675                             (count, pkts))
676            pkts = p.tun_sa_out.get_stats()['packets']
677            self.assertEqual(pkts, count,
678                             "incorrect SA out counts: expected %d != %d" %
679                             (count, pkts))
680
681        self.assert_packet_counter_equal(self.tun4_encrypt_node_name, n_frags)
682        self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count)
683
684    def verify_decrypted(self, p, rxs):
685        for rx in rxs:
686            self.assert_equal(rx[IP].src, p.remote_tun_if_host)
687            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
688            self.assert_packet_checksums_valid(rx)
689
690    def verify_encrypted(self, p, sa, rxs):
691        decrypt_pkts = []
692        for rx in rxs:
693            if p.nat_header:
694                self.assertEqual(rx[UDP].dport, 4500)
695            self.assert_packet_checksums_valid(rx)
696            self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
697            try:
698                decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IP])
699                if not decrypt_pkt.haslayer(IP):
700                    decrypt_pkt = IP(decrypt_pkt[Raw].load)
701                decrypt_pkts.append(decrypt_pkt)
702                self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
703                self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
704            except:
705                self.logger.debug(ppp("Unexpected packet:", rx))
706                try:
707                    self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
708                except:
709                    pass
710                raise
711        pkts = reassemble4(decrypt_pkts)
712        for pkt in pkts:
713            self.assert_packet_checksums_valid(pkt)
714
715    def verify_tun_44(self, p, count=1, payload_size=64, n_rx=None):
716        self.vapi.cli("clear errors")
717        if not n_rx:
718            n_rx = count
719        try:
720            send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
721                                              src=p.remote_tun_if_host,
722                                              dst=self.pg1.remote_ip4,
723                                              count=count)
724            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
725            self.verify_decrypted(p, recv_pkts)
726
727            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
728                                      dst=p.remote_tun_if_host, count=count,
729                                      payload_size=payload_size)
730            recv_pkts = self.send_and_expect(self.pg1, send_pkts,
731                                             self.tun_if, n_rx)
732            self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)
733
734        finally:
735            self.logger.info(self.vapi.ppcli("show error"))
736            self.logger.info(self.vapi.ppcli("show ipsec all"))
737
738        self.verify_counters4(p, count, n_rx)
739
740    def verify_tun_reass_44(self, p):
741        self.vapi.cli("clear errors")
742        self.vapi.ip_reassembly_enable_disable(
743            sw_if_index=self.tun_if.sw_if_index, enable_ip4=True)
744
745        try:
746            send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
747                                              src=p.remote_tun_if_host,
748                                              dst=self.pg1.remote_ip4,
749                                              payload_size=1900,
750                                              count=1)
751            send_pkts = fragment_rfc791(send_pkts[0], 1400)
752            recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
753                                             self.pg1, n_rx=1)
754            self.verify_decrypted(p, recv_pkts)
755
756            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
757                                      dst=p.remote_tun_if_host, count=1)
758            recv_pkts = self.send_and_expect(self.pg1, send_pkts,
759                                             self.tun_if)
760            self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)
761
762        finally:
763            self.logger.info(self.vapi.ppcli("show error"))
764            self.logger.info(self.vapi.ppcli("show ipsec all"))
765
766        self.verify_counters4(p, 1, 1)
767        self.vapi.ip_reassembly_enable_disable(
768            sw_if_index=self.tun_if.sw_if_index, enable_ip4=False)
769
770    def verify_tun_64(self, p, count=1):
771        self.vapi.cli("clear errors")
772        try:
773            send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
774                                               src=p.remote_tun_if_host6,
775                                               dst=self.pg1.remote_ip6,
776                                               count=count)
777            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
778            for recv_pkt in recv_pkts:
779                self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host6)
780                self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
781                self.assert_packet_checksums_valid(recv_pkt)
782            send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
783                                       dst=p.remote_tun_if_host6, count=count)
784            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
785            for recv_pkt in recv_pkts:
786                try:
787                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP])
788                    if not decrypt_pkt.haslayer(IPv6):
789                        decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
790                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
791                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host6)
792                    self.assert_packet_checksums_valid(decrypt_pkt)
793                except:
794                    self.logger.error(ppp("Unexpected packet:", recv_pkt))
795                    try:
796                        self.logger.debug(
797                            ppp("Decrypted packet:", decrypt_pkt))
798                    except:
799                        pass
800                    raise
801        finally:
802            self.logger.info(self.vapi.ppcli("show error"))
803            self.logger.info(self.vapi.ppcli("show ipsec all"))
804
805        self.verify_counters4(p, count)
806
807    def verify_keepalive(self, p):
808        pkt = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
809               IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
810               UDP(sport=333, dport=4500) /
811               Raw(b'\xff'))
812        self.send_and_assert_no_replies(self.tun_if, pkt*31)
813        self.assert_error_counter_equal(
814            '/err/%s/NAT Keepalive' % self.tun4_input_node, 31)
815
816        pkt = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
817               IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
818               UDP(sport=333, dport=4500) /
819               Raw(b'\xfe'))
820        self.send_and_assert_no_replies(self.tun_if, pkt*31)
821        self.assert_error_counter_equal(
822            '/err/%s/Too Short' % self.tun4_input_node, 31)
823
824
class IpsecTun4Tests(IpsecTun4):
    """ UT test methods for Tunnel v4 """

    # Every test picks the IPv4 parameter set out of self.params and hands
    # it to one of the verify_* helpers inherited from IpsecTun4.

    def test_tun_basic44(self):
        """ ipsec 4o4 tunnel basic test """
        params = self.params[socket.AF_INET]
        self.verify_tun_44(params, count=1)

    def test_tun_reass_basic44(self):
        """ ipsec 4o4 tunnel basic reassembly test """
        params = self.params[socket.AF_INET]
        self.verify_tun_reass_44(params)

    def test_tun_burst44(self):
        """ ipsec 4o4 tunnel burst test """
        # NOTE(review): 257 is presumably one more than a full 256-packet
        # VPP frame - confirm
        params = self.params[socket.AF_INET]
        self.verify_tun_44(params, count=257)
838
839
class IpsecTun6(object):
    """ verify methods for Tunnel v6

    The class derives from ``object`` and only provides helpers; the class
    it is combined with has to supply ``self.vapi``, ``self.logger``,
    ``self.tun_if``, ``self.pg1``, the packet generators, the send helpers
    and the assert methods used below.
    """

    def verify_counters6(self, p_in, p_out, count):
        """Check SA statistics and the tunnel encrypt/decrypt counters.

        :param p_in: parameters; if it has a ``tun_sa_in`` attribute the
            packet count of that SA must equal ``count``
        :param p_out: parameters; if it has a ``tun_sa_out`` attribute the
            packet count of that SA must equal ``count``
        :param count: expected value of every counter checked here
        """
        if hasattr(p_in, "tun_sa_in"):
            pkts = p_in.tun_sa_in.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA in counts: expected %d != %d" %
                             (count, pkts))
        if hasattr(p_out, "tun_sa_out"):
            pkts = p_out.tun_sa_out.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA out counts: expected %d != %d" %
                             (count, pkts))
        self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tun6_decrypt_node_name, count)

    def verify_decrypted6(self, p, rxs):
        """Check the plain IPv6 packets received on pg1.

        :param p: parameters providing the expected source address
            ``remote_tun_if_host``
        :param rxs: received packets
        """
        for rx in rxs:
            self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt the packets received on the tunnel and check them.

        :param p: parameters; ``p.vpp_tun_sa`` is used for decryption
        :param sa: currently unused, decryption always uses
            ``p.vpp_tun_sa``
        :param rxs: received (encrypted) packets
        :raises Exception: whatever the failing decrypt/assert raised, after
            the offending packet has been logged
        """
        for rx in rxs:
            self.assert_packet_checksums_valid(rx)
            # the IPv6 payload length must match what is actually on the wire
            self.assertEqual(len(rx) - len(Ether()) - len(IPv6()),
                             rx[IPv6].plen)
            # reset for every packet so that the failure path never reports
            # the decrypted packet of a previous iteration
            decrypt_pkt = None
            try:
                decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IPv6])
                if not decrypt_pkt.haslayer(IPv6):
                    # the inner packet was not dissected as IPv6;
                    # re-parse the raw payload
                    decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
                self.assert_packet_checksums_valid(decrypt_pkt)
                self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
                self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
            except Exception:
                self.logger.debug(ppp("Unexpected packet:", rx))
                # only log a decrypted packet that belongs to rx
                if decrypt_pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
                raise

    def verify_drop_tun_66(self, p_in, count=1, payload_size=64):
        """Send encrypted IPv6 packets and expect nothing to come back.

        :param p_in: parameters used to build the encrypted packets
        :param count: number of encrypted packets to send
        :param payload_size: currently unused
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")

        send_pkts = self.gen_encrypt_pkts6(p_in.scapy_tun_sa, self.tun_if,
                                           src=p_in.remote_tun_if_host,
                                           dst=self.pg1.remote_ip6,
                                           count=count)
        self.send_and_assert_no_replies(self.tun_if, send_pkts)
        self.logger.info(self.vapi.cli("sh punt stats"))

    def verify_tun_66(self, p_in, p_out=None, count=1, payload_size=64):
        """Exercise an IPv6-in-IPv6 tunnel in both directions.

        :param p_in: parameters for the encrypted packets sent into
            ``self.tun_if``
        :param p_out: parameters for the plain packets sent from
            ``self.pg1``; a falsy value means ``p_in``
        :param count: number of packets sent in each direction
        :param payload_size: payload size of the plain packets; it is not
            passed to the encrypted packet generator
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")
        if not p_out:
            p_out = p_in
        try:
            send_pkts = self.gen_encrypt_pkts6(p_in.scapy_tun_sa, self.tun_if,
                                               src=p_in.remote_tun_if_host,
                                               dst=self.pg1.remote_ip6,
                                               count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            self.verify_decrypted6(p_in, recv_pkts)

            send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
                                       dst=p_out.remote_tun_if_host,
                                       count=count,
                                       payload_size=payload_size)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts,
                                             self.tun_if)
            self.verify_encrypted6(p_out, p_out.vpp_tun_sa, recv_pkts)

        finally:
            # dump diagnostics on success and on failure alike
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))
        self.verify_counters6(p_in, p_out, count)

    def verify_tun_reass_66(self, p):
        """Verify a 6o6 tunnel when the encrypted packet arrives fragmented.

        IPv6 reassembly is enabled on ``self.tun_if``.  One encrypted packet
        (payload_size=1900) is split with fragment_rfc8200() and sent into
        the tunnel interface; exactly one packet is expected on ``self.pg1``.
        A single plain packet is then sent the other way.

        Reassembly is switched off again on every exit path, so a failing
        check does not leave it enabled on the interface.

        :param p: IPsec parameters (SAs and tunnel host addresses)
        """
        self.vapi.cli("clear errors")
        self.vapi.ip_reassembly_enable_disable(
            sw_if_index=self.tun_if.sw_if_index, enable_ip6=True)

        try:
            try:
                send_pkts = self.gen_encrypt_pkts6(
                    p.scapy_tun_sa, self.tun_if,
                    src=p.remote_tun_if_host,
                    dst=self.pg1.remote_ip6,
                    count=1,
                    payload_size=1900)
                # fragment the one encrypted packet (argument meaning is
                # defined by util.fragment_rfc8200 - not visible here)
                send_pkts = fragment_rfc8200(send_pkts[0], 1, 1400,
                                             self.logger)
                # all fragments together must yield a single packet on pg1
                recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
                                                 self.pg1, n_rx=1)
                self.verify_decrypted6(p, recv_pkts)

                send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
                                           dst=p.remote_tun_if_host,
                                           count=1,
                                           payload_size=64)
                recv_pkts = self.send_and_expect(self.pg1, send_pkts,
                                                 self.tun_if)
                self.verify_encrypted6(p, p.vpp_tun_sa, recv_pkts)
            finally:
                # dump diagnostics on success and on failure alike
                self.logger.info(self.vapi.ppcli("show error"))
                self.logger.info(self.vapi.ppcli("show ipsec all"))
            self.verify_counters6(p, p, 1)
        finally:
            # undo the interface configuration even when a check failed
            self.vapi.ip_reassembly_enable_disable(
                sw_if_index=self.tun_if.sw_if_index, enable_ip6=False)

    def verify_tun_46(self, p, count=1):
        """ ipsec 4o6 tunnel basic test

        Encrypted packets with an inner IPv4 source of
        ``p.remote_tun_if_host4`` are sent into ``self.tun_if`` and checked
        on ``self.pg1``; plain IPv4 packets are then sent from ``self.pg1``
        and the packets received on ``self.tun_if`` are decrypted from
        their IPv6 layer with ``p.vpp_tun_sa`` and checked.

        :param p: IPsec parameters (SAs and tunnel host addresses)
        :param count: number of packets sent in each direction
        :raises Exception: whatever the failing decrypt/assert raised, after
            the offending packet has been logged
        """
        self.vapi.cli("clear errors")
        try:
            send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
                                              src=p.remote_tun_if_host4,
                                              dst=self.pg1.remote_ip4,
                                              count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IP].src, p.remote_tun_if_host4)
                self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4)
                self.assert_packet_checksums_valid(recv_pkt)
            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                      dst=p.remote_tun_if_host4,
                                      count=count)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            for recv_pkt in recv_pkts:
                # reset for every packet so that the failure path never
                # reports the decrypted packet of a previous iteration
                decrypt_pkt = None
                try:
                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
                    if not decrypt_pkt.haslayer(IP):
                        # the inner packet was not dissected as IPv4;
                        # re-parse the raw payload
                        decrypt_pkt = IP(decrypt_pkt[Raw].load)
                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host4)
                    self.assert_packet_checksums_valid(decrypt_pkt)
                except Exception:
                    self.logger.debug(ppp("Unexpected packet:", recv_pkt))
                    # only log a decrypted packet that belongs to recv_pkt
                    if decrypt_pkt is not None:
                        self.logger.debug(ppp("Decrypted packet:",
                                              decrypt_pkt))
                    raise
        finally:
            # dump diagnostics on success and on failure alike
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))
        self.verify_counters6(p, p, count)
986
987
class IpsecTun6Tests(IpsecTun6):
    """ UT test methods for Tunnel v6 """

    # Every test picks the IPv6 parameter set out of self.params and hands
    # it to one of the verify_* helpers inherited from IpsecTun6.

    def test_tun_basic66(self):
        """ ipsec 6o6 tunnel basic test """
        params = self.params[socket.AF_INET6]
        self.verify_tun_66(params, count=1)

    def test_tun_reass_basic66(self):
        """ ipsec 6o6 tunnel basic reassembly test """
        params = self.params[socket.AF_INET6]
        self.verify_tun_reass_66(params)

    def test_tun_burst66(self):
        """ ipsec 6o6 tunnel burst test """
        # NOTE(review): 257 is presumably one more than a full 256-packet
        # VPP frame - confirm
        params = self.params[socket.AF_INET6]
        self.verify_tun_66(params, count=257)
1002
1003
class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests):
    """ UT test methods for Tunnel v6 & v4

    Adds nothing of its own: it only combines the test methods of
    IpsecTun4Tests and IpsecTun6Tests.
    """
1007
1008
# When this module is executed directly (not imported), run unittest's
# command-line entry point with VppTestRunner as the test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)
1011