test_nat.py revision efd7bc2b
1#!/usr/bin/env python3
2
3import socket
4import unittest
5import struct
6import random
7
8from framework import VppTestCase, VppTestRunner, running_extended_tests
9
10import scapy.compat
11from scapy.layers.inet import IP, TCP, UDP, ICMP
12from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \
14    ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, fragment6
15from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
16from scapy.layers.l2 import Ether, ARP, GRE
17from scapy.data import IP_PROTOS
18from scapy.packet import bind_layers, Raw
19from util import ppp
20from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21from time import sleep
22from util import ip4_range
23from vpp_papi import mac_pton
24from syslog_rfc5424_parser import SyslogMessage, ParseError
25from syslog_rfc5424_parser.constants import SyslogFacility, SyslogSeverity
26from io import BytesIO
27from vpp_papi import VppEnum
28from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathType
29from vpp_neighbor import VppNeighbor
30from scapy.all import bind_layers, Packet, ByteEnumField, ShortField, \
31    IPField, IntField, LongField, XByteField, FlagsField, FieldLenField, \
32    PacketListField
33from ipaddress import IPv6Network
34
35
36# NAT HA protocol event data
class Event(Packet):
    """NAT HA protocol event data: one session event record."""

    name = "Event"
    fields_desc = [
        # Kind of event and the L4 protocol it refers to.
        ByteEnumField("event_type", None,
                      {1: "add", 2: "del", 3: "refresh"}),
        ByteEnumField("protocol", None,
                      {0: "udp", 1: "tcp", 2: "icmp"}),
        ShortField("flags", 0),
        # Inside / outside address and port pair.
        IPField("in_addr", None),
        IPField("out_addr", None),
        ShortField("in_port", None),
        ShortField("out_port", None),
        # External host address and port pair ("eh" / "ehn" fields).
        IPField("eh_addr", None),
        IPField("ehn_addr", None),
        ShortField("eh_port", None),
        ShortField("ehn_port", None),
        IntField("fib_index", None),
        # Counters.
        IntField("total_pkts", 0),
        LongField("total_bytes", 0),
    ]

    def extract_padding(self, s):
        """
        Report no payload for this layer.

        :param s: Bytes remaining after the fields of this event
        :returns: Tuple of (payload, padding); the payload is always
                  empty and every remaining byte is handed back as padding
        """
        payload, padding = "", s
        return payload, padding
58
59
60# NAT HA protocol header
class HANATStateSync(Packet):
    """NAT HA protocol header followed by a counted list of events."""

    name = "HA NAT state sync"
    fields_desc = [
        XByteField("version", 1),
        FlagsField("flags", 0, 8, ['ACK']),
        # "count" is derived from the number of entries in "events".
        FieldLenField("count", None, count_of="events"),
        IntField("sequence_number", 1),
        IntField("thread_index", 0),
        # When dissecting, "count" tells how many Event records follow.
        PacketListField("events", [], Event,
                        count_from=lambda pkt: pkt.count),
    ]
70
71
72class MethodHolder(VppTestCase):
73    """ NAT create capture and verify method holder """
74
75    @property
76    def config_flags(self):
77        return VppEnum.vl_api_nat_config_flags_t
78
79    @property
80    def SYSLOG_SEVERITY(self):
81        return VppEnum.vl_api_syslog_severity_t
82
    def clear_nat44(self):
        """
        Clear NAT44 configuration.

        Walks the NAT44 dumps exposed by the API (interface addresses,
        interface features, output features, static / load-balancing /
        identity mappings and pool addresses) and issues a delete for every
        entry found. Also disables forwarding and IPFIX logging, and sets
        the syslog filter, HA listener/failover, reassembly, timeouts,
        address-and-port allocation and MSS clamping to the fixed values
        used below.

        Reads ``self.ipfix_domain_id`` and ``self.ipfix_src_port`` and then
        overwrites them with 1 and 4739 respectively.
        """
        # Only test classes that define both pg7 and pg8 get pg7's IPv4
        # configuration removed here.
        if hasattr(self, 'pg7') and hasattr(self, 'pg8'):
            if self.pg7.has_ip4_config:
                self.pg7.unconfig_ip4()

        self.vapi.nat44_forwarding_enable_disable(enable=0)

        # Remove every interface whose address was added to the NAT pool.
        interfaces = self.vapi.nat44_interface_addr_dump()
        for intf in interfaces:
            self.vapi.nat44_add_del_interface_addr(
                is_add=0,
                sw_if_index=intf.sw_if_index,
                flags=intf.flags)

        # Disable IPFIX using the values the test configured, then put the
        # instance attributes back to 4739 / 1.
        self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
                                           src_port=self.ipfix_src_port,
                                           enable=0)
        self.ipfix_src_port = 4739
        self.ipfix_domain_id = 1

        self.vapi.syslog_set_filter(
            self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_EMERG)

        # NOTE(review): 0.0.0.0 with port 0 presumably unsets the HA
        # listener and failover peer - confirm against the API.
        self.vapi.nat_ha_set_listener(ip_address='0.0.0.0', port=0,
                                      path_mtu=512)
        self.vapi.nat_ha_set_failover(ip_address='0.0.0.0', port=0,
                                      session_refresh_interval=10)

        # Interfaces flagged as both inside and outside get two calls: one
        # without flags and one with the dumped flags. No is_add is passed,
        # so the wrapper's default applies (presumably delete - TODO
        # confirm).
        interfaces = self.vapi.nat44_interface_dump()
        for intf in interfaces:
            if intf.flags & self.config_flags.NAT_IS_INSIDE and \
                    intf.flags & self.config_flags.NAT_IS_OUTSIDE:
                self.vapi.nat44_interface_add_del_feature(
                    sw_if_index=intf.sw_if_index)
            self.vapi.nat44_interface_add_del_feature(
                sw_if_index=intf.sw_if_index,
                flags=intf.flags)

        # Output-feature interfaces.
        interfaces = self.vapi.nat44_interface_output_feature_dump()
        for intf in interfaces:
            self.vapi.nat44_interface_add_del_output_feature(
                is_add=0,
                flags=intf.flags,
                sw_if_index=intf.sw_if_index)
        # Static mappings are deleted with exactly the values they were
        # dumped with.
        static_mappings = self.vapi.nat44_static_mapping_dump()
        for sm in static_mappings:
            self.vapi.nat44_add_del_static_mapping(
                is_add=0,
                local_ip_address=sm.local_ip_address,
                external_ip_address=sm.external_ip_address,
                external_sw_if_index=sm.external_sw_if_index,
                local_port=sm.local_port,
                external_port=sm.external_port,
                vrf_id=sm.vrf_id,
                protocol=sm.protocol,
                flags=sm.flags, tag=sm.tag)

        # Load-balancing static mappings are deleted with an empty list of
        # locals.
        lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
        for lb_sm in lb_static_mappings:
            self.vapi.nat44_add_del_lb_static_mapping(
                is_add=0,
                flags=lb_sm.flags,
                external_addr=lb_sm.external_addr,
                external_port=lb_sm.external_port,
                protocol=lb_sm.protocol,
                local_num=0, locals=[],
                tag=lb_sm.tag)

        # Identity mappings; is_add is not passed here either (wrapper
        # default applies - TODO confirm it means delete).
        identity_mappings = self.vapi.nat44_identity_mapping_dump()
        for id_m in identity_mappings:
            self.vapi.nat44_add_del_identity_mapping(
                ip_address=id_m.ip_address,
                sw_if_index=id_m.sw_if_index,
                port=id_m.port,
                flags=id_m.flags,
                vrf_id=id_m.vrf_id,
                protocol=id_m.protocol)

        # Pool addresses, one single-address range per dumped entry; is_add
        # is left at the wrapper default.
        addresses = self.vapi.nat44_address_dump()
        for addr in addresses:
            self.vapi.nat44_add_del_address_range(
                first_ip_address=addr.ip_address,
                last_ip_address=addr.ip_address,
                vrf_id=0xFFFFFFFF, flags=addr.flags)

        # Reassembly settings for IPv4, then for IPv6.
        self.vapi.nat_set_reass(timeout=2, max_reass=1024, max_frag=5,
                                drop_frag=0)
        self.vapi.nat_set_reass(timeout=2, max_reass=1024, max_frag=5,
                                drop_frag=0, is_ip6=1)
        # verify_no_nat44_user is defined outside this view.
        self.verify_no_nat44_user()
        self.vapi.nat_set_timeouts(udp=300, tcp_established=7440,
                                   tcp_transitory=240, icmp=60)
        self.vapi.nat_set_addr_and_port_alloc_alg()
        self.vapi.nat_set_mss_clamping(enable=0, mss_value=1500)
180
181    def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
182                                 local_port=0, external_port=0, vrf_id=0,
183                                 is_add=1, external_sw_if_index=0xFFFFFFFF,
184                                 proto=0, tag="", flags=0):
185        """
186        Add/delete NAT44 static mapping
187
188        :param local_ip: Local IP address
189        :param external_ip: External IP address
190        :param local_port: Local port number (Optional)
191        :param external_port: External port number (Optional)
192        :param vrf_id: VRF ID (Default 0)
193        :param is_add: 1 if add, 0 if delete (Default add)
194        :param external_sw_if_index: External interface instead of IP address
195        :param proto: IP protocol (Mandatory if port specified)
196        :param tag: Opaque string tag
197        :param flags: NAT configuration flags
198        """
199
200        if not (local_port and external_port):
201            flags |= self.config_flags.NAT_IS_ADDR_ONLY
202
203        self.vapi.nat44_add_del_static_mapping(
204            is_add=is_add,
205            local_ip_address=local_ip,
206            external_ip_address=external_ip,
207            external_sw_if_index=external_sw_if_index,
208            local_port=local_port,
209            external_port=external_port,
210            vrf_id=vrf_id, protocol=proto,
211            flags=flags,
212            tag=tag)
213
214    def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
215        """
216        Add/delete NAT44 address
217
218        :param ip: IP address
219        :param is_add: 1 if add, 0 if delete (Default add)
220        :param twice_nat: twice NAT address for external hosts
221        """
222        flags = self.config_flags.NAT_IS_TWICE_NAT if twice_nat else 0
223        self.vapi.nat44_add_del_address_range(first_ip_address=ip,
224                                              last_ip_address=ip,
225                                              vrf_id=vrf_id,
226                                              is_add=is_add,
227                                              flags=flags)
228
229    def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
230        """
231        Create packet stream for inside network
232
233        :param in_if: Inside interface
234        :param out_if: Outside interface
235        :param dst_ip: Destination address
236        :param ttl: TTL of generated packets
237        """
238        if dst_ip is None:
239            dst_ip = out_if.remote_ip4
240
241        pkts = []
242        # TCP
243        p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
244             IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
245             TCP(sport=self.tcp_port_in, dport=20))
246        pkts.extend([p, p])
247
248        # UDP
249        p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
250             IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
251             UDP(sport=self.udp_port_in, dport=20))
252        pkts.append(p)
253
254        # ICMP
255        p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
256             IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
257             ICMP(id=self.icmp_id_in, type='echo-request'))
258        pkts.append(p)
259
260        return pkts
261
262    def compose_ip6(self, ip4, pref, plen):
263        """
264        Compose IPv4-embedded IPv6 addresses
265
266        :param ip4: IPv4 address
267        :param pref: IPv6 prefix
268        :param plen: IPv6 prefix length
269        :returns: IPv4-embedded IPv6 addresses
270        """
271        pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
272        ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
273        if plen == 32:
274            pref_n[4] = ip4_n[0]
275            pref_n[5] = ip4_n[1]
276            pref_n[6] = ip4_n[2]
277            pref_n[7] = ip4_n[3]
278        elif plen == 40:
279            pref_n[5] = ip4_n[0]
280            pref_n[6] = ip4_n[1]
281            pref_n[7] = ip4_n[2]
282            pref_n[9] = ip4_n[3]
283        elif plen == 48:
284            pref_n[6] = ip4_n[0]
285            pref_n[7] = ip4_n[1]
286            pref_n[9] = ip4_n[2]
287            pref_n[10] = ip4_n[3]
288        elif plen == 56:
289            pref_n[7] = ip4_n[0]
290            pref_n[9] = ip4_n[1]
291            pref_n[10] = ip4_n[2]
292            pref_n[11] = ip4_n[3]
293        elif plen == 64:
294            pref_n[9] = ip4_n[0]
295            pref_n[10] = ip4_n[1]
296            pref_n[11] = ip4_n[2]
297            pref_n[12] = ip4_n[3]
298        elif plen == 96:
299            pref_n[12] = ip4_n[0]
300            pref_n[13] = ip4_n[1]
301            pref_n[14] = ip4_n[2]
302            pref_n[15] = ip4_n[3]
303        packed_pref_n = b''.join([scapy.compat.chb(x) for x in pref_n])
304        return socket.inet_ntop(socket.AF_INET6, packed_pref_n)
305
306    def extract_ip4(self, ip6, plen):
307        """
308        Extract IPv4 address embedded in IPv6 addresses
309
310        :param ip6: IPv6 address
311        :param plen: IPv6 prefix length
312        :returns: extracted IPv4 address
313        """
314        ip6_n = list(socket.inet_pton(socket.AF_INET6, ip6))
315        ip4_n = [None] * 4
316        if plen == 32:
317            ip4_n[0] = ip6_n[4]
318            ip4_n[1] = ip6_n[5]
319            ip4_n[2] = ip6_n[6]
320            ip4_n[3] = ip6_n[7]
321        elif plen == 40:
322            ip4_n[0] = ip6_n[5]
323            ip4_n[1] = ip6_n[6]
324            ip4_n[2] = ip6_n[7]
325            ip4_n[3] = ip6_n[9]
326        elif plen == 48:
327            ip4_n[0] = ip6_n[6]
328            ip4_n[1] = ip6_n[7]
329            ip4_n[2] = ip6_n[9]
330            ip4_n[3] = ip6_n[10]
331        elif plen == 56:
332            ip4_n[0] = ip6_n[7]
333            ip4_n[1] = ip6_n[9]
334            ip4_n[2] = ip6_n[10]
335            ip4_n[3] = ip6_n[11]
336        elif plen == 64:
337            ip4_n[0] = ip6_n[9]
338            ip4_n[1] = ip6_n[10]
339            ip4_n[2] = ip6_n[11]
340            ip4_n[3] = ip6_n[12]
341        elif plen == 96:
342            ip4_n[0] = ip6_n[12]
343            ip4_n[1] = ip6_n[13]
344            ip4_n[2] = ip6_n[14]
345            ip4_n[3] = ip6_n[15]
346        return socket.inet_ntop(socket.AF_INET, ''.join(ip4_n))
347
348    def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
349        """
350        Create IPv6 packet stream for inside network
351
352        :param in_if: Inside interface
353        :param out_if: Outside interface
354        :param ttl: Hop Limit of generated packets
355        :param pref: NAT64 prefix
356        :param plen: NAT64 prefix length
357        """
358        pkts = []
359        if pref is None:
360            dst = ''.join(['64:ff9b::', out_if.remote_ip4])
361        else:
362            dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
363
364        # TCP
365        p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
366             IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
367             TCP(sport=self.tcp_port_in, dport=20))
368        pkts.append(p)
369
370        # UDP
371        p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
372             IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
373             UDP(sport=self.udp_port_in, dport=20))
374        pkts.append(p)
375
376        # ICMP
377        p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
378             IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
379             ICMPv6EchoRequest(id=self.icmp_id_in))
380        pkts.append(p)
381
382        return pkts
383
384    def create_stream_out(self, out_if, dst_ip=None, ttl=64,
385                          use_inside_ports=False):
386        """
387        Create packet stream for outside network
388
389        :param out_if: Outside interface
390        :param dst_ip: Destination IP address (Default use global NAT address)
391        :param ttl: TTL of generated packets
392        :param use_inside_ports: Use inside NAT ports as destination ports
393               instead of outside ports
394        """
395        if dst_ip is None:
396            dst_ip = self.nat_addr
397        if not use_inside_ports:
398            tcp_port = self.tcp_port_out
399            udp_port = self.udp_port_out
400            icmp_id = self.icmp_id_out
401        else:
402            tcp_port = self.tcp_port_in
403            udp_port = self.udp_port_in
404            icmp_id = self.icmp_id_in
405        pkts = []
406        # TCP
407        p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
408             IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
409             TCP(dport=tcp_port, sport=20))
410        pkts.extend([p, p])
411
412        # UDP
413        p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
414             IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
415             UDP(dport=udp_port, sport=20))
416        pkts.append(p)
417
418        # ICMP
419        p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
420             IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
421             ICMP(id=icmp_id, type='echo-reply'))
422        pkts.append(p)
423
424        return pkts
425
426    def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
427        """
428        Create packet stream for outside network
429
430        :param out_if: Outside interface
431        :param dst_ip: Destination IP address (Default use global NAT address)
432        :param hl: HL of generated packets
433        """
434        pkts = []
435        # TCP
436        p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
437             IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
438             TCP(dport=self.tcp_port_out, sport=20))
439        pkts.append(p)
440
441        # UDP
442        p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
443             IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
444             UDP(dport=self.udp_port_out, sport=20))
445        pkts.append(p)
446
447        # ICMP
448        p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
449             IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
450             ICMPv6EchoReply(id=self.icmp_id_out))
451        pkts.append(p)
452
453        return pkts
454
455    def verify_capture_out(self, capture, nat_ip=None, same_port=False,
456                           dst_ip=None, is_ip6=False):
457        """
458        Verify captured packets on outside network
459
460        :param capture: Captured packets
461        :param nat_ip: Translated IP address (Default use global NAT address)
462        :param same_port: Source port number is not translated (Default False)
463        :param dst_ip: Destination IP address (Default do not verify)
464        :param is_ip6: If L3 protocol is IPv6 (Default False)
465        """
466        if is_ip6:
467            IP46 = IPv6
468            ICMP46 = ICMPv6EchoRequest
469        else:
470            IP46 = IP
471            ICMP46 = ICMP
472        if nat_ip is None:
473            nat_ip = self.nat_addr
474        for packet in capture:
475            try:
476                if not is_ip6:
477                    self.assert_packet_checksums_valid(packet)
478                self.assertEqual(packet[IP46].src, nat_ip)
479                if dst_ip is not None:
480                    self.assertEqual(packet[IP46].dst, dst_ip)
481                if packet.haslayer(TCP):
482                    if same_port:
483                        self.assertEqual(packet[TCP].sport, self.tcp_port_in)
484                    else:
485                        self.assertNotEqual(
486                            packet[TCP].sport, self.tcp_port_in)
487                    self.tcp_port_out = packet[TCP].sport
488                    self.assert_packet_checksums_valid(packet)
489                elif packet.haslayer(UDP):
490                    if same_port:
491                        self.assertEqual(packet[UDP].sport, self.udp_port_in)
492                    else:
493                        self.assertNotEqual(
494                            packet[UDP].sport, self.udp_port_in)
495                    self.udp_port_out = packet[UDP].sport
496                else:
497                    if same_port:
498                        self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
499                    else:
500                        self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
501                    self.icmp_id_out = packet[ICMP46].id
502                    self.assert_packet_checksums_valid(packet)
503            except:
504                self.logger.error(ppp("Unexpected or invalid packet "
505                                      "(outside network):", packet))
506                raise
507
508    def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
509                               dst_ip=None):
510        """
511        Verify captured packets on outside network
512
513        :param capture: Captured packets
514        :param nat_ip: Translated IP address
515        :param same_port: Source port number is not translated (Default False)
516        :param dst_ip: Destination IP address (Default do not verify)
517        """
518        return self.verify_capture_out(capture, nat_ip, same_port, dst_ip,
519                                       True)
520
521    def verify_capture_in(self, capture, in_if):
522        """
523        Verify captured packets on inside network
524
525        :param capture: Captured packets
526        :param in_if: Inside interface
527        """
528        for packet in capture:
529            try:
530                self.assert_packet_checksums_valid(packet)
531                self.assertEqual(packet[IP].dst, in_if.remote_ip4)
532                if packet.haslayer(TCP):
533                    self.assertEqual(packet[TCP].dport, self.tcp_port_in)
534                elif packet.haslayer(UDP):
535                    self.assertEqual(packet[UDP].dport, self.udp_port_in)
536                else:
537                    self.assertEqual(packet[ICMP].id, self.icmp_id_in)
538            except:
539                self.logger.error(ppp("Unexpected or invalid packet "
540                                      "(inside network):", packet))
541                raise
542
543    def verify_capture_in_ip6(self, capture, src_ip, dst_ip):
544        """
545        Verify captured IPv6 packets on inside network
546
547        :param capture: Captured packets
548        :param src_ip: Source IP
549        :param dst_ip: Destination IP address
550        """
551        for packet in capture:
552            try:
553                self.assertEqual(packet[IPv6].src, src_ip)
554                self.assertEqual(packet[IPv6].dst, dst_ip)
555                self.assert_packet_checksums_valid(packet)
556                if packet.haslayer(TCP):
557                    self.assertEqual(packet[TCP].dport, self.tcp_port_in)
558                elif packet.haslayer(UDP):
559                    self.assertEqual(packet[UDP].dport, self.udp_port_in)
560                else:
561                    self.assertEqual(packet[ICMPv6EchoReply].id,
562                                     self.icmp_id_in)
563            except:
564                self.logger.error(ppp("Unexpected or invalid packet "
565                                      "(inside network):", packet))
566                raise
567
568    def verify_capture_no_translation(self, capture, ingress_if, egress_if):
569        """
570        Verify captured packet that don't have to be translated
571
572        :param capture: Captured packets
573        :param ingress_if: Ingress interface
574        :param egress_if: Egress interface
575        """
576        for packet in capture:
577            try:
578                self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
579                self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
580                if packet.haslayer(TCP):
581                    self.assertEqual(packet[TCP].sport, self.tcp_port_in)
582                elif packet.haslayer(UDP):
583                    self.assertEqual(packet[UDP].sport, self.udp_port_in)
584                else:
585                    self.assertEqual(packet[ICMP].id, self.icmp_id_in)
586            except:
587                self.logger.error(ppp("Unexpected or invalid packet "
588                                      "(inside network):", packet))
589                raise
590
591    def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
592                                            icmp_type=11):
593        """
594        Verify captured packets with ICMP errors on outside network
595
596        :param capture: Captured packets
597        :param src_ip: Translated IP address or IP address of VPP
598                       (Default use global NAT address)
599        :param icmp_type: Type of error ICMP packet
600                          we are expecting (Default 11)
601        """
602        if src_ip is None:
603            src_ip = self.nat_addr
604        for packet in capture:
605            try:
606                self.assertEqual(packet[IP].src, src_ip)
607                self.assertEqual(packet.haslayer(ICMP), 1)
608                icmp = packet[ICMP]
609                self.assertEqual(icmp.type, icmp_type)
610                self.assertTrue(icmp.haslayer(IPerror))
611                inner_ip = icmp[IPerror]
612                if inner_ip.haslayer(TCPerror):
613                    self.assertEqual(inner_ip[TCPerror].dport,
614                                     self.tcp_port_out)
615                elif inner_ip.haslayer(UDPerror):
616                    self.assertEqual(inner_ip[UDPerror].dport,
617                                     self.udp_port_out)
618                else:
619                    self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
620            except:
621                self.logger.error(ppp("Unexpected or invalid packet "
622                                      "(outside network):", packet))
623                raise
624
625    def verify_capture_in_with_icmp_errors(self, capture, in_if, icmp_type=11):
626        """
627        Verify captured packets with ICMP errors on inside network
628
629        :param capture: Captured packets
630        :param in_if: Inside interface
631        :param icmp_type: Type of error ICMP packet
632                          we are expecting (Default 11)
633        """
634        for packet in capture:
635            try:
636                self.assertEqual(packet[IP].dst, in_if.remote_ip4)
637                self.assertEqual(packet.haslayer(ICMP), 1)
638                icmp = packet[ICMP]
639                self.assertEqual(icmp.type, icmp_type)
640                self.assertTrue(icmp.haslayer(IPerror))
641                inner_ip = icmp[IPerror]
642                if inner_ip.haslayer(TCPerror):
643                    self.assertEqual(inner_ip[TCPerror].sport,
644                                     self.tcp_port_in)
645                elif inner_ip.haslayer(UDPerror):
646                    self.assertEqual(inner_ip[UDPerror].sport,
647                                     self.udp_port_in)
648                else:
649                    self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
650            except:
651                self.logger.error(ppp("Unexpected or invalid packet "
652                                      "(inside network):", packet))
653                raise
654
655    def create_stream_frag(self, src_if, dst, sport, dport, data,
656                           proto=IP_PROTOS.tcp, echo_reply=False):
657        """
658        Create fragmented packet stream
659
660        :param src_if: Source interface
661        :param dst: Destination IPv4 address
662        :param sport: Source port
663        :param dport: Destination port
664        :param data: Payload data
665        :param proto: protocol (TCP, UDP, ICMP)
666        :param echo_reply: use echo_reply if protocol is ICMP
667        :returns: Fragments
668        """
669        if proto == IP_PROTOS.tcp:
670            p = (IP(src=src_if.remote_ip4, dst=dst) /
671                 TCP(sport=sport, dport=dport) /
672                 Raw(data))
673            p = p.__class__(scapy.compat.raw(p))
674            chksum = p[TCP].chksum
675            proto_header = TCP(sport=sport, dport=dport, chksum=chksum)
676        elif proto == IP_PROTOS.udp:
677            proto_header = UDP(sport=sport, dport=dport)
678        elif proto == IP_PROTOS.icmp:
679            if not echo_reply:
680                proto_header = ICMP(id=sport, type='echo-request')
681            else:
682                proto_header = ICMP(id=sport, type='echo-reply')
683        else:
684            raise Exception("Unsupported protocol")
685        id = random.randint(0, 65535)
686        pkts = []
687        if proto == IP_PROTOS.tcp:
688            raw = Raw(data[0:4])
689        else:
690            raw = Raw(data[0:16])
691        p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
692             IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
693             proto_header /
694             raw)
695        pkts.append(p)
696        if proto == IP_PROTOS.tcp:
697            raw = Raw(data[4:20])
698        else:
699            raw = Raw(data[16:32])
700        p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
701             IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
702                proto=proto) /
703             raw)
704        pkts.append(p)
705        if proto == IP_PROTOS.tcp:
706            raw = Raw(data[20:])
707        else:
708            raw = Raw(data[32:])
709        p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
710             IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto,
711                id=id) /
712             raw)
713        pkts.append(p)
714        return pkts
715
716    def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
717                               pref=None, plen=0, frag_size=128):
718        """
719        Create fragmented packet stream
720
721        :param src_if: Source interface
722        :param dst: Destination IPv4 address
723        :param sport: Source TCP port
724        :param dport: Destination TCP port
725        :param data: Payload data
726        :param pref: NAT64 prefix
727        :param plen: NAT64 prefix length
728        :param fragsize: size of fragments
729        :returns: Fragments
730        """
731        if pref is None:
732            dst_ip6 = ''.join(['64:ff9b::', dst])
733        else:
734            dst_ip6 = self.compose_ip6(dst, pref, plen)
735
736        p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
737             IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
738             IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
739             TCP(sport=sport, dport=dport) /
740             Raw(data))
741
742        return fragment6(p, frag_size)
743
744    def reass_frags_and_verify(self, frags, src, dst):
745        """
746        Reassemble and verify fragmented packet
747
748        :param frags: Captured fragments
749        :param src: Source IPv4 address to verify
750        :param dst: Destination IPv4 address to verify
751
752        :returns: Reassembled IPv4 packet
753        """
754        buffer = BytesIO()
755        for p in frags:
756            self.assertEqual(p[IP].src, src)
757            self.assertEqual(p[IP].dst, dst)
758            self.assert_ip_checksum_valid(p)
759            buffer.seek(p[IP].frag * 8)
760            buffer.write(bytes(p[IP].payload))
761        ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
762                proto=frags[0][IP].proto)
763        if ip.proto == IP_PROTOS.tcp:
764            p = (ip / TCP(buffer.getvalue()))
765            self.assert_tcp_checksum_valid(p)
766        elif ip.proto == IP_PROTOS.udp:
767            p = (ip / UDP(buffer.getvalue()[:8]) /
768                 Raw(buffer.getvalue()[8:]))
769        elif ip.proto == IP_PROTOS.icmp:
770            p = (ip / ICMP(buffer.getvalue()))
771        return p
772
773    def reass_frags_and_verify_ip6(self, frags, src, dst):
774        """
775        Reassemble and verify fragmented packet
776
777        :param frags: Captured fragments
778        :param src: Source IPv6 address to verify
779        :param dst: Destination IPv6 address to verify
780
781        :returns: Reassembled IPv6 packet
782        """
783        buffer = BytesIO()
784        for p in frags:
785            self.assertEqual(p[IPv6].src, src)
786            self.assertEqual(p[IPv6].dst, dst)
787            buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
788            buffer.write(bytes(p[IPv6ExtHdrFragment].payload))
789        ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
790                  nh=frags[0][IPv6ExtHdrFragment].nh)
791        if ip.nh == IP_PROTOS.tcp:
792            p = (ip / TCP(buffer.getvalue()))
793        elif ip.nh == IP_PROTOS.udp:
794            p = (ip / UDP(buffer.getvalue()))
795        self.assert_packet_checksums_valid(p)
796        return p
797
798    def initiate_tcp_session(self, in_if, out_if):
799        """
800        Initiates TCP session
801
802        :param in_if: Inside interface
803        :param out_if: Outside interface
804        """
805        try:
806            # SYN packet in->out
807            p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
808                 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
809                 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
810                     flags="S"))
811            in_if.add_stream(p)
812            self.pg_enable_capture(self.pg_interfaces)
813            self.pg_start()
814            capture = out_if.get_capture(1)
815            p = capture[0]
816            self.tcp_port_out = p[TCP].sport
817
818            # SYN + ACK packet out->in
819            p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
820                 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
821                 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
822                     flags="SA"))
823            out_if.add_stream(p)
824            self.pg_enable_capture(self.pg_interfaces)
825            self.pg_start()
826            in_if.get_capture(1)
827
828            # ACK packet in->out
829            p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
830                 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
831                 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
832                     flags="A"))
833            in_if.add_stream(p)
834            self.pg_enable_capture(self.pg_interfaces)
835            self.pg_start()
836            out_if.get_capture(1)
837
838        except:
839            self.logger.error("TCP 3 way handshake failed")
840            raise
841
842    def verify_ipfix_nat44_ses(self, data):
843        """
844        Verify IPFIX NAT44 session create/delete event
845
846        :param data: Decoded IPFIX data records
847        """
848        nat44_ses_create_num = 0
849        nat44_ses_delete_num = 0
850        self.assertEqual(6, len(data))
851        for record in data:
852            # natEvent
853            self.assertIn(scapy.compat.orb(record[230]), [4, 5])
854            if scapy.compat.orb(record[230]) == 4:
855                nat44_ses_create_num += 1
856            else:
857                nat44_ses_delete_num += 1
858            # sourceIPv4Address
859            self.assertEqual(self.pg0.remote_ip4n, record[8])
860            # postNATSourceIPv4Address
861            self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
862                             record[225])
863            # ingressVRFID
864            self.assertEqual(struct.pack("!I", 0), record[234])
865            # protocolIdentifier/sourceTransportPort
866            # /postNAPTSourceTransportPort
867            if IP_PROTOS.icmp == scapy.compat.orb(record[4]):
868                self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
869                self.assertEqual(struct.pack("!H", self.icmp_id_out),
870                                 record[227])
871            elif IP_PROTOS.tcp == scapy.compat.orb(record[4]):
872                self.assertEqual(struct.pack("!H", self.tcp_port_in),
873                                 record[7])
874                self.assertEqual(struct.pack("!H", self.tcp_port_out),
875                                 record[227])
876            elif IP_PROTOS.udp == scapy.compat.orb(record[4]):
877                self.assertEqual(struct.pack("!H", self.udp_port_in),
878                                 record[7])
879                self.assertEqual(struct.pack("!H", self.udp_port_out),
880                                 record[227])
881            else:
882                self.fail("Invalid protocol")
883        self.assertEqual(3, nat44_ses_create_num)
884        self.assertEqual(3, nat44_ses_delete_num)
885
886    def verify_ipfix_addr_exhausted(self, data):
887        """
888        Verify IPFIX NAT addresses event
889
890        :param data: Decoded IPFIX data records
891        """
892        self.assertEqual(1, len(data))
893        record = data[0]
894        # natEvent
895        self.assertEqual(scapy.compat.orb(record[230]), 3)
896        # natPoolID
897        self.assertEqual(struct.pack("!I", 0), record[283])
898
899    def verify_ipfix_max_sessions(self, data, limit):
900        """
901        Verify IPFIX maximum session entries exceeded event
902
903        :param data: Decoded IPFIX data records
904        :param limit: Number of maximum session entries that can be created.
905        """
906        self.assertEqual(1, len(data))
907        record = data[0]
908        # natEvent
909        self.assertEqual(scapy.compat.orb(record[230]), 13)
910        # natQuotaExceededEvent
911        self.assertEqual(struct.pack("I", 1), record[466])
912        # maxSessionEntries
913        self.assertEqual(struct.pack("I", limit), record[471])
914
915    def verify_ipfix_max_bibs(self, data, limit):
916        """
917        Verify IPFIX maximum BIB entries exceeded event
918
919        :param data: Decoded IPFIX data records
920        :param limit: Number of maximum BIB entries that can be created.
921        """
922        self.assertEqual(1, len(data))
923        record = data[0]
924        # natEvent
925        self.assertEqual(scapy.compat.orb(record[230]), 13)
926        # natQuotaExceededEvent
927        self.assertEqual(struct.pack("I", 2), record[466])
928        # maxBIBEntries
929        self.assertEqual(struct.pack("I", limit), record[472])
930
931    def verify_ipfix_max_fragments_ip6(self, data, limit, src_addr):
932        """
933        Verify IPFIX maximum IPv6 fragments pending reassembly exceeded event
934
935        :param data: Decoded IPFIX data records
936        :param limit: Number of maximum fragments pending reassembly
937        :param src_addr: IPv6 source address
938        """
939        self.assertEqual(1, len(data))
940        record = data[0]
941        # natEvent
942        self.assertEqual(scapy.compat.orb(record[230]), 13)
943        # natQuotaExceededEvent
944        self.assertEqual(struct.pack("I", 5), record[466])
945        # maxFragmentsPendingReassembly
946        self.assertEqual(struct.pack("I", limit), record[475])
947        # sourceIPv6Address
948        self.assertEqual(src_addr, record[27])
949
950    def verify_ipfix_max_fragments_ip4(self, data, limit, src_addr):
951        """
952        Verify IPFIX maximum IPv4 fragments pending reassembly exceeded event
953
954        :param data: Decoded IPFIX data records
955        :param limit: Number of maximum fragments pending reassembly
956        :param src_addr: IPv4 source address
957        """
958        self.assertEqual(1, len(data))
959        record = data[0]
960        # natEvent
961        self.assertEqual(scapy.compat.orb(record[230]), 13)
962        # natQuotaExceededEvent
963        self.assertEqual(struct.pack("I", 5), record[466])
964        # maxFragmentsPendingReassembly
965        self.assertEqual(struct.pack("I", limit), record[475])
966        # sourceIPv4Address
967        self.assertEqual(src_addr, record[8])
968
969    def verify_ipfix_bib(self, data, is_create, src_addr):
970        """
971        Verify IPFIX NAT64 BIB create and delete events
972
973        :param data: Decoded IPFIX data records
974        :param is_create: Create event if nonzero value otherwise delete event
975        :param src_addr: IPv6 source address
976        """
977        self.assertEqual(1, len(data))
978        record = data[0]
979        # natEvent
980        if is_create:
981            self.assertEqual(scapy.compat.orb(record[230]), 10)
982        else:
983            self.assertEqual(scapy.compat.orb(record[230]), 11)
984        # sourceIPv6Address
985        self.assertEqual(src_addr, record[27])
986        # postNATSourceIPv4Address
987        self.assertEqual(self.nat_addr_n, record[225])
988        # protocolIdentifier
989        self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4]))
990        # ingressVRFID
991        self.assertEqual(struct.pack("!I", 0), record[234])
992        # sourceTransportPort
993        self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
994        # postNAPTSourceTransportPort
995        self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
996
997    def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
998                               dst_port):
999        """
1000        Verify IPFIX NAT64 session create and delete events
1001
1002        :param data: Decoded IPFIX data records
1003        :param is_create: Create event if nonzero value otherwise delete event
1004        :param src_addr: IPv6 source address
1005        :param dst_addr: IPv4 destination address
1006        :param dst_port: destination TCP port
1007        """
1008        self.assertEqual(1, len(data))
1009        record = data[0]
1010        # natEvent
1011        if is_create:
1012            self.assertEqual(scapy.compat.orb(record[230]), 6)
1013        else:
1014            self.assertEqual(scapy.compat.orb(record[230]), 7)
1015        # sourceIPv6Address
1016        self.assertEqual(src_addr, record[27])
1017        # destinationIPv6Address
1018        self.assertEqual(socket.inet_pton(socket.AF_INET6,
1019                                          self.compose_ip6(dst_addr,
1020                                                           '64:ff9b::',
1021                                                           96)),
1022                         record[28])
1023        # postNATSourceIPv4Address
1024        self.assertEqual(self.nat_addr_n, record[225])
1025        # postNATDestinationIPv4Address
1026        self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
1027                         record[226])
1028        # protocolIdentifier
1029        self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4]))
1030        # ingressVRFID
1031        self.assertEqual(struct.pack("!I", 0), record[234])
1032        # sourceTransportPort
1033        self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
1034        # postNAPTSourceTransportPort
1035        self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
1036        # destinationTransportPort
1037        self.assertEqual(struct.pack("!H", dst_port), record[11])
1038        # postNAPTDestinationTransportPort
1039        self.assertEqual(struct.pack("!H", dst_port), record[228])
1040
1041    def verify_no_nat44_user(self):
1042        """ Verify that there is no NAT44 user """
1043        users = self.vapi.nat44_user_dump()
1044        self.assertEqual(len(users), 0)
1045        users = self.statistics.get_counter('/nat44/total-users')
1046        self.assertEqual(users[0][0], 0)
1047        sessions = self.statistics.get_counter('/nat44/total-sessions')
1048        self.assertEqual(sessions[0][0], 0)
1049
1050    def verify_ipfix_max_entries_per_user(self, data, limit, src_addr):
1051        """
1052        Verify IPFIX maximum entries per user exceeded event
1053
1054        :param data: Decoded IPFIX data records
1055        :param limit: Number of maximum entries per user
1056        :param src_addr: IPv4 source address
1057        """
1058        self.assertEqual(1, len(data))
1059        record = data[0]
1060        # natEvent
1061        self.assertEqual(scapy.compat.orb(record[230]), 13)
1062        # natQuotaExceededEvent
1063        self.assertEqual(struct.pack("I", 3), record[466])
1064        # maxEntriesPerUser
1065        self.assertEqual(struct.pack("I", limit), record[473])
1066        # sourceIPv4Address
1067        self.assertEqual(socket.inet_pton(socket.AF_INET, src_addr), record[8])
1068
1069    def verify_syslog_apmap(self, data, is_add=True):
1070        message = data.decode('utf-8')
1071        try:
1072            message = SyslogMessage.parse(message)
1073        except ParseError as e:
1074            self.logger.error(e)
1075            raise
1076        else:
1077            self.assertEqual(message.severity, SyslogSeverity.info)
1078            self.assertEqual(message.appname, 'NAT')
1079            self.assertEqual(message.msgid, 'APMADD' if is_add else 'APMDEL')
1080            sd_params = message.sd.get('napmap')
1081            self.assertTrue(sd_params is not None)
1082            self.assertEqual(sd_params.get('IATYP'), 'IPv4')
1083            self.assertEqual(sd_params.get('ISADDR'), self.pg0.remote_ip4)
1084            self.assertEqual(sd_params.get('ISPORT'), "%d" % self.tcp_port_in)
1085            self.assertEqual(sd_params.get('XATYP'), 'IPv4')
1086            self.assertEqual(sd_params.get('XSADDR'), self.nat_addr)
1087            self.assertEqual(sd_params.get('XSPORT'), "%d" % self.tcp_port_out)
1088            self.assertEqual(sd_params.get('PROTO'), "%d" % IP_PROTOS.tcp)
1089            self.assertTrue(sd_params.get('SSUBIX') is not None)
1090            self.assertEqual(sd_params.get('SVLAN'), '0')
1091
1092    def verify_syslog_sess(self, data, is_add=True, is_ip6=False):
1093        message = data.decode('utf-8')
1094        try:
1095            message = SyslogMessage.parse(message)
1096        except ParseError as e:
1097            self.logger.error(e)
1098            raise
1099        else:
1100            self.assertEqual(message.severity, SyslogSeverity.info)
1101            self.assertEqual(message.appname, 'NAT')
1102            self.assertEqual(message.msgid, 'SADD' if is_add else 'SDEL')
1103            sd_params = message.sd.get('nsess')
1104            self.assertTrue(sd_params is not None)
1105            if is_ip6:
1106                self.assertEqual(sd_params.get('IATYP'), 'IPv6')
1107                self.assertEqual(sd_params.get('ISADDR'), self.pg0.remote_ip6)
1108            else:
1109                self.assertEqual(sd_params.get('IATYP'), 'IPv4')
1110                self.assertEqual(sd_params.get('ISADDR'), self.pg0.remote_ip4)
1111                self.assertTrue(sd_params.get('SSUBIX') is not None)
1112            self.assertEqual(sd_params.get('ISPORT'), "%d" % self.tcp_port_in)
1113            self.assertEqual(sd_params.get('XATYP'), 'IPv4')
1114            self.assertEqual(sd_params.get('XSADDR'), self.nat_addr)
1115            self.assertEqual(sd_params.get('XSPORT'), "%d" % self.tcp_port_out)
1116            self.assertEqual(sd_params.get('PROTO'), "%d" % IP_PROTOS.tcp)
1117            self.assertEqual(sd_params.get('SVLAN'), '0')
1118            self.assertEqual(sd_params.get('XDADDR'), self.pg1.remote_ip4)
1119            self.assertEqual(sd_params.get('XDPORT'),
1120                             "%d" % self.tcp_external_port)
1121
1122    def verify_mss_value(self, pkt, mss):
1123        """
1124        Verify TCP MSS value
1125
1126        :param pkt:
1127        :param mss:
1128        """
1129        if not pkt.haslayer(IP) or not pkt.haslayer(TCP):
1130            raise TypeError("Not a TCP/IP packet")
1131
1132        for option in pkt[TCP].options:
1133            if option[0] == 'MSS':
1134                self.assertEqual(option[1], mss)
1135                self.assert_tcp_checksum_valid(pkt)
1136
1137    @staticmethod
1138    def proto2layer(proto):
1139        if proto == IP_PROTOS.tcp:
1140            return TCP
1141        elif proto == IP_PROTOS.udp:
1142            return UDP
1143        elif proto == IP_PROTOS.icmp:
1144            return ICMP
1145        else:
1146            raise Exception("Unsupported protocol")
1147
1148    def frag_in_order(self, proto=IP_PROTOS.tcp, dont_translate=False):
1149        layer = self.proto2layer(proto)
1150
1151        if proto == IP_PROTOS.tcp:
1152            data = b"A" * 4 + b"B" * 16 + b"C" * 3
1153        else:
1154            data = b"A" * 16 + b"B" * 16 + b"C" * 3
1155        self.port_in = random.randint(1025, 65535)
1156
1157        reass = self.vapi.nat_reass_dump()
1158        reass_n_start = len(reass)
1159
1160        # in2out
1161        pkts = self.create_stream_frag(self.pg0,
1162                                       self.pg1.remote_ip4,
1163                                       self.port_in,
1164                                       20,
1165                                       data,
1166                                       proto)
1167        self.pg0.add_stream(pkts)
1168        self.pg_enable_capture(self.pg_interfaces)
1169        self.pg_start()
1170        frags = self.pg1.get_capture(len(pkts))
1171        if not dont_translate:
1172            p = self.reass_frags_and_verify(frags,
1173                                            self.nat_addr,
1174                                            self.pg1.remote_ip4)
1175        else:
1176            p = self.reass_frags_and_verify(frags,
1177                                            self.pg0.remote_ip4,
1178                                            self.pg1.remote_ip4)
1179        if proto != IP_PROTOS.icmp:
1180            if not dont_translate:
1181                self.assertEqual(p[layer].dport, 20)
1182                self.assertNotEqual(p[layer].sport, self.port_in)
1183            else:
1184                self.assertEqual(p[layer].sport, self.port_in)
1185        else:
1186            if not dont_translate:
1187                self.assertNotEqual(p[layer].id, self.port_in)
1188            else:
1189                self.assertEqual(p[layer].id, self.port_in)
1190        self.assertEqual(data, p[Raw].load)
1191
1192        # out2in
1193        if not dont_translate:
1194            dst_addr = self.nat_addr
1195        else:
1196            dst_addr = self.pg0.remote_ip4
1197        if proto != IP_PROTOS.icmp:
1198            sport = 20
1199            dport = p[layer].sport
1200        else:
1201            sport = p[layer].id
1202            dport = 0
1203        pkts = self.create_stream_frag(self.pg1,
1204                                       dst_addr,
1205                                       sport,
1206                                       dport,
1207                                       data,
1208                                       proto,
1209                                       echo_reply=True)
1210        self.pg1.add_stream(pkts)
1211        self.pg_enable_capture(self.pg_interfaces)
1212        self.pg_start()
1213        frags = self.pg0.get_capture(len(pkts))
1214        p = self.reass_frags_and_verify(frags,
1215                                        self.pg1.remote_ip4,
1216                                        self.pg0.remote_ip4)
1217        if proto != IP_PROTOS.icmp:
1218            self.assertEqual(p[layer].sport, 20)
1219            self.assertEqual(p[layer].dport, self.port_in)
1220        else:
1221            self.assertEqual(p[layer].id, self.port_in)
1222        self.assertEqual(data, p[Raw].load)
1223
1224        reass = self.vapi.nat_reass_dump()
1225        reass_n_end = len(reass)
1226
1227        self.assertEqual(reass_n_end - reass_n_start, 2)
1228
1229    def frag_in_order_in_plus_out(self, proto=IP_PROTOS.tcp):
1230        layer = self.proto2layer(proto)
1231
1232        if proto == IP_PROTOS.tcp:
1233            data = b"A" * 4 + b"B" * 16 + b"C" * 3
1234        else:
1235            data = b"A" * 16 + b"B" * 16 + b"C" * 3
1236        self.port_in = random.randint(1025, 65535)
1237
1238        for i in range(2):
1239            reass = self.vapi.nat_reass_dump()
1240            reass_n_start = len(reass)
1241
1242            # out2in
1243            pkts = self.create_stream_frag(self.pg0,
1244                                           self.server_out_addr,
1245                                           self.port_in,
1246                                           self.server_out_port,
1247                                           data,
1248                                           proto)
1249            self.pg0.add_stream(pkts)
1250            self.pg_enable_capture(self.pg_interfaces)
1251            self.pg_start()
1252            frags = self.pg1.get_capture(len(pkts))
1253            p = self.reass_frags_and_verify(frags,
1254                                            self.pg0.remote_ip4,
1255                                            self.server_in_addr)
1256            if proto != IP_PROTOS.icmp:
1257                self.assertEqual(p[layer].sport, self.port_in)
1258                self.assertEqual(p[layer].dport, self.server_in_port)
1259            else:
1260                self.assertEqual(p[layer].id, self.port_in)
1261            self.assertEqual(data, p[Raw].load)
1262
1263            # in2out
1264            if proto != IP_PROTOS.icmp:
1265                pkts = self.create_stream_frag(self.pg1,
1266                                               self.pg0.remote_ip4,
1267                                               self.server_in_port,
1268                                               p[layer].sport,
1269                                               data,
1270                                               proto)
1271            else:
1272                pkts = self.create_stream_frag(self.pg1,
1273                                               self.pg0.remote_ip4,
1274                                               p[layer].id,
1275                                               0,
1276                                               data,
1277                                               proto,
1278                                               echo_reply=True)
1279            self.pg1.add_stream(pkts)
1280            self.pg_enable_capture(self.pg_interfaces)
1281            self.pg_start()
1282            frags = self.pg0.get_capture(len(pkts))
1283            p = self.reass_frags_and_verify(frags,
1284                                            self.server_out_addr,
1285                                            self.pg0.remote_ip4)
1286            if proto != IP_PROTOS.icmp:
1287                self.assertEqual(p[layer].sport, self.server_out_port)
1288                self.assertEqual(p[layer].dport, self.port_in)
1289            else:
1290                self.assertEqual(p[layer].id, self.port_in)
1291            self.assertEqual(data, p[Raw].load)
1292
1293            reass = self.vapi.nat_reass_dump()
1294            reass_n_end = len(reass)
1295
1296            self.assertEqual(reass_n_end - reass_n_start, 2)
1297
1298    def reass_hairpinning(self, proto=IP_PROTOS.tcp):
1299        layer = self.proto2layer(proto)
1300
1301        if proto == IP_PROTOS.tcp:
1302            data = b"A" * 4 + b"B" * 16 + b"C" * 3
1303        else:
1304            data = b"A" * 16 + b"B" * 16 + b"C" * 3
1305
1306        # send packet from host to server
1307        pkts = self.create_stream_frag(self.pg0,
1308                                       self.nat_addr,
1309                                       self.host_in_port,
1310                                       self.server_out_port,
1311                                       data,
1312                                       proto)
1313        self.pg0.add_stream(pkts)
1314        self.pg_enable_capture(self.pg_interfaces)
1315        self.pg_start()
1316        frags = self.pg0.get_capture(len(pkts))
1317        p = self.reass_frags_and_verify(frags,
1318                                        self.nat_addr,
1319                                        self.server.ip4)
1320        if proto != IP_PROTOS.icmp:
1321            self.assertNotEqual(p[layer].sport, self.host_in_port)
1322            self.assertEqual(p[layer].dport, self.server_in_port)
1323        else:
1324            self.assertNotEqual(p[layer].id, self.host_in_port)
1325        self.assertEqual(data, p[Raw].load)
1326
1327    def frag_out_of_order(self, proto=IP_PROTOS.tcp, dont_translate=False):
1328        layer = self.proto2layer(proto)
1329
1330        if proto == IP_PROTOS.tcp:
1331            data = b"A" * 4 + b"B" * 16 + b"C" * 3
1332        else:
1333            data = b"A" * 16 + b"B" * 16 + b"C" * 3
1334        self.port_in = random.randint(1025, 65535)
1335
1336        for i in range(2):
1337            # in2out
1338            pkts = self.create_stream_frag(self.pg0,
1339                                           self.pg1.remote_ip4,
1340                                           self.port_in,
1341                                           20,
1342                                           data,
1343                                           proto)
1344            pkts.reverse()
1345            self.pg0.add_stream(pkts)
1346            self.pg_enable_capture(self.pg_interfaces)
1347            self.pg_start()
1348            frags = self.pg1.get_capture(len(pkts))
1349            if not dont_translate:
1350                p = self.reass_frags_and_verify(frags,
1351                                                self.nat_addr,
1352                                                self.pg1.remote_ip4)
1353            else:
1354                p = self.reass_frags_and_verify(frags,
1355                                                self.pg0.remote_ip4,
1356                                                self.pg1.remote_ip4)
1357            if proto != IP_PROTOS.icmp:
1358                if not dont_translate:
1359                    self.assertEqual(p[layer].dport, 20)
1360                    self.assertNotEqual(p[layer].sport, self.port_in)
1361                else:
1362                    self.assertEqual(p[layer].sport, self.port_in)
1363            else:
1364                if not dont_translate:
1365                    self.assertNotEqual(p[layer].id, self.port_in)
1366                else:
1367                    self.assertEqual(p[layer].id, self.port_in)
1368            self.assertEqual(data, p[Raw].load)
1369
1370            # out2in
1371            if not dont_translate:
1372                dst_addr = self.nat_addr
1373            else:
1374                dst_addr = self.pg0.remote_ip4
1375            if proto != IP_PROTOS.icmp:
1376                sport = 20
1377                dport = p[layer].sport
1378            else:
1379                sport = p[layer].id
1380                dport = 0
1381            pkts = self.create_stream_frag(self.pg1,
1382                                           dst_addr,
1383                                           sport,
1384                                           dport,
1385                                           data,
1386                                           proto,
1387                                           echo_reply=True)
1388            pkts.reverse()
1389            self.pg1.add_stream(pkts)
1390            self.pg_enable_capture(self.pg_interfaces)
1391            self.pg_start()
1392            frags = self.pg0.get_capture(len(pkts))
1393            p = self.reass_frags_and_verify(frags,
1394                                            self.pg1.remote_ip4,
1395                                            self.pg0.remote_ip4)
1396            if proto != IP_PROTOS.icmp:
1397                self.assertEqual(p[layer].sport, 20)
1398                self.assertEqual(p[layer].dport, self.port_in)
1399            else:
1400                self.assertEqual(p[layer].id, self.port_in)
1401            self.assertEqual(data, p[Raw].load)
1402
1403    def frag_out_of_order_in_plus_out(self, proto=IP_PROTOS.tcp):
1404        layer = self.proto2layer(proto)
1405
1406        if proto == IP_PROTOS.tcp:
1407            data = b"A" * 4 + b"B" * 16 + b"C" * 3
1408        else:
1409            data = b"A" * 16 + b"B" * 16 + b"C" * 3
1410        self.port_in = random.randint(1025, 65535)
1411
1412        for i in range(2):
1413            # out2in
1414            pkts = self.create_stream_frag(self.pg0,
1415                                           self.server_out_addr,
1416                                           self.port_in,
1417                                           self.server_out_port,
1418                                           data,
1419                                           proto)
1420            pkts.reverse()
1421            self.pg0.add_stream(pkts)
1422            self.pg_enable_capture(self.pg_interfaces)
1423            self.pg_start()
1424            frags = self.pg1.get_capture(len(pkts))
1425            p = self.reass_frags_and_verify(frags,
1426                                            self.pg0.remote_ip4,
1427                                            self.server_in_addr)
1428            if proto != IP_PROTOS.icmp:
1429                self.assertEqual(p[layer].dport, self.server_in_port)
1430                self.assertEqual(p[layer].sport, self.port_in)
1431                self.assertEqual(p[layer].dport, self.server_in_port)
1432            else:
1433                self.assertEqual(p[layer].id, self.port_in)
1434            self.assertEqual(data, p[Raw].load)
1435
1436            # in2out
1437            if proto != IP_PROTOS.icmp:
1438                pkts = self.create_stream_frag(self.pg1,
1439                                               self.pg0.remote_ip4,
1440                                               self.server_in_port,
1441                                               p[layer].sport,
1442                                               data,
1443                                               proto)
1444            else:
1445                pkts = self.create_stream_frag(self.pg1,
1446                                               self.pg0.remote_ip4,
1447                                               p[layer].id,
1448                                               0,
1449                                               data,
1450                                               proto,
1451                                               echo_reply=True)
1452            pkts.reverse()
1453            self.pg1.add_stream(pkts)
1454            self.pg_enable_capture(self.pg_interfaces)
1455            self.pg_start()
1456            frags = self.pg0.get_capture(len(pkts))
1457            p = self.reass_frags_and_verify(frags,
1458                                            self.server_out_addr,
1459                                            self.pg0.remote_ip4)
1460            if proto != IP_PROTOS.icmp:
1461                self.assertEqual(p[layer].sport, self.server_out_port)
1462                self.assertEqual(p[layer].dport, self.port_in)
1463            else:
1464                self.assertEqual(p[layer].id, self.port_in)
1465            self.assertEqual(data, p[Raw].load)
1466
1467
1468class TestNAT44(MethodHolder):
1469    """ NAT44 Test Cases """
1470
1471    @classmethod
1472    def setUpClass(cls):
1473        super(TestNAT44, cls).setUpClass()
1474        cls.vapi.cli("set log class nat level debug")
1475
1476        try:
1477            cls.tcp_port_in = 6303
1478            cls.tcp_port_out = 6303
1479            cls.udp_port_in = 6304
1480            cls.udp_port_out = 6304
1481            cls.icmp_id_in = 6305
1482            cls.icmp_id_out = 6305
1483            cls.nat_addr = '10.0.0.3'
1484            cls.ipfix_src_port = 4739
1485            cls.ipfix_domain_id = 1
1486            cls.tcp_external_port = 80
1487            cls.udp_external_port = 69
1488
1489            cls.create_pg_interfaces(range(10))
1490            cls.interfaces = list(cls.pg_interfaces[0:4])
1491
1492            for i in cls.interfaces:
1493                i.admin_up()
1494                i.config_ip4()
1495                i.resolve_arp()
1496
1497            cls.pg0.generate_remote_hosts(3)
1498            cls.pg0.configure_ipv4_neighbors()
1499
1500            cls.pg1.generate_remote_hosts(1)
1501            cls.pg1.configure_ipv4_neighbors()
1502
1503            cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
1504            cls.vapi.ip_table_add_del(is_add=1, table_id=10)
1505            cls.vapi.ip_table_add_del(is_add=1, table_id=20)
1506
1507            cls.pg4._local_ip4 = "172.16.255.1"
1508            cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
1509            cls.pg4.set_table_ip4(10)
1510            cls.pg5._local_ip4 = "172.17.255.3"
1511            cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
1512            cls.pg5.set_table_ip4(10)
1513            cls.pg6._local_ip4 = "172.16.255.1"
1514            cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
1515            cls.pg6.set_table_ip4(20)
1516            for i in cls.overlapping_interfaces:
1517                i.config_ip4()
1518                i.admin_up()
1519                i.resolve_arp()
1520
1521            cls.pg7.admin_up()
1522            cls.pg8.admin_up()
1523
1524            cls.pg9.generate_remote_hosts(2)
1525            cls.pg9.config_ip4()
1526            cls.vapi.sw_interface_add_del_address(
1527                sw_if_index=cls.pg9.sw_if_index,
1528                prefix="10.0.0.1/24")
1529
1530            cls.pg9.admin_up()
1531            cls.pg9.resolve_arp()
1532            cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
1533            cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
1534            cls.pg9.resolve_arp()
1535
1536        except Exception:
1537            super(TestNAT44, cls).tearDownClass()
1538            raise
1539
1540    @classmethod
1541    def tearDownClass(cls):
1542        super(TestNAT44, cls).tearDownClass()
1543
1544    def test_dynamic(self):
1545        """ NAT44 dynamic translation test """
1546        self.nat44_add_address(self.nat_addr)
1547        flags = self.config_flags.NAT_IS_INSIDE
1548        self.vapi.nat44_interface_add_del_feature(
1549            sw_if_index=self.pg0.sw_if_index,
1550            flags=flags, is_add=1)
1551        self.vapi.nat44_interface_add_del_feature(
1552            sw_if_index=self.pg1.sw_if_index,
1553            is_add=1)
1554
1555        # in2out
1556        tcpn = self.statistics.get_err_counter(
1557            '/err/nat44-in2out-slowpath/TCP packets')
1558        udpn = self.statistics.get_err_counter(
1559            '/err/nat44-in2out-slowpath/UDP packets')
1560        icmpn = self.statistics.get_err_counter(
1561            '/err/nat44-in2out-slowpath/ICMP packets')
1562        totaln = self.statistics.get_err_counter(
1563            '/err/nat44-in2out-slowpath/good in2out packets processed')
1564
1565        pkts = self.create_stream_in(self.pg0, self.pg1)
1566        self.pg0.add_stream(pkts)
1567        self.pg_enable_capture(self.pg_interfaces)
1568        self.pg_start()
1569        capture = self.pg1.get_capture(len(pkts))
1570        self.verify_capture_out(capture)
1571
1572        err = self.statistics.get_err_counter(
1573            '/err/nat44-in2out-slowpath/TCP packets')
1574        self.assertEqual(err - tcpn, 2)
1575        err = self.statistics.get_err_counter(
1576            '/err/nat44-in2out-slowpath/UDP packets')
1577        self.assertEqual(err - udpn, 1)
1578        err = self.statistics.get_err_counter(
1579            '/err/nat44-in2out-slowpath/ICMP packets')
1580        self.assertEqual(err - icmpn, 1)
1581        err = self.statistics.get_err_counter(
1582            '/err/nat44-in2out-slowpath/good in2out packets processed')
1583        self.assertEqual(err - totaln, 4)
1584
1585        # out2in
1586        tcpn = self.statistics.get_err_counter('/err/nat44-out2in/TCP packets')
1587        udpn = self.statistics.get_err_counter('/err/nat44-out2in/UDP packets')
1588        icmpn = self.statistics.get_err_counter(
1589            '/err/nat44-out2in/ICMP packets')
1590        totaln = self.statistics.get_err_counter(
1591            '/err/nat44-out2in/good out2in packets processed')
1592
1593        pkts = self.create_stream_out(self.pg1)
1594        self.pg1.add_stream(pkts)
1595        self.pg_enable_capture(self.pg_interfaces)
1596        self.pg_start()
1597        capture = self.pg0.get_capture(len(pkts))
1598        self.verify_capture_in(capture, self.pg0)
1599
1600        err = self.statistics.get_err_counter('/err/nat44-out2in/TCP packets')
1601        self.assertEqual(err - tcpn, 2)
1602        err = self.statistics.get_err_counter('/err/nat44-out2in/UDP packets')
1603        self.assertEqual(err - udpn, 1)
1604        err = self.statistics.get_err_counter('/err/nat44-out2in/ICMP packets')
1605        self.assertEqual(err - icmpn, 1)
1606        err = self.statistics.get_err_counter(
1607            '/err/nat44-out2in/good out2in packets processed')
1608        self.assertEqual(err - totaln, 4)
1609
1610        users = self.statistics.get_counter('/nat44/total-users')
1611        self.assertEqual(users[0][0], 1)
1612        sessions = self.statistics.get_counter('/nat44/total-sessions')
1613        self.assertEqual(sessions[0][0], 3)
1614
1615    def test_dynamic_icmp_errors_in2out_ttl_1(self):
1616        """ NAT44 handling of client packets with TTL=1 """
1617
1618        self.nat44_add_address(self.nat_addr)
1619        flags = self.config_flags.NAT_IS_INSIDE
1620        self.vapi.nat44_interface_add_del_feature(
1621            sw_if_index=self.pg0.sw_if_index,
1622            flags=flags, is_add=1)
1623        self.vapi.nat44_interface_add_del_feature(
1624            sw_if_index=self.pg1.sw_if_index,
1625            is_add=1)
1626
1627        # Client side - generate traffic
1628        pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1629        self.pg0.add_stream(pkts)
1630        self.pg_enable_capture(self.pg_interfaces)
1631        self.pg_start()
1632
1633        # Client side - verify ICMP type 11 packets
1634        capture = self.pg0.get_capture(len(pkts))
1635        self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1636
1637    def test_dynamic_icmp_errors_out2in_ttl_1(self):
1638        """ NAT44 handling of server packets with TTL=1 """
1639
1640        self.nat44_add_address(self.nat_addr)
1641        flags = self.config_flags.NAT_IS_INSIDE
1642        self.vapi.nat44_interface_add_del_feature(
1643            sw_if_index=self.pg0.sw_if_index,
1644            flags=flags, is_add=1)
1645        self.vapi.nat44_interface_add_del_feature(
1646            sw_if_index=self.pg1.sw_if_index,
1647            is_add=1)
1648
1649        # Client side - create sessions
1650        pkts = self.create_stream_in(self.pg0, self.pg1)
1651        self.pg0.add_stream(pkts)
1652        self.pg_enable_capture(self.pg_interfaces)
1653        self.pg_start()
1654
1655        # Server side - generate traffic
1656        capture = self.pg1.get_capture(len(pkts))
1657        self.verify_capture_out(capture)
1658        pkts = self.create_stream_out(self.pg1, ttl=1)
1659        self.pg1.add_stream(pkts)
1660        self.pg_enable_capture(self.pg_interfaces)
1661        self.pg_start()
1662
1663        # Server side - verify ICMP type 11 packets
1664        capture = self.pg1.get_capture(len(pkts))
1665        self.verify_capture_out_with_icmp_errors(capture,
1666                                                 src_ip=self.pg1.local_ip4)
1667
1668    def test_dynamic_icmp_errors_in2out_ttl_2(self):
1669        """ NAT44 handling of error responses to client packets with TTL=2 """
1670
1671        self.nat44_add_address(self.nat_addr)
1672        flags = self.config_flags.NAT_IS_INSIDE
1673        self.vapi.nat44_interface_add_del_feature(
1674            sw_if_index=self.pg0.sw_if_index,
1675            flags=flags, is_add=1)
1676        self.vapi.nat44_interface_add_del_feature(
1677            sw_if_index=self.pg1.sw_if_index,
1678            is_add=1)
1679
1680        # Client side - generate traffic
1681        pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1682        self.pg0.add_stream(pkts)
1683        self.pg_enable_capture(self.pg_interfaces)
1684        self.pg_start()
1685
1686        # Server side - simulate ICMP type 11 response
1687        capture = self.pg1.get_capture(len(pkts))
1688        pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1689                IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1690                ICMP(type=11) / packet[IP] for packet in capture]
1691        self.pg1.add_stream(pkts)
1692        self.pg_enable_capture(self.pg_interfaces)
1693        self.pg_start()
1694
1695        # Client side - verify ICMP type 11 packets
1696        capture = self.pg0.get_capture(len(pkts))
1697        self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1698
1699    def test_dynamic_icmp_errors_out2in_ttl_2(self):
1700        """ NAT44 handling of error responses to server packets with TTL=2 """
1701
1702        self.nat44_add_address(self.nat_addr)
1703        flags = self.config_flags.NAT_IS_INSIDE
1704        self.vapi.nat44_interface_add_del_feature(
1705            sw_if_index=self.pg0.sw_if_index,
1706            flags=flags, is_add=1)
1707        self.vapi.nat44_interface_add_del_feature(
1708            sw_if_index=self.pg1.sw_if_index,
1709            is_add=1)
1710
1711        # Client side - create sessions
1712        pkts = self.create_stream_in(self.pg0, self.pg1)
1713        self.pg0.add_stream(pkts)
1714        self.pg_enable_capture(self.pg_interfaces)
1715        self.pg_start()
1716
1717        # Server side - generate traffic
1718        capture = self.pg1.get_capture(len(pkts))
1719        self.verify_capture_out(capture)
1720        pkts = self.create_stream_out(self.pg1, ttl=2)
1721        self.pg1.add_stream(pkts)
1722        self.pg_enable_capture(self.pg_interfaces)
1723        self.pg_start()
1724
1725        # Client side - simulate ICMP type 11 response
1726        capture = self.pg0.get_capture(len(pkts))
1727        pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1728                IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1729                ICMP(type=11) / packet[IP] for packet in capture]
1730        self.pg0.add_stream(pkts)
1731        self.pg_enable_capture(self.pg_interfaces)
1732        self.pg_start()
1733
1734        # Server side - verify ICMP type 11 packets
1735        capture = self.pg1.get_capture(len(pkts))
1736        self.verify_capture_out_with_icmp_errors(capture)
1737
1738    def test_ping_out_interface_from_outside(self):
1739        """ Ping NAT44 out interface from outside network """
1740
1741        self.nat44_add_address(self.nat_addr)
1742        flags = self.config_flags.NAT_IS_INSIDE
1743        self.vapi.nat44_interface_add_del_feature(
1744            sw_if_index=self.pg0.sw_if_index,
1745            flags=flags, is_add=1)
1746        self.vapi.nat44_interface_add_del_feature(
1747            sw_if_index=self.pg1.sw_if_index,
1748            is_add=1)
1749
1750        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1751             IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
1752             ICMP(id=self.icmp_id_out, type='echo-request'))
1753        pkts = [p]
1754        self.pg1.add_stream(pkts)
1755        self.pg_enable_capture(self.pg_interfaces)
1756        self.pg_start()
1757        capture = self.pg1.get_capture(len(pkts))
1758        packet = capture[0]
1759        try:
1760            self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1761            self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1762            self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1763            self.assertEqual(packet[ICMP].type, 0)  # echo reply
1764        except:
1765            self.logger.error(ppp("Unexpected or invalid packet "
1766                                  "(outside network):", packet))
1767            raise
1768
1769    def test_ping_internal_host_from_outside(self):
1770        """ Ping internal host from outside network """
1771
1772        self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1773        flags = self.config_flags.NAT_IS_INSIDE
1774        self.vapi.nat44_interface_add_del_feature(
1775            sw_if_index=self.pg0.sw_if_index,
1776            flags=flags, is_add=1)
1777        self.vapi.nat44_interface_add_del_feature(
1778            sw_if_index=self.pg1.sw_if_index,
1779            is_add=1)
1780
1781        # out2in
1782        pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1783               IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1784               ICMP(id=self.icmp_id_out, type='echo-request'))
1785        self.pg1.add_stream(pkt)
1786        self.pg_enable_capture(self.pg_interfaces)
1787        self.pg_start()
1788        capture = self.pg0.get_capture(1)
1789        self.verify_capture_in(capture, self.pg0)
1790        self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1791
1792        # in2out
1793        pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1794               IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1795               ICMP(id=self.icmp_id_in, type='echo-reply'))
1796        self.pg0.add_stream(pkt)
1797        self.pg_enable_capture(self.pg_interfaces)
1798        self.pg_start()
1799        capture = self.pg1.get_capture(1)
1800        self.verify_capture_out(capture, same_port=True)
1801        self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1802
1803    def test_forwarding(self):
1804        """ NAT44 forwarding test """
1805
1806        flags = self.config_flags.NAT_IS_INSIDE
1807        self.vapi.nat44_interface_add_del_feature(
1808            sw_if_index=self.pg0.sw_if_index,
1809            flags=flags, is_add=1)
1810        self.vapi.nat44_interface_add_del_feature(
1811            sw_if_index=self.pg1.sw_if_index,
1812            is_add=1)
1813        self.vapi.nat44_forwarding_enable_disable(enable=1)
1814
1815        real_ip = self.pg0.remote_ip4
1816        alias_ip = self.nat_addr
1817        flags = self.config_flags.NAT_IS_ADDR_ONLY
1818        self.vapi.nat44_add_del_static_mapping(is_add=1,
1819                                               local_ip_address=real_ip,
1820                                               external_ip_address=alias_ip,
1821                                               external_sw_if_index=0xFFFFFFFF,
1822                                               flags=flags)
1823
1824        try:
1825            # static mapping match
1826
1827            pkts = self.create_stream_out(self.pg1)
1828            self.pg1.add_stream(pkts)
1829            self.pg_enable_capture(self.pg_interfaces)
1830            self.pg_start()
1831            capture = self.pg0.get_capture(len(pkts))
1832            self.verify_capture_in(capture, self.pg0)
1833
1834            pkts = self.create_stream_in(self.pg0, self.pg1)
1835            self.pg0.add_stream(pkts)
1836            self.pg_enable_capture(self.pg_interfaces)
1837            self.pg_start()
1838            capture = self.pg1.get_capture(len(pkts))
1839            self.verify_capture_out(capture, same_port=True)
1840
1841            # no static mapping match
1842
1843            host0 = self.pg0.remote_hosts[0]
1844            self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1845            try:
1846                pkts = self.create_stream_out(self.pg1,
1847                                              dst_ip=self.pg0.remote_ip4,
1848                                              use_inside_ports=True)
1849                self.pg1.add_stream(pkts)
1850                self.pg_enable_capture(self.pg_interfaces)
1851                self.pg_start()
1852                capture = self.pg0.get_capture(len(pkts))
1853                self.verify_capture_in(capture, self.pg0)
1854
1855                pkts = self.create_stream_in(self.pg0, self.pg1)
1856                self.pg0.add_stream(pkts)
1857                self.pg_enable_capture(self.pg_interfaces)
1858                self.pg_start()
1859                capture = self.pg1.get_capture(len(pkts))
1860                self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
1861                                        same_port=True)
1862            finally:
1863                self.pg0.remote_hosts[0] = host0
1864
1865        finally:
1866            self.vapi.nat44_forwarding_enable_disable(enable=0)
1867            flags = self.config_flags.NAT_IS_ADDR_ONLY
1868            self.vapi.nat44_add_del_static_mapping(
1869                is_add=0,
1870                local_ip_address=real_ip,
1871                external_ip_address=alias_ip,
1872                external_sw_if_index=0xFFFFFFFF,
1873                flags=flags)
1874
1875    def test_static_in(self):
1876        """ 1:1 NAT initialized from inside network """
1877
1878        nat_ip = "10.0.0.10"
1879        self.tcp_port_out = 6303
1880        self.udp_port_out = 6304
1881        self.icmp_id_out = 6305
1882
1883        self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1884        flags = self.config_flags.NAT_IS_INSIDE
1885        self.vapi.nat44_interface_add_del_feature(
1886            sw_if_index=self.pg0.sw_if_index,
1887            flags=flags, is_add=1)
1888        self.vapi.nat44_interface_add_del_feature(
1889            sw_if_index=self.pg1.sw_if_index,
1890            is_add=1)
1891        sm = self.vapi.nat44_static_mapping_dump()
1892        self.assertEqual(len(sm), 1)
1893        self.assertEqual(sm[0].tag, '')
1894        self.assertEqual(sm[0].protocol, 0)
1895        self.assertEqual(sm[0].local_port, 0)
1896        self.assertEqual(sm[0].external_port, 0)
1897
1898        # in2out
1899        pkts = self.create_stream_in(self.pg0, self.pg1)
1900        self.pg0.add_stream(pkts)
1901        self.pg_enable_capture(self.pg_interfaces)
1902        self.pg_start()
1903        capture = self.pg1.get_capture(len(pkts))
1904        self.verify_capture_out(capture, nat_ip, True)
1905
1906        # out2in
1907        pkts = self.create_stream_out(self.pg1, nat_ip)
1908        self.pg1.add_stream(pkts)
1909        self.pg_enable_capture(self.pg_interfaces)
1910        self.pg_start()
1911        capture = self.pg0.get_capture(len(pkts))
1912        self.verify_capture_in(capture, self.pg0)
1913
1914    def test_static_out(self):
1915        """ 1:1 NAT initialized from outside network """
1916
1917        nat_ip = "10.0.0.20"
1918        self.tcp_port_out = 6303
1919        self.udp_port_out = 6304
1920        self.icmp_id_out = 6305
1921        tag = "testTAG"
1922
1923        self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
1924        flags = self.config_flags.NAT_IS_INSIDE
1925        self.vapi.nat44_interface_add_del_feature(
1926            sw_if_index=self.pg0.sw_if_index,
1927            flags=flags, is_add=1)
1928        self.vapi.nat44_interface_add_del_feature(
1929            sw_if_index=self.pg1.sw_if_index,
1930            is_add=1)
1931        sm = self.vapi.nat44_static_mapping_dump()
1932        self.assertEqual(len(sm), 1)
1933        self.assertEqual(sm[0].tag, tag)
1934
1935        # out2in
1936        pkts = self.create_stream_out(self.pg1, nat_ip)
1937        self.pg1.add_stream(pkts)
1938        self.pg_enable_capture(self.pg_interfaces)
1939        self.pg_start()
1940        capture = self.pg0.get_capture(len(pkts))
1941        self.verify_capture_in(capture, self.pg0)
1942
1943        # in2out
1944        pkts = self.create_stream_in(self.pg0, self.pg1)
1945        self.pg0.add_stream(pkts)
1946        self.pg_enable_capture(self.pg_interfaces)
1947        self.pg_start()
1948        capture = self.pg1.get_capture(len(pkts))
1949        self.verify_capture_out(capture, nat_ip, True)
1950
1951    def test_static_with_port_in(self):
1952        """ 1:1 NAPT initialized from inside network """
1953
1954        self.tcp_port_out = 3606
1955        self.udp_port_out = 3607
1956        self.icmp_id_out = 3608
1957
1958        self.nat44_add_address(self.nat_addr)
1959        self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1960                                      self.tcp_port_in, self.tcp_port_out,
1961                                      proto=IP_PROTOS.tcp)
1962        self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1963                                      self.udp_port_in, self.udp_port_out,
1964                                      proto=IP_PROTOS.udp)
1965        self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1966                                      self.icmp_id_in, self.icmp_id_out,
1967                                      proto=IP_PROTOS.icmp)
1968        flags = self.config_flags.NAT_IS_INSIDE
1969        self.vapi.nat44_interface_add_del_feature(
1970            sw_if_index=self.pg0.sw_if_index,
1971            flags=flags, is_add=1)
1972        self.vapi.nat44_interface_add_del_feature(
1973            sw_if_index=self.pg1.sw_if_index,
1974            is_add=1)
1975
1976        # in2out
1977        pkts = self.create_stream_in(self.pg0, self.pg1)
1978        self.pg0.add_stream(pkts)
1979        self.pg_enable_capture(self.pg_interfaces)
1980        self.pg_start()
1981        capture = self.pg1.get_capture(len(pkts))
1982        self.verify_capture_out(capture)
1983
1984        # out2in
1985        pkts = self.create_stream_out(self.pg1)
1986        self.pg1.add_stream(pkts)
1987        self.pg_enable_capture(self.pg_interfaces)
1988        self.pg_start()
1989        capture = self.pg0.get_capture(len(pkts))
1990        self.verify_capture_in(capture, self.pg0)
1991
1992    def test_static_with_port_out(self):
1993        """ 1:1 NAPT initialized from outside network """
1994
1995        self.tcp_port_out = 30606
1996        self.udp_port_out = 30607
1997        self.icmp_id_out = 30608
1998
1999        self.nat44_add_address(self.nat_addr)
2000        self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
2001                                      self.tcp_port_in, self.tcp_port_out,
2002                                      proto=IP_PROTOS.tcp)
2003        self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
2004                                      self.udp_port_in, self.udp_port_out,
2005                                      proto=IP_PROTOS.udp)
2006        self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
2007                                      self.icmp_id_in, self.icmp_id_out,
2008                                      proto=IP_PROTOS.icmp)
2009        flags = self.config_flags.NAT_IS_INSIDE
2010        self.vapi.nat44_interface_add_del_feature(
2011            sw_if_index=self.pg0.sw_if_index,
2012            flags=flags, is_add=1)
2013        self.vapi.nat44_interface_add_del_feature(
2014            sw_if_index=self.pg1.sw_if_index,
2015            is_add=1)
2016
2017        # out2in
2018        pkts = self.create_stream_out(self.pg1)
2019        self.pg1.add_stream(pkts)
2020        self.pg_enable_capture(self.pg_interfaces)
2021        self.pg_start()
2022        capture = self.pg0.get_capture(len(pkts))
2023        self.verify_capture_in(capture, self.pg0)
2024
2025        # in2out
2026        pkts = self.create_stream_in(self.pg0, self.pg1)
2027        self.pg0.add_stream(pkts)
2028        self.pg_enable_capture(self.pg_interfaces)
2029        self.pg_start()
2030        capture = self.pg1.get_capture(len(pkts))
2031        self.verify_capture_out(capture)
2032
2033    def test_static_vrf_aware(self):
2034        """ 1:1 NAT VRF awareness """
2035
2036        nat_ip1 = "10.0.0.30"
2037        nat_ip2 = "10.0.0.40"
2038        self.tcp_port_out = 6303
2039        self.udp_port_out = 6304
2040        self.icmp_id_out = 6305
2041
2042        self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
2043                                      vrf_id=10)
2044        self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
2045                                      vrf_id=10)
2046        flags = self.config_flags.NAT_IS_INSIDE
2047        self.vapi.nat44_interface_add_del_feature(
2048            sw_if_index=self.pg3.sw_if_index,
2049            is_add=1)
2050        self.vapi.nat44_interface_add_del_feature(
2051            sw_if_index=self.pg0.sw_if_index,
2052            flags=flags, is_add=1)
2053        self.vapi.nat44_interface_add_del_feature(
2054            sw_if_index=self.pg4.sw_if_index,
2055            flags=flags, is_add=1)
2056
2057        # inside interface VRF match NAT44 static mapping VRF
2058        pkts = self.create_stream_in(self.pg4, self.pg3)
2059        self.pg4.add_stream(pkts)
2060        self.pg_enable_capture(self.pg_interfaces)
2061        self.pg_start()
2062        capture = self.pg3.get_capture(len(pkts))
2063        self.verify_capture_out(capture, nat_ip1, True)
2064
2065        # inside interface VRF don't match NAT44 static mapping VRF (packets
2066        # are dropped)
2067        pkts = self.create_stream_in(self.pg0, self.pg3)
2068        self.pg0.add_stream(pkts)
2069        self.pg_enable_capture(self.pg_interfaces)
2070        self.pg_start()
2071        self.pg3.assert_nothing_captured()
2072
2073    def test_dynamic_to_static(self):
2074        """ Switch from dynamic translation to 1:1NAT """
2075        nat_ip = "10.0.0.10"
2076        self.tcp_port_out = 6303
2077        self.udp_port_out = 6304
2078        self.icmp_id_out = 6305
2079
2080        self.nat44_add_address(self.nat_addr)
2081        flags = self.config_flags.NAT_IS_INSIDE
2082        self.vapi.nat44_interface_add_del_feature(
2083            sw_if_index=self.pg0.sw_if_index,
2084            flags=flags, is_add=1)
2085        self.vapi.nat44_interface_add_del_feature(
2086            sw_if_index=self.pg1.sw_if_index,
2087            is_add=1)
2088
2089        # dynamic
2090        pkts = self.create_stream_in(self.pg0, self.pg1)
2091        self.pg0.add_stream(pkts)
2092        self.pg_enable_capture(self.pg_interfaces)
2093        self.pg_start()
2094        capture = self.pg1.get_capture(len(pkts))
2095        self.verify_capture_out(capture)
2096
2097        # 1:1NAT
2098        self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2099        sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
2100        self.assertEqual(len(sessions), 0)
2101        pkts = self.create_stream_in(self.pg0, self.pg1)
2102        self.pg0.add_stream(pkts)
2103        self.pg_enable_capture(self.pg_interfaces)
2104        self.pg_start()
2105        capture = self.pg1.get_capture(len(pkts))
2106        self.verify_capture_out(capture, nat_ip, True)
2107
2108    def test_identity_nat(self):
2109        """ Identity NAT """
2110        flags = self.config_flags.NAT_IS_ADDR_ONLY
2111        self.vapi.nat44_add_del_identity_mapping(
2112            ip_address=self.pg0.remote_ip4, sw_if_index=0xFFFFFFFF,
2113            flags=flags, is_add=1)
2114        flags = self.config_flags.NAT_IS_INSIDE
2115        self.vapi.nat44_interface_add_del_feature(
2116            sw_if_index=self.pg0.sw_if_index,
2117            flags=flags, is_add=1)
2118        self.vapi.nat44_interface_add_del_feature(
2119            sw_if_index=self.pg1.sw_if_index,
2120            is_add=1)
2121
2122        p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2123             IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
2124             TCP(sport=12345, dport=56789))
2125        self.pg1.add_stream(p)
2126        self.pg_enable_capture(self.pg_interfaces)
2127        self.pg_start()
2128        capture = self.pg0.get_capture(1)
2129        p = capture[0]
2130        try:
2131            ip = p[IP]
2132            tcp = p[TCP]
2133            self.assertEqual(ip.dst, self.pg0.remote_ip4)
2134            self.assertEqual(ip.src, self.pg1.remote_ip4)
2135            self.assertEqual(tcp.dport, 56789)
2136            self.assertEqual(tcp.sport, 12345)
2137            self.assert_packet_checksums_valid(p)
2138        except:
2139            self.logger.error(ppp("Unexpected or invalid packet:", p))
2140            raise
2141
2142        sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
2143        self.assertEqual(len(sessions), 0)
2144        flags = self.config_flags.NAT_IS_ADDR_ONLY
2145        self.vapi.nat44_add_del_identity_mapping(
2146            ip_address=self.pg0.remote_ip4, sw_if_index=0xFFFFFFFF,
2147            flags=flags, vrf_id=1, is_add=1)
2148        identity_mappings = self.vapi.nat44_identity_mapping_dump()
2149        self.assertEqual(len(identity_mappings), 2)
2150
2151    def test_multiple_inside_interfaces(self):
2152        """ NAT44 multiple non-overlapping address space inside interfaces """
2153
2154        self.nat44_add_address(self.nat_addr)
2155        flags = self.config_flags.NAT_IS_INSIDE
2156        self.vapi.nat44_interface_add_del_feature(
2157            sw_if_index=self.pg0.sw_if_index,
2158            flags=flags, is_add=1)
2159        self.vapi.nat44_interface_add_del_feature(
2160            sw_if_index=self.pg1.sw_if_index,
2161            flags=flags, is_add=1)
2162        self.vapi.nat44_interface_add_del_feature(
2163            sw_if_index=self.pg3.sw_if_index,
2164            is_add=1)
2165
2166        # between two NAT44 inside interfaces (no translation)
2167        pkts = self.create_stream_in(self.pg0, self.pg1)
2168        self.pg0.add_stream(pkts)
2169        self.pg_enable_capture(self.pg_interfaces)
2170        self.pg_start()
2171        capture = self.pg1.get_capture(len(pkts))
2172        self.verify_capture_no_translation(capture, self.pg0, self.pg1)
2173
2174        # from NAT44 inside to interface without NAT44 feature (no translation)
2175        pkts = self.create_stream_in(self.pg0, self.pg2)
2176        self.pg0.add_stream(pkts)
2177        self.pg_enable_capture(self.pg_interfaces)
2178        self.pg_start()
2179        capture = self.pg2.get_capture(len(pkts))
2180        self.verify_capture_no_translation(capture, self.pg0, self.pg2)
2181
2182        # in2out 1st interface
2183        pkts = self.create_stream_in(self.pg0, self.pg3)
2184        self.pg0.add_stream(pkts)
2185        self.pg_enable_capture(self.pg_interfaces)
2186        self.pg_start()
2187        capture = self.pg3.get_capture(len(pkts))
2188        self.verify_capture_out(capture)
2189
2190        # out2in 1st interface
2191        pkts = self.create_stream_out(self.pg3)
2192        self.pg3.add_stream(pkts)
2193        self.pg_enable_capture(self.pg_interfaces)
2194        self.pg_start()
2195        capture = self.pg0.get_capture(len(pkts))
2196        self.verify_capture_in(capture, self.pg0)
2197
2198        # in2out 2nd interface
2199        pkts = self.create_stream_in(self.pg1, self.pg3)
2200        self.pg1.add_stream(pkts)
2201        self.pg_enable_capture(self.pg_interfaces)
2202        self.pg_start()
2203        capture = self.pg3.get_capture(len(pkts))
2204        self.verify_capture_out(capture)
2205
2206        # out2in 2nd interface
2207        pkts = self.create_stream_out(self.pg3)
2208        self.pg3.add_stream(pkts)
2209        self.pg_enable_capture(self.pg_interfaces)
2210        self.pg_start()
2211        capture = self.pg1.get_capture(len(pkts))
2212        self.verify_capture_in(capture, self.pg1)
2213
2214    def test_inside_overlapping_interfaces(self):
2215        """ NAT44 multiple inside interfaces with overlapping address space """
2216
2217        static_nat_ip = "10.0.0.10"
2218        self.nat44_add_address(self.nat_addr)
2219        flags = self.config_flags.NAT_IS_INSIDE
2220        self.vapi.nat44_interface_add_del_feature(
2221            sw_if_index=self.pg3.sw_if_index,
2222            is_add=1)
2223        self.vapi.nat44_interface_add_del_feature(
2224            sw_if_index=self.pg4.sw_if_index,
2225            flags=flags, is_add=1)
2226        self.vapi.nat44_interface_add_del_feature(
2227            sw_if_index=self.pg5.sw_if_index,
2228            flags=flags, is_add=1)
2229        self.vapi.nat44_interface_add_del_feature(
2230            sw_if_index=self.pg6.sw_if_index,
2231            flags=flags, is_add=1)
2232        self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
2233                                      vrf_id=20)
2234
2235        # between NAT44 inside interfaces with same VRF (no translation)
2236        pkts = self.create_stream_in(self.pg4, self.pg5)
2237        self.pg4.add_stream(pkts)
2238        self.pg_enable_capture(self.pg_interfaces)
2239        self.pg_start()
2240        capture = self.pg5.get_capture(len(pkts))
2241        self.verify_capture_no_translation(capture, self.pg4, self.pg5)
2242
2243        # between NAT44 inside interfaces with different VRF (hairpinning)
2244        p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
2245             IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
2246             TCP(sport=1234, dport=5678))
2247        self.pg4.add_stream(p)
2248        self.pg_enable_capture(self.pg_interfaces)
2249        self.pg_start()
2250        capture = self.pg6.get_capture(1)
2251        p = capture[0]
2252        try:
2253            ip = p[IP]
2254            tcp = p[TCP]
2255            self.assertEqual(ip.src, self.nat_addr)
2256            self.assertEqual(ip.dst, self.pg6.remote_ip4)
2257            self.assertNotEqual(tcp.sport, 1234)
2258            self.assertEqual(tcp.dport, 5678)
2259        except:
2260            self.logger.error(ppp("Unexpected or invalid packet:", p))
2261            raise
2262
2263        # in2out 1st interface
2264        pkts = self.create_stream_in(self.pg4, self.pg3)
2265        self.pg4.add_stream(pkts)
2266        self.pg_enable_capture(self.pg_interfaces)
2267        self.pg_start()
2268        capture = self.pg3.get_capture(len(pkts))
2269        self.verify_capture_out(capture)
2270
2271        # out2in 1st interface
2272        pkts = self.create_stream_out(self.pg3)
2273        self.pg3.add_stream(pkts)
2274        self.pg_enable_capture(self.pg_interfaces)
2275        self.pg_start()
2276        capture = self.pg4.get_capture(len(pkts))
2277        self.verify_capture_in(capture, self.pg4)
2278
2279        # in2out 2nd interface
2280        pkts = self.create_stream_in(self.pg5, self.pg3)
2281        self.pg5.add_stream(pkts)
2282        self.pg_enable_capture(self.pg_interfaces)
2283        self.pg_start()
2284        capture = self.pg3.get_capture(len(pkts))
2285        self.verify_capture_out(capture)
2286
2287        # out2in 2nd interface
2288        pkts = self.create_stream_out(self.pg3)
2289        self.pg3.add_stream(pkts)
2290        self.pg_enable_capture(self.pg_interfaces)
2291        self.pg_start()
2292        capture = self.pg5.get_capture(len(pkts))
2293        self.verify_capture_in(capture, self.pg5)
2294
2295        # pg5 session dump
2296        addresses = self.vapi.nat44_address_dump()
2297        self.assertEqual(len(addresses), 1)
2298        sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4, 10)
2299        self.assertEqual(len(sessions), 3)
2300        for session in sessions:
2301            self.assertFalse(session.flags & self.config_flags.NAT_IS_STATIC)
2302            self.assertEqual(str(session.inside_ip_address),
2303                             self.pg5.remote_ip4)
2304            self.assertEqual(session.outside_ip_address,
2305                             addresses[0].ip_address)
2306        self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
2307        self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
2308        self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
2309        self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
2310        self.assertEqual(sessions[1].inside_port, self.udp_port_in)
2311        self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
2312        self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
2313        self.assertEqual(sessions[1].outside_port, self.udp_port_out)
2314        self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
2315
2316        # in2out 3rd interface
2317        pkts = self.create_stream_in(self.pg6, self.pg3)
2318        self.pg6.add_stream(pkts)
2319        self.pg_enable_capture(self.pg_interfaces)
2320        self.pg_start()
2321        capture = self.pg3.get_capture(len(pkts))
2322        self.verify_capture_out(capture, static_nat_ip, True)
2323
2324        # out2in 3rd interface
2325        pkts = self.create_stream_out(self.pg3, static_nat_ip)
2326        self.pg3.add_stream(pkts)
2327        self.pg_enable_capture(self.pg_interfaces)
2328        self.pg_start()
2329        capture = self.pg6.get_capture(len(pkts))
2330        self.verify_capture_in(capture, self.pg6)
2331
2332        # general user and session dump verifications
2333        users = self.vapi.nat44_user_dump()
2334        self.assertGreaterEqual(len(users), 3)
2335        addresses = self.vapi.nat44_address_dump()
2336        self.assertEqual(len(addresses), 1)
2337        for user in users:
2338            sessions = self.vapi.nat44_user_session_dump(user.ip_address,
2339                                                         user.vrf_id)
2340            for session in sessions:
2341                self.assertEqual(user.ip_address, session.inside_ip_address)
2342                self.assertTrue(session.total_bytes > session.total_pkts > 0)
2343                self.assertTrue(session.protocol in
2344                                [IP_PROTOS.tcp, IP_PROTOS.udp,
2345                                 IP_PROTOS.icmp])
2346                self.assertFalse(session.flags &
2347                                 self.config_flags.NAT_IS_EXT_HOST_VALID)
2348
2349        # pg4 session dump
2350        sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4, 10)
2351        self.assertGreaterEqual(len(sessions), 4)
2352        for session in sessions:
2353            self.assertFalse(session.flags & self.config_flags.NAT_IS_STATIC)
2354            self.assertEqual(str(session.inside_ip_address),
2355                             self.pg4.remote_ip4)
2356            self.assertEqual(session.outside_ip_address,
2357                             addresses[0].ip_address)
2358
2359        # pg6 session dump
2360        sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4, 20)
2361        self.assertGreaterEqual(len(sessions), 3)
2362        for session in sessions:
2363            self.assertTrue(session.flags & self.config_flags.NAT_IS_STATIC)
2364            self.assertEqual(str(session.inside_ip_address),
2365                             self.pg6.remote_ip4)
2366            self.assertEqual(str(session.outside_ip_address),
2367                             static_nat_ip)
2368            self.assertTrue(session.inside_port in
2369                            [self.tcp_port_in, self.udp_port_in,
2370                             self.icmp_id_in])
2371
2372    def test_hairpinning(self):
2373        """ NAT44 hairpinning - 1:1 NAPT """
2374
2375        host = self.pg0.remote_hosts[0]
2376        server = self.pg0.remote_hosts[1]
2377        host_in_port = 1234
2378        host_out_port = 0
2379        server_in_port = 5678
2380        server_out_port = 8765
2381
2382        self.nat44_add_address(self.nat_addr)
2383        flags = self.config_flags.NAT_IS_INSIDE
2384        self.vapi.nat44_interface_add_del_feature(
2385            sw_if_index=self.pg0.sw_if_index,
2386            flags=flags, is_add=1)
2387        self.vapi.nat44_interface_add_del_feature(
2388            sw_if_index=self.pg1.sw_if_index,
2389            is_add=1)
2390
2391        # add static mapping for server
2392        self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2393                                      server_in_port, server_out_port,
2394                                      proto=IP_PROTOS.tcp)
2395
2396        # send packet from host to server
2397        p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2398             IP(src=host.ip4, dst=self.nat_addr) /
2399             TCP(sport=host_in_port, dport=server_out_port))
2400        self.pg0.add_stream(p)
2401        self.pg_enable_capture(self.pg_interfaces)
2402        self.pg_start()
2403        capture = self.pg0.get_capture(1)
2404        p = capture[0]
2405        try:
2406            ip = p[IP]
2407            tcp = p[TCP]
2408            self.assertEqual(ip.src, self.nat_addr)
2409            self.assertEqual(ip.dst, server.ip4)
2410            self.assertNotEqual(tcp.sport, host_in_port)
2411            self.assertEqual(tcp.dport, server_in_port)
2412            self.assert_packet_checksums_valid(p)
2413            host_out_port = tcp.sport
2414        except:
2415            self.logger.error(ppp("Unexpected or invalid packet:", p))
2416            raise
2417
2418        # send reply from server to host
2419        p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2420             IP(src=server.ip4, dst=self.nat_addr) /
2421             TCP(sport=server_in_port, dport=host_out_port))
2422        self.pg0.add_stream(p)
2423        self.pg_enable_capture(self.pg_interfaces)
2424        self.pg_start()
2425        capture = self.pg0.get_capture(1)
2426        p = capture[0]
2427        try:
2428            ip = p[IP]
2429            tcp = p[TCP]
2430            self.assertEqual(ip.src, self.nat_addr)
2431            self.assertEqual(ip.dst, host.ip4)
2432            self.assertEqual(tcp.sport, server_out_port)
2433            self.assertEqual(tcp.dport, host_in_port)
2434            self.assert_packet_checksums_valid(p)
2435        except:
2436            self.logger.error(ppp("Unexpected or invalid packet:", p))
2437            raise
2438
2439    def test_hairpinning2(self):
2440        """ NAT44 hairpinning - 1:1 NAT"""
2441
2442        server1_nat_ip = "10.0.0.10"
2443        server2_nat_ip = "10.0.0.11"
2444        host = self.pg0.remote_hosts[0]
2445        server1 = self.pg0.remote_hosts[1]
2446        server2 = self.pg0.remote_hosts[2]
2447        server_tcp_port = 22
2448        server_udp_port = 20
2449
2450        self.nat44_add_address(self.nat_addr)
2451        flags = self.config_flags.NAT_IS_INSIDE
2452        self.vapi.nat44_interface_add_del_feature(
2453            sw_if_index=self.pg0.sw_if_index,
2454            flags=flags, is_add=1)
2455        self.vapi.nat44_interface_add_del_feature(
2456            sw_if_index=self.pg1.sw_if_index,
2457            is_add=1)
2458
2459        # add static mapping for servers
2460        self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
2461        self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
2462
2463        # host to server1
2464        pkts = []
2465        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2466             IP(src=host.ip4, dst=server1_nat_ip) /
2467             TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2468        pkts.append(p)
2469        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2470             IP(src=host.ip4, dst=server1_nat_ip) /
2471             UDP(sport=self.udp_port_in, dport=server_udp_port))
2472        pkts.append(p)
2473        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2474             IP(src=host.ip4, dst=server1_nat_ip) /
2475             ICMP(id=self.icmp_id_in, type='echo-request'))
2476        pkts.append(p)
2477        self.pg0.add_stream(pkts)
2478        self.pg_enable_capture(self.pg_interfaces)
2479        self.pg_start()
2480        capture = self.pg0.get_capture(len(pkts))
2481        for packet in capture:
2482            try:
2483                self.assertEqual(packet[IP].src, self.nat_addr)
2484                self.assertEqual(packet[IP].dst, server1.ip4)
2485                if packet.haslayer(TCP):
2486                    self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
2487                    self.assertEqual(packet[TCP].dport, server_tcp_port)
2488                    self.tcp_port_out = packet[TCP].sport
2489                    self.assert_packet_checksums_valid(packet)
2490                elif packet.haslayer(UDP):
2491                    self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
2492                    self.assertEqual(packet[UDP].dport, server_udp_port)
2493                    self.udp_port_out = packet[UDP].sport
2494                else:
2495                    self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
2496                    self.icmp_id_out = packet[ICMP].id
2497            except:
2498                self.logger.error(ppp("Unexpected or invalid packet:", packet))
2499                raise
2500
2501        # server1 to host
2502        pkts = []
2503        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2504             IP(src=server1.ip4, dst=self.nat_addr) /
2505             TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2506        pkts.append(p)
2507        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2508             IP(src=server1.ip4, dst=self.nat_addr) /
2509             UDP(sport=server_udp_port, dport=self.udp_port_out))
2510        pkts.append(p)
2511        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2512             IP(src=server1.ip4, dst=self.nat_addr) /
2513             ICMP(id=self.icmp_id_out, type='echo-reply'))
2514        pkts.append(p)
2515        self.pg0.add_stream(pkts)
2516        self.pg_enable_capture(self.pg_interfaces)
2517        self.pg_start()
2518        capture = self.pg0.get_capture(len(pkts))
2519        for packet in capture:
2520            try:
2521                self.assertEqual(packet[IP].src, server1_nat_ip)
2522                self.assertEqual(packet[IP].dst, host.ip4)
2523                if packet.haslayer(TCP):
2524                    self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2525                    self.assertEqual(packet[TCP].sport, server_tcp_port)
2526                    self.assert_packet_checksums_valid(packet)
2527                elif packet.haslayer(UDP):
2528                    self.assertEqual(packet[UDP].dport, self.udp_port_in)
2529                    self.assertEqual(packet[UDP].sport, server_udp_port)
2530                else:
2531                    self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2532            except:
2533                self.logger.error(ppp("Unexpected or invalid packet:", packet))
2534                raise
2535
2536        # server2 to server1
2537        pkts = []
2538        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2539             IP(src=server2.ip4, dst=server1_nat_ip) /
2540             TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2541        pkts.append(p)
2542        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2543             IP(src=server2.ip4, dst=server1_nat_ip) /
2544             UDP(sport=self.udp_port_in, dport=server_udp_port))
2545        pkts.append(p)
2546        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2547             IP(src=server2.ip4, dst=server1_nat_ip) /
2548             ICMP(id=self.icmp_id_in, type='echo-request'))
2549        pkts.append(p)
2550        self.pg0.add_stream(pkts)
2551        self.pg_enable_capture(self.pg_interfaces)
2552        self.pg_start()
2553        capture = self.pg0.get_capture(len(pkts))
2554        for packet in capture:
2555            try:
2556                self.assertEqual(packet[IP].src, server2_nat_ip)
2557                self.assertEqual(packet[IP].dst, server1.ip4)
2558                if packet.haslayer(TCP):
2559                    self.assertEqual(packet[TCP].sport, self.tcp_port_in)
2560                    self.assertEqual(packet[TCP].dport, server_tcp_port)
2561                    self.tcp_port_out = packet[TCP].sport
2562                    self.assert_packet_checksums_valid(packet)
2563                elif packet.haslayer(UDP):
2564                    self.assertEqual(packet[UDP].sport, self.udp_port_in)
2565                    self.assertEqual(packet[UDP].dport, server_udp_port)
2566                    self.udp_port_out = packet[UDP].sport
2567                else:
2568                    self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2569                    self.icmp_id_out = packet[ICMP].id
2570            except:
2571                self.logger.error(ppp("Unexpected or invalid packet:", packet))
2572                raise
2573
2574        # server1 to server2
2575        pkts = []
2576        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2577             IP(src=server1.ip4, dst=server2_nat_ip) /
2578             TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2579        pkts.append(p)
2580        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2581             IP(src=server1.ip4, dst=server2_nat_ip) /
2582             UDP(sport=server_udp_port, dport=self.udp_port_out))
2583        pkts.append(p)
2584        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2585             IP(src=server1.ip4, dst=server2_nat_ip) /
2586             ICMP(id=self.icmp_id_out, type='echo-reply'))
2587        pkts.append(p)
2588        self.pg0.add_stream(pkts)
2589        self.pg_enable_capture(self.pg_interfaces)
2590        self.pg_start()
2591        capture = self.pg0.get_capture(len(pkts))
2592        for packet in capture:
2593            try:
2594                self.assertEqual(packet[IP].src, server1_nat_ip)
2595                self.assertEqual(packet[IP].dst, server2.ip4)
2596                if packet.haslayer(TCP):
2597                    self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2598                    self.assertEqual(packet[TCP].sport, server_tcp_port)
2599                    self.assert_packet_checksums_valid(packet)
2600                elif packet.haslayer(UDP):
2601                    self.assertEqual(packet[UDP].dport, self.udp_port_in)
2602                    self.assertEqual(packet[UDP].sport, server_udp_port)
2603                else:
2604                    self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2605            except:
2606                self.logger.error(ppp("Unexpected or invalid packet:", packet))
2607                raise
2608
2609    def test_max_translations_per_user(self):
2610        """ MAX translations per user - recycle the least recently used """
2611
2612        self.nat44_add_address(self.nat_addr)
2613        flags = self.config_flags.NAT_IS_INSIDE
2614        self.vapi.nat44_interface_add_del_feature(
2615            sw_if_index=self.pg0.sw_if_index,
2616            flags=flags, is_add=1)
2617        self.vapi.nat44_interface_add_del_feature(
2618            sw_if_index=self.pg1.sw_if_index,
2619            is_add=1)
2620
2621        # get maximum number of translations per user
2622        nat44_config = self.vapi.nat_show_config()
2623
2624        # send more than maximum number of translations per user packets
2625        pkts_num = nat44_config.max_translations_per_user + 5
2626        pkts = []
2627        for port in range(0, pkts_num):
2628            p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2629                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2630                 TCP(sport=1025 + port))
2631            pkts.append(p)
2632        self.pg0.add_stream(pkts)
2633        self.pg_enable_capture(self.pg_interfaces)
2634        self.pg_start()
2635
2636        # verify number of translated packet
2637        self.pg1.get_capture(pkts_num)
2638
2639        users = self.vapi.nat44_user_dump()
2640        for user in users:
2641            if user.ip_address == self.pg0.remote_ip4:
2642                self.assertEqual(user.nsessions,
2643                                 nat44_config.max_translations_per_user)
2644                self.assertEqual(user.nstaticsessions, 0)
2645
2646        tcp_port = 22
2647        self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
2648                                      tcp_port, tcp_port,
2649                                      proto=IP_PROTOS.tcp)
2650        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2651             IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2652             TCP(sport=tcp_port))
2653        self.pg0.add_stream(p)
2654        self.pg_enable_capture(self.pg_interfaces)
2655        self.pg_start()
2656        self.pg1.get_capture(1)
2657        users = self.vapi.nat44_user_dump()
2658        for user in users:
2659            if user.ip_address == self.pg0.remote_ip4:
2660                self.assertEqual(user.nsessions,
2661                                 nat44_config.max_translations_per_user - 1)
2662                self.assertEqual(user.nstaticsessions, 1)
2663
2664    def test_interface_addr(self):
2665        """ Acquire NAT44 addresses from interface """
2666        self.vapi.nat44_add_del_interface_addr(
2667            is_add=1,
2668            sw_if_index=self.pg7.sw_if_index)
2669
2670        # no address in NAT pool
2671        addresses = self.vapi.nat44_address_dump()
2672        self.assertEqual(0, len(addresses))
2673
2674        # configure interface address and check NAT address pool
2675        self.pg7.config_ip4()
2676        addresses = self.vapi.nat44_address_dump()
2677        self.assertEqual(1, len(addresses))
2678        self.assertEqual(str(addresses[0].ip_address), self.pg7.local_ip4)
2679
2680        # remove interface address and check NAT address pool
2681        self.pg7.unconfig_ip4()
2682        addresses = self.vapi.nat44_address_dump()
2683        self.assertEqual(0, len(addresses))
2684
2685    def test_interface_addr_static_mapping(self):
2686        """ Static mapping with addresses from interface """
2687        tag = "testTAG"
2688
2689        self.vapi.nat44_add_del_interface_addr(
2690            is_add=1,
2691            sw_if_index=self.pg7.sw_if_index)
2692        self.nat44_add_static_mapping(
2693            '1.2.3.4',
2694            external_sw_if_index=self.pg7.sw_if_index,
2695            tag=tag)
2696
2697        # static mappings with external interface
2698        static_mappings = self.vapi.nat44_static_mapping_dump()
2699        self.assertEqual(1, len(static_mappings))
2700        self.assertEqual(self.pg7.sw_if_index,
2701                         static_mappings[0].external_sw_if_index)
2702        self.assertEqual(static_mappings[0].tag, tag)
2703
2704        # configure interface address and check static mappings
2705        self.pg7.config_ip4()
2706        static_mappings = self.vapi.nat44_static_mapping_dump()
2707        self.assertEqual(2, len(static_mappings))
2708        resolved = False
2709        for sm in static_mappings:
2710            if sm.external_sw_if_index == 0xFFFFFFFF:
2711                self.assertEqual(str(sm.external_ip_address),
2712                                 self.pg7.local_ip4)
2713                self.assertEqual(sm.tag, tag)
2714                resolved = True
2715        self.assertTrue(resolved)
2716
2717        # remove interface address and check static mappings
2718        self.pg7.unconfig_ip4()
2719        static_mappings = self.vapi.nat44_static_mapping_dump()
2720        self.assertEqual(1, len(static_mappings))
2721        self.assertEqual(self.pg7.sw_if_index,
2722                         static_mappings[0].external_sw_if_index)
2723        self.assertEqual(static_mappings[0].tag, tag)
2724
2725        # configure interface address again and check static mappings
2726        self.pg7.config_ip4()
2727        static_mappings = self.vapi.nat44_static_mapping_dump()
2728        self.assertEqual(2, len(static_mappings))
2729        resolved = False
2730        for sm in static_mappings:
2731            if sm.external_sw_if_index == 0xFFFFFFFF:
2732                self.assertEqual(str(sm.external_ip_address),
2733                                 self.pg7.local_ip4)
2734                self.assertEqual(sm.tag, tag)
2735                resolved = True
2736        self.assertTrue(resolved)
2737
2738        # remove static mapping
2739        self.nat44_add_static_mapping(
2740            '1.2.3.4',
2741            external_sw_if_index=self.pg7.sw_if_index,
2742            tag=tag,
2743            is_add=0)
2744        static_mappings = self.vapi.nat44_static_mapping_dump()
2745        self.assertEqual(0, len(static_mappings))
2746
2747    def test_interface_addr_identity_nat(self):
2748        """ Identity NAT with addresses from interface """
2749
2750        port = 53053
2751        self.vapi.nat44_add_del_interface_addr(
2752            is_add=1,
2753            sw_if_index=self.pg7.sw_if_index)
2754        self.vapi.nat44_add_del_identity_mapping(
2755            ip_address=b'0',
2756            sw_if_index=self.pg7.sw_if_index,
2757            port=port,
2758            protocol=IP_PROTOS.tcp,
2759            is_add=1)
2760
2761        # identity mappings with external interface
2762        identity_mappings = self.vapi.nat44_identity_mapping_dump()
2763        self.assertEqual(1, len(identity_mappings))
2764        self.assertEqual(self.pg7.sw_if_index,
2765                         identity_mappings[0].sw_if_index)
2766
2767        # configure interface address and check identity mappings
2768        self.pg7.config_ip4()
2769        identity_mappings = self.vapi.nat44_identity_mapping_dump()
2770        resolved = False
2771        self.assertEqual(2, len(identity_mappings))
2772        for sm in identity_mappings:
2773            if sm.sw_if_index == 0xFFFFFFFF:
2774                self.assertEqual(str(identity_mappings[0].ip_address),
2775                                 self.pg7.local_ip4)
2776                self.assertEqual(port, identity_mappings[0].port)
2777                self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2778                resolved = True
2779        self.assertTrue(resolved)
2780
2781        # remove interface address and check identity mappings
2782        self.pg7.unconfig_ip4()
2783        identity_mappings = self.vapi.nat44_identity_mapping_dump()
2784        self.assertEqual(1, len(identity_mappings))
2785        self.assertEqual(self.pg7.sw_if_index,
2786                         identity_mappings[0].sw_if_index)
2787
2788    def test_ipfix_nat44_sess(self):
2789        """ IPFIX logging NAT44 session created/deleted """
2790        self.ipfix_domain_id = 10
2791        self.ipfix_src_port = 20202
2792        collector_port = 30303
2793        bind_layers(UDP, IPFIX, dport=30303)
2794        self.nat44_add_address(self.nat_addr)
2795        flags = self.config_flags.NAT_IS_INSIDE
2796        self.vapi.nat44_interface_add_del_feature(
2797            sw_if_index=self.pg0.sw_if_index,
2798            flags=flags, is_add=1)
2799        self.vapi.nat44_interface_add_del_feature(
2800            sw_if_index=self.pg1.sw_if_index,
2801            is_add=1)
2802        self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
2803                                     src_address=self.pg3.local_ip4,
2804                                     path_mtu=512,
2805                                     template_interval=10,
2806                                     collector_port=collector_port)
2807        self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
2808                                           src_port=self.ipfix_src_port,
2809                                           enable=1)
2810
2811        pkts = self.create_stream_in(self.pg0, self.pg1)
2812        self.pg0.add_stream(pkts)
2813        self.pg_enable_capture(self.pg_interfaces)
2814        self.pg_start()
2815        capture = self.pg1.get_capture(len(pkts))
2816        self.verify_capture_out(capture)
2817        self.nat44_add_address(self.nat_addr, is_add=0)
2818        self.vapi.ipfix_flush()
2819        capture = self.pg3.get_capture(9)
2820        ipfix = IPFIXDecoder()
2821        # first load template
2822        for p in capture:
2823            self.assertTrue(p.haslayer(IPFIX))
2824            self.assertEqual(p[IP].src, self.pg3.local_ip4)
2825            self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2826            self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2827            self.assertEqual(p[UDP].dport, collector_port)
2828            self.assertEqual(p[IPFIX].observationDomainID,
2829                             self.ipfix_domain_id)
2830            if p.haslayer(Template):
2831                ipfix.add_template(p.getlayer(Template))
2832        # verify events in data set
2833        for p in capture:
2834            if p.haslayer(Data):
2835                data = ipfix.decode_data_set(p.getlayer(Set))
2836                self.verify_ipfix_nat44_ses(data)
2837
2838    def test_ipfix_addr_exhausted(self):
2839        """ IPFIX logging NAT addresses exhausted """
2840        flags = self.config_flags.NAT_IS_INSIDE
2841        self.vapi.nat44_interface_add_del_feature(
2842            sw_if_index=self.pg0.sw_if_index,
2843            flags=flags, is_add=1)
2844        self.vapi.nat44_interface_add_del_feature(
2845            sw_if_index=self.pg1.sw_if_index,
2846            is_add=1)
2847        self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
2848                                     src_address=self.pg3.local_ip4,
2849                                     path_mtu=512,
2850                                     template_interval=10)
2851        self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
2852                                           src_port=self.ipfix_src_port,
2853                                           enable=1)
2854
2855        p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2856             IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2857             TCP(sport=3025))
2858        self.pg0.add_stream(p)
2859        self.pg_enable_capture(self.pg_interfaces)
2860        self.pg_start()
2861        self.pg1.assert_nothing_captured()
2862        sleep(1)
2863        self.vapi.ipfix_flush()
2864        capture = self.pg3.get_capture(9)
2865        ipfix = IPFIXDecoder()
2866        # first load template
2867        for p in capture:
2868            self.assertTrue(p.haslayer(IPFIX))
2869            self.assertEqual(p[IP].src, self.pg3.local_ip4)
2870            self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2871            self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2872            self.assertEqual(p[UDP].dport, 4739)
2873            self.assertEqual(p[IPFIX].observationDomainID,
2874                             self.ipfix_domain_id)
2875            if p.haslayer(Template):
2876                ipfix.add_template(p.getlayer(Template))
2877        # verify events in data set
2878        for p in capture:
2879            if p.haslayer(Data):
2880                data = ipfix.decode_data_set(p.getlayer(Set))
2881                self.verify_ipfix_addr_exhausted(data)
2882
2883    @unittest.skipUnless(running_extended_tests, "part of extended tests")
2884    def test_ipfix_max_sessions(self):
2885        """ IPFIX logging maximum session entries exceeded """
2886        self.nat44_add_address(self.nat_addr)
2887        flags = self.config_flags.NAT_IS_INSIDE
2888        self.vapi.nat44_interface_add_del_feature(
2889            sw_if_index=self.pg0.sw_if_index,
2890            flags=flags, is_add=1)
2891        self.vapi.nat44_interface_add_del_feature(
2892            sw_if_index=self.pg1.sw_if_index,
2893            is_add=1)
2894
2895        nat44_config = self.vapi.nat_show_config()
2896        max_sessions = 10 * nat44_config.translation_buckets
2897
2898        pkts = []
2899        for i in range(0, max_sessions):
2900            src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2901            p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2902                 IP(src=src, dst=self.pg1.remote_ip4) /
2903                 TCP(sport=1025))
2904            pkts.append(p)
2905        self.pg0.add_stream(pkts)
2906        self.pg_enable_capture(self.pg_interfaces)
2907        self.pg_start()
2908
2909        self.pg1.get_capture(max_sessions)
2910        self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
2911                                     src_address=self.pg3.local_ip4,
2912                                     path_mtu=512,
2913                                     template_interval=10)
2914        self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
2915                                           src_port=self.ipfix_src_port,
2916                                           enable=1)
2917
2918        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2919             IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2920             TCP(sport=1025))
2921        self.pg0.add_stream(p)
2922        self.pg_enable_capture(self.pg_interfaces)
2923        self.pg_start()
2924        self.pg1.assert_nothing_captured()
2925        sleep(1)
2926        self.vapi.ipfix_flush()
2927        capture = self.pg3.get_capture(9)
2928        ipfix = IPFIXDecoder()
2929        # first load template
2930        for p in capture:
2931            self.assertTrue(p.haslayer(IPFIX))
2932            self.assertEqual(p[IP].src, self.pg3.local_ip4)
2933            self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2934            self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2935            self.assertEqual(p[UDP].dport, 4739)
2936            self.assertEqual(p[IPFIX].observationDomainID,
2937                             self.ipfix_domain_id)
2938            if p.haslayer(Template):
2939                ipfix.add_template(p.getlayer(Template))
2940        # verify events in data set
2941        for p in capture:
2942            if p.haslayer(Data):
2943                data = ipfix.decode_data_set(p.getlayer(Set))
2944                self.verify_ipfix_max_sessions(data, max_sessions)
2945
2946    def test_syslog_apmap(self):
2947        """ Test syslog address and port mapping creation and deletion """
2948        self.vapi.syslog_set_filter(
2949            self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
2950        self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)
2951        self.nat44_add_address(self.nat_addr)
2952        flags = self.config_flags.NAT_IS_INSIDE
2953        self.vapi.nat44_interface_add_del_feature(
2954            sw_if_index=self.pg0.sw_if_index,
2955            flags=flags, is_add=1)
2956        self.vapi.nat44_interface_add_del_feature(
2957            sw_if_index=self.pg1.sw_if_index,
2958            is_add=1)
2959
2960        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2961             IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2962             TCP(sport=self.tcp_port_in, dport=20))
2963        self.pg0.add_stream(p)
2964        self.pg_enable_capture(self.pg_interfaces)
2965        self.pg_start()
2966        capture = self.pg1.get_capture(1)
2967        self.tcp_port_out = capture[0][TCP].sport
2968        capture = self.pg3.get_capture(1)
2969        self.verify_syslog_apmap(capture[0][Raw].load)
2970
2971        self.pg_enable_capture(self.pg_interfaces)
2972        self.pg_start()
2973        self.nat44_add_address(self.nat_addr, is_add=0)
2974        capture = self.pg3.get_capture(1)
2975        self.verify_syslog_apmap(capture[0][Raw].load, False)
2976
2977    def test_pool_addr_fib(self):
2978        """ NAT44 add pool addresses to FIB """
2979        static_addr = '10.0.0.10'
2980        self.nat44_add_address(self.nat_addr)
2981        flags = self.config_flags.NAT_IS_INSIDE
2982        self.vapi.nat44_interface_add_del_feature(
2983            sw_if_index=self.pg0.sw_if_index,
2984            flags=flags, is_add=1)
2985        self.vapi.nat44_interface_add_del_feature(
2986            sw_if_index=self.pg1.sw_if_index,
2987            is_add=1)
2988        self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
2989
2990        # NAT44 address
2991        p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2992             ARP(op=ARP.who_has, pdst=self.nat_addr,
2993                 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2994        self.pg1.add_stream(p)
2995        self.pg_enable_capture(self.pg_interfaces)
2996        self.pg_start()
2997        capture = self.pg1.get_capture(1)
2998        self.assertTrue(capture[0].haslayer(ARP))
2999        self.assertTrue(capture[0][ARP].op, ARP.is_at)
3000
3001        # 1:1 NAT address
3002        p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
3003             ARP(op=ARP.who_has, pdst=static_addr,
3004                 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
3005        self.pg1.add_stream(p)
3006        self.pg_enable_capture(self.pg_interfaces)
3007        self.pg_start()
3008        capture = self.pg1.get_capture(1)
3009        self.assertTrue(capture[0].haslayer(ARP))
3010        self.assertTrue(capture[0][ARP].op, ARP.is_at)
3011
3012        # send ARP to non-NAT44 interface
3013        p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
3014             ARP(op=ARP.who_has, pdst=self.nat_addr,
3015                 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
3016        self.pg2.add_stream(p)
3017        self.pg_enable_capture(self.pg_interfaces)
3018        self.pg_start()
3019        self.pg1.assert_nothing_captured()
3020
3021        # remove addresses and verify
3022        self.nat44_add_address(self.nat_addr, is_add=0)
3023        self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
3024                                      is_add=0)
3025
3026        p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
3027             ARP(op=ARP.who_has, pdst=self.nat_addr,
3028                 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
3029        self.pg1.add_stream(p)
3030        self.pg_enable_capture(self.pg_interfaces)
3031        self.pg_start()
3032        self.pg1.assert_nothing_captured()
3033
3034        p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
3035             ARP(op=ARP.who_has, pdst=static_addr,
3036                 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
3037        self.pg1.add_stream(p)
3038        self.pg_enable_capture(self.pg_interfaces)
3039        self.pg_start()
3040        self.pg1.assert_nothing_captured()
3041
3042    def test_vrf_mode(self):
3043        """ NAT44 tenant VRF aware address pool mode """
3044
3045        vrf_id1 = 1
3046        vrf_id2 = 2
3047        nat_ip1 = "10.0.0.10"
3048        nat_ip2 = "10.0.0.11"
3049
3050        self.pg0.unconfig_ip4()
3051        self.pg1.unconfig_ip4()
3052        self.vapi.ip_table_add_del(is_add=1, table_id=vrf_id1)
3053        self.vapi.ip_table_add_del(is_add=1, table_id=vrf_id2)
3054        self.pg0.set_table_ip4(vrf_id1)
3055        self.pg1.set_table_ip4(vrf_id2)
3056        self.pg0.config_ip4()
3057        self.pg1.config_ip4()
3058        self.pg0.resolve_arp()
3059        self.pg1.resolve_arp()
3060
3061        self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
3062        self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
3063        flags = self.config_flags.NAT_IS_INSIDE
3064        self.vapi.nat44_interface_add_del_feature(
3065            sw_if_index=self.pg0.sw_if_index,
3066            flags=flags, is_add=1)
3067        self.vapi.nat44_interface_add_del_feature(
3068            sw_if_index=self.pg1.sw_if_index,
3069            flags=flags, is_add=1)
3070        self.vapi.nat44_interface_add_del_feature(
3071            sw_if_index=self.pg2.sw_if_index,
3072            is_add=1)
3073
3074        try:
3075            # first VRF
3076            pkts = self.create_stream_in(self.pg0, self.pg2)
3077            self.pg0.add_stream(pkts)
3078            self.pg_enable_capture(self.pg_interfaces)
3079            self.pg_start()
3080            capture = self.pg2.get_capture(len(pkts))
3081            self.verify_capture_out(capture, nat_ip1)
3082
3083            # second VRF
3084            pkts = self.create_stream_in(self.pg1, self.pg2)
3085            self.pg1.add_stream(pkts)
3086            self.pg_enable_capture(self.pg_interfaces)
3087            self.pg_start()
3088            capture = self.pg2.get_capture(len(pkts))
3089            self.verify_capture_out(capture, nat_ip2)
3090
3091        finally:
3092            self.pg0.unconfig_ip4()
3093            self.pg1.unconfig_ip4()
3094            self.pg0.set_table_ip4(0)
3095            self.pg1.set_table_ip4(0)
3096            self.pg0.config_ip4()
3097            self.pg1.config_ip4()
3098            self.pg0.resolve_arp()
3099            self.pg1.resolve_arp()
3100            self.vapi.ip_table_add_del(is_add=0, table_id=vrf_id1)
3101            self.vapi.ip_table_add_del(is_add=0, table_id=vrf_id2)
3102
3103    def test_vrf_feature_independent(self):
3104        """ NAT44 tenant VRF independent address pool mode """
3105
3106        nat_ip1 = "10.0.0.10"
3107        nat_ip2 = "10.0.0.11"
3108
3109        self.nat44_add_address(nat_ip1)
3110        self.nat44_add_address(nat_ip2, vrf_id=99)
3111        flags = self.config_flags.NAT_IS_INSIDE
3112        self.vapi.nat44_interface_add_del_feature(
3113            sw_if_index=self.pg0.sw_if_index,
3114            flags=flags, is_add=1)
3115        self.vapi.nat44_interface_add_del_feature(
3116            sw_if_index=self.pg1.sw_if_index,
3117            flags=flags, is_add=1)
3118        self.vapi.nat44_interface_add_del_feature(
3119            sw_if_index=self.pg2.sw_if_index,
3120            is_add=1)
3121
3122        # first VRF
3123        pkts = self.create_stream_in(self.pg0, self.pg2)
3124        self.pg0.add_stream(pkts)
3125        self.pg_enable_capture(self.pg_interfaces)
3126        self.pg_start()
3127        capture = self.pg2.get_capture(len(pkts))
3128        self.verify_capture_out(capture, nat_ip1)
3129
3130        # second VRF
3131        pkts = self.create_stream_in(self.pg1, self.pg2)
3132        self.pg1.add_stream(pkts)
3133        self.pg_enable_capture(self.pg_interfaces)
3134        self.pg_start()
3135        capture = self.pg2.get_capture(len(pkts))
3136        self.verify_capture_out(capture, nat_ip1)
3137
3138    def create_routes_and_neigbors(self):
3139        r1 = VppIpRoute(self, self.pg7.remote_ip4, 32,
3140                        [VppRoutePath(self.pg7.remote_ip4,
3141                                      self.pg7.sw_if_index)])
3142        r2 = VppIpRoute(self, self.pg8.remote_ip4, 32,
3143                        [VppRoutePath(self.pg8.remote_ip4,
3144                                      self.pg8.sw_if_index)])
3145        r1.add_vpp_config()
3146        r2.add_vpp_config()
3147
3148        n1 = VppNeighbor(self,
3149                         self.pg7.sw_if_index,
3150                         self.pg7.remote_mac,
3151                         self.pg7.remote_ip4,
3152                         is_static=1)
3153        n2 = VppNeighbor(self,
3154                         self.pg8.sw_if_index,
3155                         self.pg8.remote_mac,
3156                         self.pg8.remote_ip4,
3157                         is_static=1)
3158        n1.add_vpp_config()
3159        n2.add_vpp_config()
3160
3161    def test_dynamic_ipless_interfaces(self):
3162        """ NAT44 interfaces without configured IP address """
3163        self.create_routes_and_neigbors()
3164        self.nat44_add_address(self.nat_addr)
3165        flags = self.config_flags.NAT_IS_INSIDE
3166        self.vapi.nat44_interface_add_del_feature(
3167            sw_if_index=self.pg7.sw_if_index,
3168            flags=flags, is_add=1)
3169        self.vapi.nat44_interface_add_del_feature(
3170            sw_if_index=self.pg8.sw_if_index,
3171            is_add=1)
3172
3173        # in2out
3174        pkts = self.create_stream_in(self.pg7, self.pg8)
3175        self.pg7.add_stream(pkts)
3176        self.pg_enable_capture(self.pg_interfaces)
3177        self.pg_start()
3178        capture = self.pg8.get_capture(len(pkts))
3179        self.verify_capture_out(capture)
3180
3181        # out2in
3182        pkts = self.create_stream_out(self.pg8, self.nat_addr)
3183        self.pg8.add_stream(pkts)
3184        self.pg_enable_capture(self.pg_interfaces)
3185        self.pg_start()
3186        capture = self.pg7.get_capture(len(pkts))
3187        self.verify_capture_in(capture, self.pg7)
3188
3189    def test_static_ipless_interfaces(self):
3190        """ NAT44 interfaces without configured IP address - 1:1 NAT """
3191
3192        self.create_routes_and_neigbors()
3193        self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
3194        flags = self.config_flags.NAT_IS_INSIDE
3195        self.vapi.nat44_interface_add_del_feature(
3196            sw_if_index=self.pg7.sw_if_index,
3197            flags=flags, is_add=1)
3198        self.vapi.nat44_interface_add_del_feature(
3199            sw_if_index=self.pg8.sw_if_index,
3200            is_add=1)
3201
3202        # out2in
3203        pkts = self.create_stream_out(self.pg8)
3204        self.pg8.add_stream(pkts)
3205        self.pg_enable_capture(self.pg_interfaces)
3206        self.pg_start()
3207        capture = self.pg7.get_capture(len(pkts))
3208        self.verify_capture_in(capture, self.pg7)
3209
3210        # in2out
3211        pkts = self.create_stream_in(self.pg7, self.pg8)
3212        self.pg7.add_stream(pkts)
3213        self.pg_enable_capture(self.pg_interfaces)
3214        self.pg_start()
3215        capture = self.pg8.get_capture(len(pkts))
3216        self.verify_capture_out(capture, self.nat_addr, True)
3217
3218    def test_static_with_port_ipless_interfaces(self):
3219        """ NAT44 interfaces without configured IP address - 1:1 NAPT """
3220
3221        self.tcp_port_out = 30606
3222        self.udp_port_out = 30607
3223        self.icmp_id_out = 30608
3224
3225        self.create_routes_and_neigbors()
3226        self.nat44_add_address(self.nat_addr)
3227        self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
3228                                      self.tcp_port_in, self.tcp_port_out,
3229                                      proto=IP_PROTOS.tcp)
3230        self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
3231                                      self.udp_port_in, self.udp_port_out,
3232                                      proto=IP_PROTOS.udp)
3233        self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
3234                                      self.icmp_id_in, self.icmp_id_out,
3235                                      proto=IP_PROTOS.icmp)
3236        flags = self.config_flags.NAT_IS_INSIDE
3237        self.vapi.nat44_interface_add_del_feature(
3238            sw_if_index=self.pg7.sw_if_index,
3239            flags=flags, is_add=1)
3240        self.vapi.nat44_interface_add_del_feature(
3241            sw_if_index=self.pg8.sw_if_index,
3242            is_add=1)
3243
3244        # out2in
3245        pkts = self.create_stream_out(self.pg8)
3246        self.pg8.add_stream(pkts)
3247        self.pg_enable_capture(self.pg_interfaces)
3248        self.pg_start()
3249        capture = self.pg7.get_capture(len(pkts))
3250        self.verify_capture_in(capture, self.pg7)
3251
3252        # in2out
3253        pkts = self.create_stream_in(self.pg7, self.pg8)
3254        self.pg7.add_stream(pkts)
3255        self.pg_enable_capture(self.pg_interfaces)
3256        self.pg_start()
3257        capture = self.pg8.get_capture(len(pkts))
3258        self.verify_capture_out(capture)
3259
3260    def test_static_unknown_proto(self):
3261        """ 1:1 NAT translate packet with unknown protocol """
3262        nat_ip = "10.0.0.10"
3263        self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
3264        flags = self.config_flags.NAT_IS_INSIDE
3265        self.vapi.nat44_interface_add_del_feature(
3266            sw_if_index=self.pg0.sw_if_index,
3267            flags=flags, is_add=1)
3268        self.vapi.nat44_interface_add_del_feature(
3269            sw_if_index=self.pg1.sw_if_index,
3270            is_add=1)
3271
3272        # in2out
3273        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3274             IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3275             GRE() /
3276             IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
3277             TCP(sport=1234, dport=1234))
3278        self.pg0.add_stream(p)
3279        self.pg_enable_capture(self.pg_interfaces)
3280        self.pg_start()
3281        p = self.pg1.get_capture(1)
3282        packet = p[0]
3283        try:
3284            self.assertEqual(packet[IP].src, nat_ip)
3285            self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3286            self.assertEqual(packet.haslayer(GRE), 1)
3287            self.assert_packet_checksums_valid(packet)
3288        except:
3289            self.logger.error(ppp("Unexpected or invalid packet:", packet))
3290            raise
3291
3292        # out2in
3293        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3294             IP(src=self.pg1.remote_ip4, dst=nat_ip) /
3295             GRE() /
3296             IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3297             TCP(sport=1234, dport=1234))
3298        self.pg1.add_stream(p)
3299        self.pg_enable_capture(self.pg_interfaces)
3300        self.pg_start()
3301        p = self.pg0.get_capture(1)
3302        packet = p[0]
3303        try:
3304            self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
3305            self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
3306            self.assertEqual(packet.haslayer(GRE), 1)
3307            self.assert_packet_checksums_valid(packet)
3308        except:
3309            self.logger.error(ppp("Unexpected or invalid packet:", packet))
3310            raise
3311
3312    def test_hairpinning_static_unknown_proto(self):
3313        """ 1:1 NAT translate packet with unknown protocol - hairpinning """
3314
3315        host = self.pg0.remote_hosts[0]
3316        server = self.pg0.remote_hosts[1]
3317
3318        host_nat_ip = "10.0.0.10"
3319        server_nat_ip = "10.0.0.11"
3320
3321        self.nat44_add_static_mapping(host.ip4, host_nat_ip)
3322        self.nat44_add_static_mapping(server.ip4, server_nat_ip)
3323        flags = self.config_flags.NAT_IS_INSIDE
3324        self.vapi.nat44_interface_add_del_feature(
3325            sw_if_index=self.pg0.sw_if_index,
3326            flags=flags, is_add=1)
3327        self.vapi.nat44_interface_add_del_feature(
3328            sw_if_index=self.pg1.sw_if_index,
3329            is_add=1)
3330
3331        # host to server
3332        p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
3333             IP(src=host.ip4, dst=server_nat_ip) /
3334             GRE() /
3335             IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
3336             TCP(sport=1234, dport=1234))
3337        self.pg0.add_stream(p)
3338        self.pg_enable_capture(self.pg_interfaces)
3339        self.pg_start()
3340        p = self.pg0.get_capture(1)
3341        packet = p[0]
3342        try:
3343            self.assertEqual(packet[IP].src, host_nat_ip)
3344            self.assertEqual(packet[IP].dst, server.ip4)
3345            self.assertEqual(packet.haslayer(GRE), 1)
3346            self.assert_packet_checksums_valid(packet)
3347        except:
3348            self.logger.error(ppp("Unexpected or invalid packet:", packet))
3349            raise
3350
3351        # server to host
3352        p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
3353             IP(src=server.ip4, dst=host_nat_ip) /
3354             GRE() /
3355             IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3356             TCP(sport=1234, dport=1234))
3357        self.pg0.add_stream(p)
3358        self.pg_enable_capture(self.pg_interfaces)
3359        self.pg_start()
3360        p = self.pg0.get_capture(1)
3361        packet = p[0]
3362        try:
3363            self.assertEqual(packet[IP].src, server_nat_ip)
3364            self.assertEqual(packet[IP].dst, host.ip4)
3365            self.assertEqual(packet.haslayer(GRE), 1)
3366            self.assert_packet_checksums_valid(packet)
3367        except:
3368            self.logger.error(ppp("Unexpected or invalid packet:", packet))
3369            raise
3370
3371    def test_output_feature(self):
3372        """ NAT44 interface output feature (in2out postrouting) """
3373        self.nat44_add_address(self.nat_addr)
3374        flags = self.config_flags.NAT_IS_INSIDE
3375        self.vapi.nat44_interface_add_del_output_feature(
3376            is_add=1, flags=flags,
3377            sw_if_index=self.pg0.sw_if_index)
3378        self.vapi.nat44_interface_add_del_output_feature(
3379            is_add=1, flags=flags,
3380            sw_if_index=self.pg1.sw_if_index)
3381        self.vapi.nat44_interface_add_del_output_feature(
3382            is_add=1,
3383            sw_if_index=self.pg3.sw_if_index)
3384
3385        # in2out
3386        pkts = self.create_stream_in(self.pg0, self.pg3)
3387        self.pg0.add_stream(pkts)
3388        self.pg_enable_capture(self.pg_interfaces)
3389        self.pg_start()
3390        capture = self.pg3.get_capture(len(pkts))
3391        self.verify_capture_out(capture)
3392
3393        # out2in
3394        pkts = self.create_stream_out(self.pg3)
3395        self.pg3.add_stream(pkts)
3396        self.pg_enable_capture(self.pg_interfaces)
3397        self.pg_start()
3398        capture = self.pg0.get_capture(len(pkts))
3399        self.verify_capture_in(capture, self.pg0)
3400
3401        # from non-NAT interface to NAT inside interface
3402        pkts = self.create_stream_in(self.pg2, self.pg0)
3403        self.pg2.add_stream(pkts)
3404        self.pg_enable_capture(self.pg_interfaces)
3405        self.pg_start()
3406        capture = self.pg0.get_capture(len(pkts))
3407        self.verify_capture_no_translation(capture, self.pg2, self.pg0)
3408
3409    def test_output_feature_vrf_aware(self):
3410        """ NAT44 interface output feature VRF aware (in2out postrouting) """
3411        nat_ip_vrf10 = "10.0.0.10"
3412        nat_ip_vrf20 = "10.0.0.20"
3413
3414        r1 = VppIpRoute(self, self.pg3.remote_ip4, 32,
3415                        [VppRoutePath(self.pg3.remote_ip4,
3416                                      self.pg3.sw_if_index)],
3417                        table_id=10)
3418        r2 = VppIpRoute(self, self.pg3.remote_ip4, 32,
3419                        [VppRoutePath(self.pg3.remote_ip4,
3420                                      self.pg3.sw_if_index)],
3421                        table_id=20)
3422        r1.add_vpp_config()
3423        r2.add_vpp_config()
3424
3425        self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
3426        self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
3427        flags = self.config_flags.NAT_IS_INSIDE
3428        self.vapi.nat44_interface_add_del_output_feature(
3429            is_add=1, flags=flags,
3430            sw_if_index=self.pg4.sw_if_index)
3431        self.vapi.nat44_interface_add_del_output_feature(
3432            is_add=1, flags=flags,
3433            sw_if_index=self.pg6.sw_if_index)
3434        self.vapi.nat44_interface_add_del_output_feature(
3435            is_add=1,
3436            sw_if_index=self.pg3.sw_if_index)
3437
3438        # in2out VRF 10
3439        pkts = self.create_stream_in(self.pg4, self.pg3)
3440        self.pg4.add_stream(pkts)
3441        self.pg_enable_capture(self.pg_interfaces)
3442        self.pg_start()
3443        capture = self.pg3.get_capture(len(pkts))
3444        self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
3445
3446        # out2in VRF 10
3447        pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
3448        self.pg3.add_stream(pkts)
3449        self.pg_enable_capture(self.pg_interfaces)
3450        self.pg_start()
3451        capture = self.pg4.get_capture(len(pkts))
3452        self.verify_capture_in(capture, self.pg4)
3453
3454        # in2out VRF 20
3455        pkts = self.create_stream_in(self.pg6, self.pg3)
3456        self.pg6.add_stream(pkts)
3457        self.pg_enable_capture(self.pg_interfaces)
3458        self.pg_start()
3459        capture = self.pg3.get_capture(len(pkts))
3460        self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
3461
3462        # out2in VRF 20
3463        pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
3464        self.pg3.add_stream(pkts)
3465        self.pg_enable_capture(self.pg_interfaces)
3466        self.pg_start()
3467        capture = self.pg6.get_capture(len(pkts))
3468        self.verify_capture_in(capture, self.pg6)
3469
3470    def test_output_feature_hairpinning(self):
3471        """ NAT44 interface output feature hairpinning (in2out postrouting) """
3472        host = self.pg0.remote_hosts[0]
3473        server = self.pg0.remote_hosts[1]
3474        host_in_port = 1234
3475        host_out_port = 0
3476        server_in_port = 5678
3477        server_out_port = 8765
3478
3479        self.nat44_add_address(self.nat_addr)
3480        flags = self.config_flags.NAT_IS_INSIDE
3481        self.vapi.nat44_interface_add_del_output_feature(
3482            is_add=1, flags=flags,
3483            sw_if_index=self.pg0.sw_if_index)
3484        self.vapi.nat44_interface_add_del_output_feature(
3485            is_add=1,
3486            sw_if_index=self.pg1.sw_if_index)
3487
3488        # add static mapping for server
3489        self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3490                                      server_in_port, server_out_port,
3491                                      proto=IP_PROTOS.tcp)
3492
3493        # send packet from host to server
3494        p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3495             IP(src=host.ip4, dst=self.nat_addr) /
3496             TCP(sport=host_in_port, dport=server_out_port))
3497        self.pg0.add_stream(p)
3498        self.pg_enable_capture(self.pg_interfaces)
3499        self.pg_start()
3500        capture = self.pg0.get_capture(1)
3501        p = capture[0]
3502        try:
3503            ip = p[IP]
3504            tcp = p[TCP]
3505            self.assertEqual(ip.src, self.nat_addr)
3506            self.assertEqual(ip.dst, server.ip4)
3507            self.assertNotEqual(tcp.sport, host_in_port)
3508            self.assertEqual(tcp.dport, server_in_port)
3509            self.assert_packet_checksums_valid(p)
3510            host_out_port = tcp.sport
3511        except:
3512            self.logger.error(ppp("Unexpected or invalid packet:", p))
3513            raise
3514
3515        # send reply from server to host
3516        p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3517             IP(src=server.ip4, dst=self.nat_addr) /
3518             TCP(sport=server_in_port, dport=host_out_port))
3519        self.pg0.add_stream(p)
3520        self.pg_enable_capture(self.pg_interfaces)
3521        self.pg_start()
3522        capture = self.pg0.get_capture(1)
3523        p = capture[0]
3524        try:
3525            ip = p[IP]
3526            tcp = p[TCP]
3527            self.assertEqual(ip.src, self.nat_addr)
3528            self.assertEqual(ip.dst, host.ip4)
3529            self.assertEqual(tcp.sport, server_out_port)
3530            self.assertEqual(tcp.dport, host_in_port)
3531            self.assert_packet_checksums_valid(p)
3532        except:
3533            self.logger.error(ppp("Unexpected or invalid packet:", p))
3534            raise
3535
3536    def test_one_armed_nat44(self):
3537        """ One armed NAT44 """
3538        remote_host = self.pg9.remote_hosts[0]
3539        local_host = self.pg9.remote_hosts[1]
3540        external_port = 0
3541
3542        self.nat44_add_address(self.nat_addr)
3543        flags = self.config_flags.NAT_IS_INSIDE
3544        self.vapi.nat44_interface_add_del_feature(
3545            sw_if_index=self.pg9.sw_if_index,
3546            is_add=1)
3547        self.vapi.nat44_interface_add_del_feature(
3548            sw_if_index=self.pg9.sw_if_index,
3549            flags=flags, is_add=1)
3550
3551        # in2out
3552        p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3553             IP(src=local_host.ip4, dst=remote_host.ip4) /
3554             TCP(sport=12345, dport=80))
3555        self.pg9.add_stream(p)
3556        self.pg_enable_capture(self.pg_interfaces)
3557        self.pg_start()
3558        capture = self.pg9.get_capture(1)
3559        p = capture[0]
3560        try:
3561            ip = p[IP]
3562            tcp = p[TCP]
3563            self.assertEqual(ip.src, self.nat_addr)
3564            self.assertEqual(ip.dst, remote_host.ip4)
3565            self.assertNotEqual(tcp.sport, 12345)
3566            external_port = tcp.sport
3567            self.assertEqual(tcp.dport, 80)
3568            self.assert_packet_checksums_valid(p)
3569        except:
3570            self.logger.error(ppp("Unexpected or invalid packet:", p))
3571            raise
3572
3573        # out2in
3574        p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3575             IP(src=remote_host.ip4, dst=self.nat_addr) /
3576             TCP(sport=80, dport=external_port))
3577        self.pg9.add_stream(p)
3578        self.pg_enable_capture(self.pg_interfaces)
3579        self.pg_start()
3580        capture = self.pg9.get_capture(1)
3581        p = capture[0]
3582        try:
3583            ip = p[IP]
3584            tcp = p[TCP]
3585            self.assertEqual(ip.src, remote_host.ip4)
3586            self.assertEqual(ip.dst, local_host.ip4)
3587            self.assertEqual(tcp.sport, 80)
3588            self.assertEqual(tcp.dport, 12345)
3589            self.assert_packet_checksums_valid(p)
3590        except:
3591            self.logger.error(ppp("Unexpected or invalid packet:", p))
3592            raise
3593
3594        err = self.statistics.get_err_counter(
3595            '/err/nat44-classify/next in2out')
3596        self.assertEqual(err, 1)
3597        err = self.statistics.get_err_counter(
3598            '/err/nat44-classify/next out2in')
3599        self.assertEqual(err, 1)
3600
3601    def test_del_session(self):
3602        """ Delete NAT44 session """
3603        self.nat44_add_address(self.nat_addr)
3604        flags = self.config_flags.NAT_IS_INSIDE
3605        self.vapi.nat44_interface_add_del_feature(
3606            sw_if_index=self.pg0.sw_if_index,
3607            flags=flags, is_add=1)
3608        self.vapi.nat44_interface_add_del_feature(
3609            sw_if_index=self.pg1.sw_if_index,
3610            is_add=1)
3611
3612        pkts = self.create_stream_in(self.pg0, self.pg1)
3613        self.pg0.add_stream(pkts)
3614        self.pg_enable_capture(self.pg_interfaces)
3615        self.pg_start()
3616        self.pg1.get_capture(len(pkts))
3617
3618        sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3619        nsessions = len(sessions)
3620
3621        self.vapi.nat44_del_session(address=sessions[0].inside_ip_address,
3622                                    port=sessions[0].inside_port,
3623                                    protocol=sessions[0].protocol,
3624                                    flags=self.config_flags.NAT_IS_INSIDE)
3625        self.vapi.nat44_del_session(address=sessions[1].outside_ip_address,
3626                                    port=sessions[1].outside_port,
3627                                    protocol=sessions[1].protocol)
3628
3629        sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3630        self.assertEqual(nsessions - len(sessions), 2)
3631
3632        self.vapi.nat44_del_session(address=sessions[0].inside_ip_address,
3633                                    port=sessions[0].inside_port,
3634                                    protocol=sessions[0].protocol,
3635                                    flags=self.config_flags.NAT_IS_INSIDE)
3636
3637        self.verify_no_nat44_user()
3638
3639    def test_set_get_reass(self):
3640        """ NAT44 set/get virtual fragmentation reassembly """
3641        reas_cfg1 = self.vapi.nat_get_reass()
3642
3643        self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
3644                                max_reass=reas_cfg1.ip4_max_reass * 2,
3645                                max_frag=reas_cfg1.ip4_max_frag * 2,
3646                                drop_frag=0)
3647
3648        reas_cfg2 = self.vapi.nat_get_reass()
3649
3650        self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
3651        self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
3652        self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
3653
3654        self.vapi.nat_set_reass(timeout=2, max_reass=1024, max_frag=5,
3655                                drop_frag=1)
3656        self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
3657
3658    def test_frag_in_order(self):
3659        """ NAT44 translate fragments arriving in order """
3660
3661        self.nat44_add_address(self.nat_addr)
3662        flags = self.config_flags.NAT_IS_INSIDE
3663        self.vapi.nat44_interface_add_del_feature(
3664            sw_if_index=self.pg0.sw_if_index,
3665            flags=flags, is_add=1)
3666        self.vapi.nat44_interface_add_del_feature(
3667            sw_if_index=self.pg1.sw_if_index,
3668            is_add=1)
3669
3670        reas_cfg1 = self.vapi.nat_get_reass()
3671        # this test was intermittently failing in some cases
3672        # until we temporarily bump the reassembly timeouts
3673        self.vapi.nat_set_reass(timeout=20, max_reass=1024, max_frag=5,
3674                                drop_frag=0)
3675
3676        self.frag_in_order(proto=IP_PROTOS.tcp)
3677        self.frag_in_order(proto=IP_PROTOS.udp)
3678        self.frag_in_order(proto=IP_PROTOS.icmp)
3679
3680        # restore the reassembly timeouts
3681        self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout,
3682                                max_reass=reas_cfg1.ip4_max_reass,
3683                                max_frag=reas_cfg1.ip4_max_frag,
3684                                drop_frag=reas_cfg1.ip4_drop_frag)
3685
3686    def test_frag_forwarding(self):
3687        """ NAT44 forwarding fragment test """
3688        self.vapi.nat44_add_del_interface_addr(
3689            is_add=1,
3690            sw_if_index=self.pg1.sw_if_index)
3691        flags = self.config_flags.NAT_IS_INSIDE
3692        self.vapi.nat44_interface_add_del_feature(
3693            sw_if_index=self.pg0.sw_if_index,
3694            flags=flags, is_add=1)
3695        self.vapi.nat44_interface_add_del_feature(
3696            sw_if_index=self.pg1.sw_if_index,
3697            is_add=1)
3698        self.vapi.nat44_forwarding_enable_disable(enable=1)
3699
3700        data = b"A" * 16 + b"B" * 16 + b"C" * 3
3701        pkts = self.create_stream_frag(self.pg1,
3702                                       self.pg0.remote_ip4,
3703                                       4789,
3704                                       4789,
3705                                       data,
3706                                       proto=IP_PROTOS.udp)
3707        self.pg1.add_stream(pkts)
3708        self.pg_enable_capture(self.pg_interfaces)
3709        self.pg_start()
3710        frags = self.pg0.get_capture(len(pkts))
3711        p = self.reass_frags_and_verify(frags,
3712                                        self.pg1.remote_ip4,
3713                                        self.pg0.remote_ip4)
3714        self.assertEqual(p[UDP].sport, 4789)
3715        self.assertEqual(p[UDP].dport, 4789)
3716        self.assertEqual(data, p[Raw].load)
3717
3718    def test_reass_hairpinning(self):
3719        """ NAT44 fragments hairpinning """
3720
3721        self.server = self.pg0.remote_hosts[1]
3722        self.host_in_port = random.randint(1025, 65535)
3723        self.server_in_port = random.randint(1025, 65535)
3724        self.server_out_port = random.randint(1025, 65535)
3725
3726        self.nat44_add_address(self.nat_addr)
3727        flags = self.config_flags.NAT_IS_INSIDE
3728        self.vapi.nat44_interface_add_del_feature(
3729            sw_if_index=self.pg0.sw_if_index,
3730            flags=flags, is_add=1)
3731        self.vapi.nat44_interface_add_del_feature(
3732            sw_if_index=self.pg1.sw_if_index,
3733            is_add=1)
3734        # add static mapping for server
3735        self.nat44_add_static_mapping(self.server.ip4, self.nat_addr,
3736                                      self.server_in_port,
3737                                      self.server_out_port,
3738                                      proto=IP_PROTOS.tcp)
3739        self.nat44_add_static_mapping(self.server.ip4, self.nat_addr,
3740                                      self.server_in_port,
3741                                      self.server_out_port,
3742                                      proto=IP_PROTOS.udp)
3743        self.nat44_add_static_mapping(self.server.ip4, self.nat_addr)
3744
3745        self.reass_hairpinning(proto=IP_PROTOS.tcp)
3746        self.reass_hairpinning(proto=IP_PROTOS.udp)
3747        self.reass_hairpinning(proto=IP_PROTOS.icmp)
3748
3749    def test_frag_out_of_order(self):
3750        """ NAT44 translate fragments arriving out of order """
3751
3752        self.nat44_add_address(self.nat_addr)
3753        flags = self.config_flags.NAT_IS_INSIDE
3754        self.vapi.nat44_interface_add_del_feature(
3755            sw_if_index=self.pg0.sw_if_index,
3756            flags=flags, is_add=1)
3757        self.vapi.nat44_interface_add_del_feature(
3758            sw_if_index=self.pg1.sw_if_index,
3759            is_add=1)
3760
3761        self.frag_out_of_order(proto=IP_PROTOS.tcp)
3762        self.frag_out_of_order(proto=IP_PROTOS.udp)
3763        self.frag_out_of_order(proto=IP_PROTOS.icmp)
3764
3765    def test_port_restricted(self):
3766        """ Port restricted NAT44 (MAP-E CE) """
3767        self.nat44_add_address(self.nat_addr)
3768        flags = self.config_flags.NAT_IS_INSIDE
3769        self.vapi.nat44_interface_add_del_feature(
3770            sw_if_index=self.pg0.sw_if_index,
3771            flags=flags, is_add=1)
3772        self.vapi.nat44_interface_add_del_feature(
3773            sw_if_index=self.pg1.sw_if_index,
3774            is_add=1)
3775        self.vapi.nat_set_addr_and_port_alloc_alg(alg=1,
3776                                                  psid_offset=6,
3777                                                  psid_length=6,
3778                                                  psid=10)
3779
3780        p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3781             IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3782             TCP(sport=4567, dport=22))
3783        self.pg0.add_stream(p)
3784        self.pg_enable_capture(self.pg_interfaces)
3785        self.pg_start()
3786        capture = self.pg1.get_capture(1)
3787        p = capture[0]
3788        try:
3789            ip = p[IP]
3790            tcp = p[TCP]
3791            self.assertEqual(ip.dst, self.pg1.remote_ip4)
3792            self.assertEqual(ip.src, self.nat_addr)
3793            self.assertEqual(tcp.dport, 22)
3794            self.assertNotEqual(tcp.sport, 4567)
3795            self.assertEqual((tcp.sport >> 6) & 63, 10)
3796            self.assert_packet_checksums_valid(p)
3797        except:
3798            self.logger.error(ppp("Unexpected or invalid packet:", p))
3799            raise
3800
3801    def test_port_range(self):
3802        """ External address port range """
3803        self.nat44_add_address(self.nat_addr)
3804        flags = self.config_flags.NAT_IS_INSIDE
3805        self.vapi.nat44_interface_add_del_feature(
3806            sw_if_index=self.pg0.sw_if_index,
3807            flags=flags, is_add=1)
3808        self.vapi.nat44_interface_add_del_feature(
3809            sw_if_index=self.pg1.sw_if_index,
3810            is_add=1)
3811        self.vapi.nat_set_addr_and_port_alloc_alg(alg=2,
3812                                                  start_port=1025,
3813                                                  end_port=1027)
3814
3815        pkts = []
3816        for port in range(0, 5):
3817            p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3818                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3819                 TCP(sport=1125 + port))
3820            pkts.append(p)
3821        self.pg0.add_stream(pkts)
3822        self.pg_enable_capture(self.pg_interfaces)
3823        self.pg_start()
3824        capture = self.pg1.get_capture(3)
3825        for p in capture:
3826            tcp = p[TCP]
3827            self.assertGreaterEqual(tcp.sport, 1025)
3828            self.assertLessEqual(tcp.sport, 1027)
3829
3830    def test_ipfix_max_frags(self):
3831        """ IPFIX logging maximum fragments pending reassembly exceeded """
3832        self.nat44_add_address(self.nat_addr)
3833        flags = self.config_flags.NAT_IS_INSIDE
3834        self.vapi.nat44_interface_add_del_feature(
3835            sw_if_index=self.pg0.sw_if_index,
3836            flags=flags, is_add=1)
3837        self.vapi.nat44_interface_add_del_feature(
3838            sw_if_index=self.pg1.sw_if_index,
3839            is_add=1)
3840        self.vapi.nat_set_reass(timeout=2, max_reass=1024, max_frag=1,
3841                                drop_frag=0)
3842        self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
3843                                     src_address=self.pg3.local_ip4,
3844                                     path_mtu=512,
3845                                     template_interval=10)
3846        self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
3847                                           src_port=self.ipfix_src_port,
3848                                           enable=1)
3849
3850        data = b"A" * 4 + b"B" * 16 + b"C" * 3
3851        self.tcp_port_in = random.randint(1025, 65535)
3852        pkts = self.create_stream_frag(self.pg0,
3853                                       self.pg1.remote_ip4,
3854                                       self.tcp_port_in,
3855                                       20,
3856                                       data)
3857        pkts.reverse()
3858        self.pg0.add_stream(pkts)
3859        self.pg_enable_capture(self.pg_interfaces)
3860        self.pg_start()
3861        self.pg1.assert_nothing_captured()
3862        sleep(1)
3863        self.vapi.ipfix_flush()
3864        capture = self.pg3.get_capture(9)
3865        ipfix = IPFIXDecoder()
3866        # first load template
3867        for p in capture:
3868            self.assertTrue(p.haslayer(IPFIX))
3869            self.assertEqual(p[IP].src, self.pg3.local_ip4)
3870            self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
3871            self.assertEqual(p[UDP].sport, self.ipfix_src_port)
3872            self.assertEqual(p[UDP].dport, 4739)
3873            self.assertEqual(p[IPFIX].observationDomainID,
3874                             self.ipfix_domain_id)
3875            if p.haslayer(Template):
3876                ipfix.add_template(p.getlayer(Template))
3877        # verify events in data set
3878        for p in capture:
3879            if p.haslayer(Data):
3880                data = ipfix.decode_data_set(p.getlayer(Set))
3881                self.verify_ipfix_max_fragments_ip4(data, 1,
3882                                                    self.pg0.remote_ip4n)
3883
3884    def test_multiple_outside_vrf(self):
3885        """ Multiple outside VRF """
3886        vrf_id1 = 1
3887        vrf_id2 = 2
3888
3889        self.pg1.unconfig_ip4()
3890        self.pg2.unconfig_ip4()
3891        self.vapi.ip_table_add_del(is_add=1, table_id=vrf_id1)
3892        self.vapi.ip_table_add_del(is_add=1, table_id=vrf_id2)
3893        self.pg1.set_table_ip4(vrf_id1)
3894        self.pg2.set_table_ip4(vrf_id2)
3895        self.pg1.config_ip4()
3896        self.pg2.config_ip4()
3897        self.pg1.resolve_arp()
3898        self.pg2.resolve_arp()
3899
3900        self.nat44_add_address(self.nat_addr)
3901        flags = self.config_flags.NAT_IS_INSIDE
3902        self.vapi.nat44_interface_add_del_feature(
3903            sw_if_index=self.pg0.sw_if_index,
3904            flags=flags, is_add=1)
3905        self.vapi.nat44_interface_add_del_feature(
3906            sw_if_index=self.pg1.sw_if_index,
3907            is_add=1)
3908        self.vapi.nat44_interface_add_del_feature(
3909            sw_if_index=self.pg2.sw_if_index,
3910            is_add=1)
3911
3912        try:
3913            # first VRF
3914            pkts = self.create_stream_in(self.pg0, self.pg1)
3915            self.pg0.add_stream(pkts)
3916            self.pg_enable_capture(self.pg_interfaces)
3917            self.pg_start()
3918            capture = self.pg1.get_capture(len(pkts))
3919            self.verify_capture_out(capture, self.nat_addr)
3920
3921            pkts = self.create_stream_out(self.pg1, self.nat_addr)
3922            self.pg1.add_stream(pkts)
3923            self.pg_enable_capture(self.pg_interfaces)
3924            self.pg_start()
3925            capture = self.pg0.get_capture(len(pkts))
3926            self.verify_capture_in(capture, self.pg0)
3927
3928            self.tcp_port_in = 60303
3929            self.udp_port_in = 60304
3930            self.icmp_id_in = 60305
3931
3932            # second VRF
3933            pkts = self.create_stream_in(self.pg0, self.pg2)
3934            self.pg0.add_stream(pkts)
3935            self.pg_enable_capture(self.pg_interfaces)
3936            self.pg_start()
3937            capture = self.pg2.get_capture(len(pkts))
3938            self.verify_capture_out(capture, self.nat_addr)
3939
3940            pkts = self.create_stream_out(self.pg2, self.nat_addr)
3941            self.pg2.add_stream(pkts)
3942            self.pg_enable_capture(self.pg_interfaces)
3943            self.pg_start()
3944            capture = self.pg0.get_capture(len(pkts))
3945            self.verify_capture_in(capture, self.pg0)
3946
3947        finally:
3948            self.nat44_add_address(self.nat_addr, is_add=0)
3949            self.pg1.unconfig_ip4()
3950            self.pg2.unconfig_ip4()
3951            self.pg1.set_table_ip4(0)
3952            self.pg2.set_table_ip4(0)
3953            self.pg1.config_ip4()
3954            self.pg2.config_ip4()
3955            self.pg1.resolve_arp()
3956            self.pg2.resolve_arp()
3957
3958    @unittest.skipUnless(running_extended_tests, "part of extended tests")
3959    def test_session_timeout(self):
3960        """ NAT44 session timeouts """
3961        self.nat44_add_address(self.nat_addr)
3962        flags = self.config_flags.NAT_IS_INSIDE
3963        self.vapi.nat44_interface_add_del_feature(
3964            sw_if_index=self.pg0.sw_if_index,
3965            flags=flags, is_add=1)
3966        self.vapi.nat44_interface_add_del_feature(
3967            sw_if_index=self.pg1.sw_if_index,
3968            is_add=1)
3969        self.vapi.nat_set_timeouts(udp=5, tcp_established=7440,
3970                                   tcp_transitory=240, icmp=60)
3971
3972        max_sessions = 1000
3973        pkts = []
3974        for i in range(0, max_sessions):
3975            src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
3976            p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3977                 IP(src=src, dst=self.pg1.remote_ip4) /
3978                 UDP(sport=1025, dport=53))
3979            pkts.append(p)
3980        self.pg0.add_stream(pkts)
3981        self.pg_enable_capture(self.pg_interfaces)
3982        self.pg_start()
3983        self.pg1.get_capture(max_sessions)
3984
3985        sleep(6)
3986
3987        pkts = []
3988        for i in range(0, max_sessions):
3989            src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
3990            p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3991                 IP(src=src, dst=self.pg1.remote_ip4) /
3992                 UDP(sport=1026, dport=53))
3993            pkts.append(p)
3994        self.pg0.add_stream(pkts)
3995        self.pg_enable_capture(self.pg_interfaces)
3996        self.pg_start()
3997        self.pg1.get_capture(max_sessions)
3998
3999        nsessions = 0
4000        users = self.vapi.nat44_user_dump()
4001        for user in users:
4002            nsessions = nsessions + user.nsessions
4003        self.assertLess(nsessions, 2 * max_sessions)
4004
4005    def test_mss_clamping(self):
4006        """ TCP MSS clamping """
4007        self.nat44_add_address(self.nat_addr)
4008        flags = self.config_flags.NAT_IS_INSIDE
4009        self.vapi.nat44_interface_add_del_feature(
4010            sw_if_index=self.pg0.sw_if_index,
4011            flags=flags, is_add=1)
4012        self.vapi.nat44_interface_add_del_feature(
4013            sw_if_index=self.pg1.sw_if_index,
4014            is_add=1)
4015
4016        p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4017             IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4018             TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4019                 flags="S", options=[('MSS', 1400)]))
4020
4021        self.vapi.nat_set_mss_clamping(enable=1, mss_value=1000)
4022        self.pg0.add_stream(p)
4023        self.pg_enable_capture(self.pg_interfaces)
4024        self.pg_start()
4025        capture = self.pg1.get_capture(1)
4026        # Negotiated MSS value greater than configured - changed
4027        self.verify_mss_value(capture[0], 1000)
4028
4029        self.vapi.nat_set_mss_clamping(enable=0, mss_value=1500)
4030        self.pg0.add_stream(p)
4031        self.pg_enable_capture(self.pg_interfaces)
4032        self.pg_start()
4033        capture = self.pg1.get_capture(1)
4034        # MSS clamping disabled - negotiated MSS unchanged
4035        self.verify_mss_value(capture[0], 1400)
4036
4037        self.vapi.nat_set_mss_clamping(enable=1, mss_value=1500)
4038        self.pg0.add_stream(p)
4039        self.pg_enable_capture(self.pg_interfaces)
4040        self.pg_start()
4041        capture = self.pg1.get_capture(1)
4042        # Negotiated MSS value smaller than configured - unchanged
4043        self.verify_mss_value(capture[0], 1400)
4044
    @unittest.skipUnless(running_extended_tests, "part of extended tests")
    def test_ha_send(self):
        """ Send HA session synchronization events (active)

        Walks through four phases, checking the HA statistics counters and
        the packets captured on pg3 after each one:

        1. create sessions and expect one HA packet carrying add events,
           then acknowledge it;
        2. delete one session and expect a delete-event packet;
        3. withhold the ACK and expect the same packet to be sent again;
        4. send return traffic and expect refresh events, then acknowledge.
        """
        self.nat44_add_address(self.nat_addr)
        flags = self.config_flags.NAT_IS_INSIDE
        self.vapi.nat44_interface_add_del_feature(
            sw_if_index=self.pg0.sw_if_index,
            flags=flags, is_add=1)
        self.vapi.nat44_interface_add_del_feature(
            sw_if_index=self.pg1.sw_if_index,
            is_add=1)
        # HA listener on pg3's local address, failover peer on pg3's
        # remote address
        self.vapi.nat_ha_set_listener(ip_address=self.pg3.local_ip4,
                                      port=12345,
                                      path_mtu=512)
        self.vapi.nat_ha_set_failover(ip_address=self.pg3.remote_ip4,
                                      port=12346, session_refresh_interval=10)
        # have scapy dissect UDP payloads sent from port 12345 as
        # HANATStateSync
        bind_layers(UDP, HANATStateSync, sport=12345)

        # create sessions
        pkts = self.create_stream_in(self.pg0, self.pg1)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(len(pkts))
        self.verify_capture_out(capture)
        # active send HA events
        self.vapi.nat_ha_flush()
        # three add events are expected - presumably one per session
        # created by create_stream_in(); confirm against that helper
        stats = self.statistics.get_counter('/nat44/ha/add-event-send')
        self.assertEqual(stats[0][0], 3)
        capture = self.pg3.get_capture(1)
        p = capture[0]
        self.assert_packet_checksums_valid(p)
        try:
            ip = p[IP]
            udp = p[UDP]
            hanat = p[HANATStateSync]
        except IndexError:
            # a layer is missing: log the packet and re-raise
            self.logger.error(ppp("Invalid packet:", p))
            raise
        else:
            self.assertEqual(ip.src, self.pg3.local_ip4)
            self.assertEqual(ip.dst, self.pg3.remote_ip4)
            self.assertEqual(udp.sport, 12345)
            self.assertEqual(udp.dport, 12346)
            self.assertEqual(hanat.version, 1)
            self.assertEqual(hanat.thread_index, 0)
            self.assertEqual(hanat.count, 3)
            # remembered so the ACK below can echo it
            seq = hanat.sequence_number
            for event in hanat.events:
                # event_type 1 is "add" in the Event packet definition
                self.assertEqual(event.event_type, 1)
                self.assertEqual(event.in_addr, self.pg0.remote_ip4)
                self.assertEqual(event.out_addr, self.nat_addr)
                self.assertEqual(event.fib_index, 0)

        # acknowledge the received events by echoing their sequence number
        ack = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
               IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
               UDP(sport=12346, dport=12345) /
               HANATStateSync(sequence_number=seq, flags='ACK'))
        self.pg3.add_stream(ack)
        self.pg_start()
        stats = self.statistics.get_counter('/nat44/ha/ack-recv')
        self.assertEqual(stats[0][0], 1)

        # delete one session
        self.pg_enable_capture(self.pg_interfaces)
        self.vapi.nat44_del_session(address=self.pg0.remote_ip4,
                                    port=self.tcp_port_in,
                                    protocol=IP_PROTOS.tcp,
                                    flags=self.config_flags.NAT_IS_INSIDE)
        self.vapi.nat_ha_flush()
        stats = self.statistics.get_counter('/nat44/ha/del-event-send')
        self.assertEqual(stats[0][0], 1)
        capture = self.pg3.get_capture(1)
        # p is kept: the retransmissions below are compared against it
        p = capture[0]
        try:
            hanat = p[HANATStateSync]
        except IndexError:
            self.logger.error(ppp("Invalid packet:", p))
            raise
        else:
            # must be newer than the packet acknowledged above
            self.assertGreater(hanat.sequence_number, seq)

        # do not send ACK, active retry send HA event again
        self.pg_enable_capture(self.pg_interfaces)
        # NOTE(review): the 12 s wait is presumably sized to let all
        # retries happen - confirm against the HA retry timer
        sleep(12)
        stats = self.statistics.get_counter('/nat44/ha/retry-count')
        self.assertEqual(stats[0][0], 3)
        stats = self.statistics.get_counter('/nat44/ha/missed-count')
        self.assertEqual(stats[0][0], 1)
        capture = self.pg3.get_capture(3)
        # every retransmission must equal the original delete-event packet
        for packet in capture:
            self.assertEqual(packet, p)

        # session counters refresh
        pkts = self.create_stream_out(self.pg1)
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        # only two packets are expected on pg0 - presumably because the
        # TCP session was deleted above
        self.pg0.get_capture(2)
        self.vapi.nat_ha_flush()
        stats = self.statistics.get_counter('/nat44/ha/refresh-event-send')
        self.assertEqual(stats[0][0], 2)
        capture = self.pg3.get_capture(1)
        p = capture[0]
        self.assert_packet_checksums_valid(p)
        try:
            ip = p[IP]
            udp = p[UDP]
            hanat = p[HANATStateSync]
        except IndexError:
            self.logger.error(ppp("Invalid packet:", p))
            raise
        else:
            self.assertEqual(ip.src, self.pg3.local_ip4)
            self.assertEqual(ip.dst, self.pg3.remote_ip4)
            self.assertEqual(udp.sport, 12345)
            self.assertEqual(udp.dport, 12346)
            self.assertEqual(hanat.version, 1)
            self.assertEqual(hanat.count, 2)
            seq = hanat.sequence_number
            for event in hanat.events:
                # event_type 3 is "refresh" in the Event packet definition
                self.assertEqual(event.event_type, 3)
                self.assertEqual(event.out_addr, self.nat_addr)
                self.assertEqual(event.fib_index, 0)
                self.assertEqual(event.total_pkts, 2)
                self.assertGreater(event.total_bytes, 0)

        # acknowledge the refresh packet; the ACK counter must reach two
        ack = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
               IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
               UDP(sport=12346, dport=12345) /
               HANATStateSync(sequence_number=seq, flags='ACK'))
        self.pg3.add_stream(ack)
        self.pg_start()
        stats = self.statistics.get_counter('/nat44/ha/ack-recv')
        self.assertEqual(stats[0][0], 2)
4181
4182    def test_ha_recv(self):
4183        """ Receive HA session synchronization events (passive) """
4184        self.nat44_add_address(self.nat_addr)
4185        flags = self.config_flags.NAT_IS_INSIDE
4186        self.vapi.nat44_interface_add_del_feature(
4187            sw_if_index=self.pg0.sw_if_index,
4188            flags=flags, is_add=1)
4189        self.vapi.nat44_interface_add_del_feature(
4190            sw_if_index=self.pg1.sw_if_index,
4191            is_add=1)
4192        self.vapi.nat_ha_set_listener(ip_address=self.pg3.local_ip4,
4193                                      port=12345,
4194                                      path_mtu=512)
4195        bind_layers(UDP, HANATStateSync, sport=12345)
4196
4197        self.tcp_port_out = random.randint(1025, 65535)
4198        self.udp_port_out = random.randint(1025, 65535)
4199
4200        # send HA session add events to failover/passive
4201        p = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
4202             IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
4203             UDP(sport=12346, dport=12345) /
4204             HANATStateSync(sequence_number=1, events=[
4205                 Event(event_type='add', protocol='tcp',
4206                       in_addr=self.pg0.remote_ip4, out_addr=self.nat_addr,
4207                       in_port=self.tcp_port_in, out_port=self.tcp_port_out,
4208                       eh_addr=self.pg1.remote_ip4,
4209                       ehn_addr=self.pg1.remote_ip4,
4210                       eh_port=self.tcp_external_port,
4211                       ehn_port=self.tcp_external_port, fib_index=0),
4212                 Event(event_type='add', protocol='udp',
4213                       in_addr=self.pg0.remote_ip4, out_addr=self.nat_addr,
4214                       in_port=self.udp_port_in, out_port=self.udp_port_out,
4215                       eh_addr=self.pg1.remote_ip4,
4216                       ehn_addr=self.pg1.remote_ip4,
4217                       eh_port=self.udp_external_port,
4218                       ehn_port=self.udp_external_port, fib_index=0)]))
4219
4220        self.pg3.add_stream(p)
4221        self.pg_enable_capture(self.pg_interfaces)
4222        self.pg_start()
4223        # receive ACK
4224        capture = self.pg3.get_capture(1)
4225        p = capture[0]
4226        try:
4227            hanat = p[HANATStateSync]
4228        except IndexError:
4229            self.logger.error(ppp("Invalid packet:", p))
4230            raise
4231        else:
4232            self.assertEqual(hanat.sequence_number, 1)
4233            self.assertEqual(hanat.flags, 'ACK')
4234            self.assertEqual(hanat.version, 1)
4235            self.assertEqual(hanat.thread_index, 0)
4236        stats = self.statistics.get_counter('/nat44/ha/ack-send')
4237        self.assertEqual(stats[0][0], 1)
4238        stats = self.statistics.get_counter('/nat44/ha/add-event-recv')
4239        self.assertEqual(stats[0][0], 2)
4240        users = self.statistics.get_counter('/nat44/total-users')
4241        self.assertEqual(users[0][0], 1)
4242        sessions = self.statistics.get_counter('/nat44/total-sessions')
4243        self.assertEqual(sessions[0][0], 2)
4244        users = self.vapi.nat44_user_dump()
4245        self.assertEqual(len(users), 1)
4246        self.assertEqual(str(users[0].ip_address),
4247                         self.pg0.remote_ip4)
4248        # there should be 2 sessions created by HA
4249        sessions = self.vapi.nat44_user_session_dump(users[0].ip_address,
4250                                                     users[0].vrf_id)
4251        self.assertEqual(len(sessions), 2)
4252        for session in sessions:
4253            self.assertEqual(str(session.inside_ip_address),
4254                             self.pg0.remote_ip4)
4255            self.assertEqual(str(session.outside_ip_address),
4256                             self.nat_addr)
4257            self.assertIn(session.inside_port,
4258                          [self.tcp_port_in, self.udp_port_in])
4259            self.assertIn(session.outside_port,
4260                          [self.tcp_port_out, self.udp_port_out])
4261            self.assertIn(session.protocol, [IP_PROTOS.tcp, IP_PROTOS.udp])
4262
4263        # send HA session delete event to failover/passive
4264        p = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
4265             IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
4266             UDP(sport=12346, dport=12345) /
4267             HANATStateSync(sequence_number=2, events=[
4268                 Event(event_type='del',