test_nat.py revision f126e746
1#!/usr/bin/env python3
2
3import socket
4import unittest
5import struct
6import random
7
8from framework import VppTestCase, VppTestRunner, running_extended_tests
9
10import scapy.compat
11from scapy.layers.inet import IP, TCP, UDP, ICMP
12from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \
14    ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, fragment6
15from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
16from scapy.layers.l2 import Ether, ARP, GRE
17from scapy.data import IP_PROTOS
18from scapy.packet import bind_layers, Raw
19from util import ppp
20from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21from time import sleep
22from util import ip4_range
23from vpp_papi import mac_pton
24from syslog_rfc5424_parser import SyslogMessage, ParseError
25from syslog_rfc5424_parser.constants import SyslogFacility, SyslogSeverity
26from io import BytesIO
27from vpp_papi import VppEnum
28from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathType
29from vpp_neighbor import VppNeighbor
30from scapy.all import bind_layers, Packet, ByteEnumField, ShortField, \
31    IPField, IntField, LongField, XByteField, FlagsField, FieldLenField, \
32    PacketListField
33from ipaddress import IPv6Network
34from util import ppc, ppp
35
36
37# NAT HA protocol event data
class Event(Packet):
    """One NAT HA event record (an entry of HANATStateSync.events)."""

    name = "Event"
    fields_desc = [
        # kind of event and protocol of the session it refers to
        ByteEnumField("event_type", None,
                      {1: "add", 2: "del", 3: "refresh"}),
        ByteEnumField("protocol", None,
                      {0: "udp", 1: "tcp", 2: "icmp"}),
        ShortField("flags", 0),
        # addresses and ports describing the session
        IPField("in_addr", None),
        IPField("out_addr", None),
        ShortField("in_port", None),
        ShortField("out_port", None),
        IPField("eh_addr", None),
        IPField("ehn_addr", None),
        ShortField("eh_port", None),
        ShortField("ehn_port", None),
        IntField("fib_index", None),
        # counters
        IntField("total_pkts", 0),
        LongField("total_bytes", 0),
    ]

    def extract_padding(self, s):
        # An event carries no payload of its own: return an empty payload
        # and hand all remaining bytes back as padding.
        return "", s
59
60
61# NAT HA protocol header
class HANATStateSync(Packet):
    """NAT HA state sync header followed by a counted list of Event."""

    name = "HA NAT state sync"
    fields_desc = [
        XByteField("version", 1),
        FlagsField("flags", 0, 8, ['ACK']),
        # when left as None, "count" is derived from the "events" list
        FieldLenField("count", None, count_of="events"),
        IntField("sequence_number", 1),
        IntField("thread_index", 0),
        # the number of Event records to dissect is taken from "count"
        PacketListField("events", [], Event,
                        count_from=lambda hdr: hdr.count),
    ]
71
72
73class MethodHolder(VppTestCase):
74    """ NAT create capture and verify method holder """
75
76    @property
77    def config_flags(self):
78        return VppEnum.vl_api_nat_config_flags_t
79
80    @property
81    def SYSLOG_SEVERITY(self):
82        return VppEnum.vl_api_syslog_severity_t
83
84    def clear_nat44(self):
85        """
86        Clear NAT44 configuration.
87        """
88        if hasattr(self, 'pg7') and hasattr(self, 'pg8'):
89            if self.pg7.has_ip4_config:
90                self.pg7.unconfig_ip4()
91
92        self.vapi.nat44_forwarding_enable_disable(enable=0)
93
94        interfaces = self.vapi.nat44_interface_addr_dump()
95        for intf in interfaces:
96            self.vapi.nat44_add_del_interface_addr(
97                is_add=0,
98                sw_if_index=intf.sw_if_index,
99                flags=intf.flags)
100
101        self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
102                                           src_port=self.ipfix_src_port,
103                                           enable=0)
104        self.ipfix_src_port = 4739
105        self.ipfix_domain_id = 1
106
107        self.vapi.syslog_set_filter(
108            self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_EMERG)
109
110        self.vapi.nat_ha_set_listener(ip_address='0.0.0.0', port=0,
111                                      path_mtu=512)
112        self.vapi.nat_ha_set_failover(ip_address='0.0.0.0', port=0,
113                                      session_refresh_interval=10)
114
115        interfaces = self.vapi.nat44_interface_dump()
116        for intf in interfaces:
117            if intf.flags & self.config_flags.NAT_IS_INSIDE and \
118                    intf.flags & self.config_flags.NAT_IS_OUTSIDE:
119                self.vapi.nat44_interface_add_del_feature(
120                    sw_if_index=intf.sw_if_index)
121            self.vapi.nat44_interface_add_del_feature(
122                sw_if_index=intf.sw_if_index,
123                flags=intf.flags)
124
125        interfaces = self.vapi.nat44_interface_output_feature_dump()
126        for intf in interfaces:
127            self.vapi.nat44_interface_add_del_output_feature(
128                is_add=0,
129                flags=intf.flags,
130                sw_if_index=intf.sw_if_index)
131        static_mappings = self.vapi.nat44_static_mapping_dump()
132        for sm in static_mappings:
133            self.vapi.nat44_add_del_static_mapping(
134                is_add=0,
135                local_ip_address=sm.local_ip_address,
136                external_ip_address=sm.external_ip_address,
137                external_sw_if_index=sm.external_sw_if_index,
138                local_port=sm.local_port,
139                external_port=sm.external_port,
140                vrf_id=sm.vrf_id,
141                protocol=sm.protocol,
142                flags=sm.flags, tag=sm.tag)
143
144        lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
145        for lb_sm in lb_static_mappings:
146            self.vapi.nat44_add_del_lb_static_mapping(
147                is_add=0,
148                flags=lb_sm.flags,
149                external_addr=lb_sm.external_addr,
150                external_port=lb_sm.external_port,
151                protocol=lb_sm.protocol,
152                local_num=0, locals=[],
153                tag=lb_sm.tag)
154
155        identity_mappings = self.vapi.nat44_identity_mapping_dump()
156        for id_m in identity_mappings:
157            self.vapi.nat44_add_del_identity_mapping(
158                ip_address=id_m.ip_address,
159                sw_if_index=id_m.sw_if_index,
160                port=id_m.port,
161                flags=id_m.flags,
162                vrf_id=id_m.vrf_id,
163                protocol=id_m.protocol)
164
165        addresses = self.vapi.nat44_address_dump()
166        for addr in addresses:
167            self.vapi.nat44_add_del_address_range(
168                first_ip_address=addr.ip_address,
169                last_ip_address=addr.ip_address,
170                vrf_id=0xFFFFFFFF, flags=addr.flags)
171
172        self.verify_no_nat44_user()
173        self.vapi.nat_set_timeouts(udp=300, tcp_established=7440,
174                                   tcp_transitory=240, icmp=60)
175        self.vapi.nat_set_addr_and_port_alloc_alg()
176        self.vapi.nat_set_mss_clamping(enable=0, mss_value=1500)
177
178    def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
179                                 local_port=0, external_port=0, vrf_id=0,
180                                 is_add=1, external_sw_if_index=0xFFFFFFFF,
181                                 proto=0, tag="", flags=0):
182        """
183        Add/delete NAT44 static mapping
184
185        :param local_ip: Local IP address
186        :param external_ip: External IP address
187        :param local_port: Local port number (Optional)
188        :param external_port: External port number (Optional)
189        :param vrf_id: VRF ID (Default 0)
190        :param is_add: 1 if add, 0 if delete (Default add)
191        :param external_sw_if_index: External interface instead of IP address
192        :param proto: IP protocol (Mandatory if port specified)
193        :param tag: Opaque string tag
194        :param flags: NAT configuration flags
195        """
196
197        if not (local_port and external_port):
198            flags |= self.config_flags.NAT_IS_ADDR_ONLY
199
200        self.vapi.nat44_add_del_static_mapping(
201            is_add=is_add,
202            local_ip_address=local_ip,
203            external_ip_address=external_ip,
204            external_sw_if_index=external_sw_if_index,
205            local_port=local_port,
206            external_port=external_port,
207            vrf_id=vrf_id, protocol=proto,
208            flags=flags,
209            tag=tag)
210
211    def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
212        """
213        Add/delete NAT44 address
214
215        :param ip: IP address
216        :param is_add: 1 if add, 0 if delete (Default add)
217        :param twice_nat: twice NAT address for external hosts
218        """
219        flags = self.config_flags.NAT_IS_TWICE_NAT if twice_nat else 0
220        self.vapi.nat44_add_del_address_range(first_ip_address=ip,
221                                              last_ip_address=ip,
222                                              vrf_id=vrf_id,
223                                              is_add=is_add,
224                                              flags=flags)
225
226    def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
227        """
228        Create packet stream for inside network
229
230        :param in_if: Inside interface
231        :param out_if: Outside interface
232        :param dst_ip: Destination address
233        :param ttl: TTL of generated packets
234        """
235        if dst_ip is None:
236            dst_ip = out_if.remote_ip4
237
238        pkts = []
239        # TCP
240        p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
241             IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
242             TCP(sport=self.tcp_port_in, dport=20))
243        pkts.extend([p, p])
244
245        # UDP
246        p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
247             IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
248             UDP(sport=self.udp_port_in, dport=20))
249        pkts.append(p)
250
251        # ICMP
252        p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
253             IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
254             ICMP(id=self.icmp_id_in, type='echo-request'))
255        pkts.append(p)
256
257        return pkts
258
259    def compose_ip6(self, ip4, pref, plen):
260        """
261        Compose IPv4-embedded IPv6 addresses
262
263        :param ip4: IPv4 address
264        :param pref: IPv6 prefix
265        :param plen: IPv6 prefix length
266        :returns: IPv4-embedded IPv6 addresses
267        """
268        pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
269        ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
270        if plen == 32:
271            pref_n[4] = ip4_n[0]
272            pref_n[5] = ip4_n[1]
273            pref_n[6] = ip4_n[2]
274            pref_n[7] = ip4_n[3]
275        elif plen == 40:
276            pref_n[5] = ip4_n[0]
277            pref_n[6] = ip4_n[1]
278            pref_n[7] = ip4_n[2]
279            pref_n[9] = ip4_n[3]
280        elif plen == 48:
281            pref_n[6] = ip4_n[0]
282            pref_n[7] = ip4_n[1]
283            pref_n[9] = ip4_n[2]
284            pref_n[10] = ip4_n[3]
285        elif plen == 56:
286            pref_n[7] = ip4_n[0]
287            pref_n[9] = ip4_n[1]
288            pref_n[10] = ip4_n[2]
289            pref_n[11] = ip4_n[3]
290        elif plen == 64:
291            pref_n[9] = ip4_n[0]
292            pref_n[10] = ip4_n[1]
293            pref_n[11] = ip4_n[2]
294            pref_n[12] = ip4_n[3]
295        elif plen == 96:
296            pref_n[12] = ip4_n[0]
297            pref_n[13] = ip4_n[1]
298            pref_n[14] = ip4_n[2]
299            pref_n[15] = ip4_n[3]
300        packed_pref_n = b''.join([scapy.compat.chb(x) for x in pref_n])
301        return socket.inet_ntop(socket.AF_INET6, packed_pref_n)
302
303    def extract_ip4(self, ip6, plen):
304        """
305        Extract IPv4 address embedded in IPv6 addresses
306
307        :param ip6: IPv6 address
308        :param plen: IPv6 prefix length
309        :returns: extracted IPv4 address
310        """
311        ip6_n = list(socket.inet_pton(socket.AF_INET6, ip6))
312        ip4_n = [None] * 4
313        if plen == 32:
314            ip4_n[0] = ip6_n[4]
315            ip4_n[1] = ip6_n[5]
316            ip4_n[2] = ip6_n[6]
317            ip4_n[3] = ip6_n[7]
318        elif plen == 40:
319            ip4_n[0] = ip6_n[5]
320            ip4_n[1] = ip6_n[6]
321            ip4_n[2] = ip6_n[7]
322            ip4_n[3] = ip6_n[9]
323        elif plen == 48:
324            ip4_n[0] = ip6_n[6]
325            ip4_n[1] = ip6_n[7]
326            ip4_n[2] = ip6_n[9]
327            ip4_n[3] = ip6_n[10]
328        elif plen == 56:
329            ip4_n[0] = ip6_n[7]
330            ip4_n[1] = ip6_n[9]
331            ip4_n[2] = ip6_n[10]
332            ip4_n[3] = ip6_n[11]
333        elif plen == 64:
334            ip4_n[0] = ip6_n[9]
335            ip4_n[1] = ip6_n[10]
336            ip4_n[2] = ip6_n[11]
337            ip4_n[3] = ip6_n[12]
338        elif plen == 96:
339            ip4_n[0] = ip6_n[12]
340            ip4_n[1] = ip6_n[13]
341            ip4_n[2] = ip6_n[14]
342            ip4_n[3] = ip6_n[15]
343        return socket.inet_ntop(socket.AF_INET, ''.join(ip4_n))
344
345    def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
346        """
347        Create IPv6 packet stream for inside network
348
349        :param in_if: Inside interface
350        :param out_if: Outside interface
351        :param ttl: Hop Limit of generated packets
352        :param pref: NAT64 prefix
353        :param plen: NAT64 prefix length
354        """
355        pkts = []
356        if pref is None:
357            dst = ''.join(['64:ff9b::', out_if.remote_ip4])
358        else:
359            dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
360
361        # TCP
362        p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
363             IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
364             TCP(sport=self.tcp_port_in, dport=20))
365        pkts.append(p)
366
367        # UDP
368        p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
369             IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
370             UDP(sport=self.udp_port_in, dport=20))
371        pkts.append(p)
372
373        # ICMP
374        p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
375             IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
376             ICMPv6EchoRequest(id=self.icmp_id_in))
377        pkts.append(p)
378
379        return pkts
380
381    def create_stream_out(self, out_if, dst_ip=None, ttl=64,
382                          use_inside_ports=False):
383        """
384        Create packet stream for outside network
385
386        :param out_if: Outside interface
387        :param dst_ip: Destination IP address (Default use global NAT address)
388        :param ttl: TTL of generated packets
389        :param use_inside_ports: Use inside NAT ports as destination ports
390               instead of outside ports
391        """
392        if dst_ip is None:
393            dst_ip = self.nat_addr
394        if not use_inside_ports:
395            tcp_port = self.tcp_port_out
396            udp_port = self.udp_port_out
397            icmp_id = self.icmp_id_out
398        else:
399            tcp_port = self.tcp_port_in
400            udp_port = self.udp_port_in
401            icmp_id = self.icmp_id_in
402        pkts = []
403        # TCP
404        p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
405             IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
406             TCP(dport=tcp_port, sport=20))
407        pkts.extend([p, p])
408
409        # UDP
410        p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
411             IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
412             UDP(dport=udp_port, sport=20))
413        pkts.append(p)
414
415        # ICMP
416        p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
417             IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
418             ICMP(id=icmp_id, type='echo-reply'))
419        pkts.append(p)
420
421        return pkts
422
423    def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
424        """
425        Create packet stream for outside network
426
427        :param out_if: Outside interface
428        :param dst_ip: Destination IP address (Default use global NAT address)
429        :param hl: HL of generated packets
430        """
431        pkts = []
432        # TCP
433        p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
434             IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
435             TCP(dport=self.tcp_port_out, sport=20))
436        pkts.append(p)
437
438        # UDP
439        p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
440             IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
441             UDP(dport=self.udp_port_out, sport=20))
442        pkts.append(p)
443
444        # ICMP
445        p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
446             IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
447             ICMPv6EchoReply(id=self.icmp_id_out))
448        pkts.append(p)
449
450        return pkts
451
452    def verify_capture_out(self, capture, nat_ip=None, same_port=False,
453                           dst_ip=None, is_ip6=False):
454        """
455        Verify captured packets on outside network
456
457        :param capture: Captured packets
458        :param nat_ip: Translated IP address (Default use global NAT address)
459        :param same_port: Source port number is not translated (Default False)
460        :param dst_ip: Destination IP address (Default do not verify)
461        :param is_ip6: If L3 protocol is IPv6 (Default False)
462        """
463        if is_ip6:
464            IP46 = IPv6
465            ICMP46 = ICMPv6EchoRequest
466        else:
467            IP46 = IP
468            ICMP46 = ICMP
469        if nat_ip is None:
470            nat_ip = self.nat_addr
471        for packet in capture:
472            try:
473                if not is_ip6:
474                    self.assert_packet_checksums_valid(packet)
475                self.assertEqual(packet[IP46].src, nat_ip)
476                if dst_ip is not None:
477                    self.assertEqual(packet[IP46].dst, dst_ip)
478                if packet.haslayer(TCP):
479                    if same_port:
480                        self.assertEqual(packet[TCP].sport, self.tcp_port_in)
481                    else:
482                        self.assertNotEqual(
483                            packet[TCP].sport, self.tcp_port_in)
484                    self.tcp_port_out = packet[TCP].sport
485                    self.assert_packet_checksums_valid(packet)
486                elif packet.haslayer(UDP):
487                    if same_port:
488                        self.assertEqual(packet[UDP].sport, self.udp_port_in)
489                    else:
490                        self.assertNotEqual(
491                            packet[UDP].sport, self.udp_port_in)
492                    self.udp_port_out = packet[UDP].sport
493                else:
494                    if same_port:
495                        self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
496                    else:
497                        self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
498                    self.icmp_id_out = packet[ICMP46].id
499                    self.assert_packet_checksums_valid(packet)
500            except:
501                self.logger.error(ppp("Unexpected or invalid packet "
502                                      "(outside network):", packet))
503                raise
504
505    def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
506                               dst_ip=None):
507        """
508        Verify captured packets on outside network
509
510        :param capture: Captured packets
511        :param nat_ip: Translated IP address
512        :param same_port: Source port number is not translated (Default False)
513        :param dst_ip: Destination IP address (Default do not verify)
514        """
515        return self.verify_capture_out(capture, nat_ip, same_port, dst_ip,
516                                       True)
517
518    def verify_capture_in(self, capture, in_if):
519        """
520        Verify captured packets on inside network
521
522        :param capture: Captured packets
523        :param in_if: Inside interface
524        """
525        for packet in capture:
526            try:
527                self.assert_packet_checksums_valid(packet)
528                self.assertEqual(packet[IP].dst, in_if.remote_ip4)
529                if packet.haslayer(TCP):
530                    self.assertEqual(packet[TCP].dport, self.tcp_port_in)
531                elif packet.haslayer(UDP):
532                    self.assertEqual(packet[UDP].dport, self.udp_port_in)
533                else:
534                    self.assertEqual(packet[ICMP].id, self.icmp_id_in)
535            except:
536                self.logger.error(ppp("Unexpected or invalid packet "
537                                      "(inside network):", packet))
538                raise
539
540    def verify_capture_in_ip6(self, capture, src_ip, dst_ip):
541        """
542        Verify captured IPv6 packets on inside network
543
544        :param capture: Captured packets
545        :param src_ip: Source IP
546        :param dst_ip: Destination IP address
547        """
548        for packet in capture:
549            try:
550                self.assertEqual(packet[IPv6].src, src_ip)
551                self.assertEqual(packet[IPv6].dst, dst_ip)
552                self.assert_packet_checksums_valid(packet)
553                if packet.haslayer(TCP):
554                    self.assertEqual(packet[TCP].dport, self.tcp_port_in)
555                elif packet.haslayer(UDP):
556                    self.assertEqual(packet[UDP].dport, self.udp_port_in)
557                else:
558                    self.assertEqual(packet[ICMPv6EchoReply].id,
559                                     self.icmp_id_in)
560            except:
561                self.logger.error(ppp("Unexpected or invalid packet "
562                                      "(inside network):", packet))
563                raise
564
565    def verify_capture_no_translation(self, capture, ingress_if, egress_if):
566        """
567        Verify captured packet that don't have to be translated
568
569        :param capture: Captured packets
570        :param ingress_if: Ingress interface
571        :param egress_if: Egress interface
572        """
573        for packet in capture:
574            try:
575                self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
576                self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
577                if packet.haslayer(TCP):
578                    self.assertEqual(packet[TCP].sport, self.tcp_port_in)
579                elif packet.haslayer(UDP):
580                    self.assertEqual(packet[UDP].sport, self.udp_port_in)
581                else:
582                    self.assertEqual(packet[ICMP].id, self.icmp_id_in)
583            except:
584                self.logger.error(ppp("Unexpected or invalid packet "
585                                      "(inside network):", packet))
586                raise
587
588    def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
589                                            icmp_type=11):
590        """
591        Verify captured packets with ICMP errors on outside network
592
593        :param capture: Captured packets
594        :param src_ip: Translated IP address or IP address of VPP
595                       (Default use global NAT address)
596        :param icmp_type: Type of error ICMP packet
597                          we are expecting (Default 11)
598        """
599        if src_ip is None:
600            src_ip = self.nat_addr
601        for packet in capture:
602            try:
603                self.assertEqual(packet[IP].src, src_ip)
604                self.assertEqual(packet.haslayer(ICMP), 1)
605                icmp = packet[ICMP]
606                self.assertEqual(icmp.type, icmp_type)
607                self.assertTrue(icmp.haslayer(IPerror))
608                inner_ip = icmp[IPerror]
609                if inner_ip.haslayer(TCPerror):
610                    self.assertEqual(inner_ip[TCPerror].dport,
611                                     self.tcp_port_out)
612                elif inner_ip.haslayer(UDPerror):
613                    self.assertEqual(inner_ip[UDPerror].dport,
614                                     self.udp_port_out)
615                else:
616                    self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
617            except:
618                self.logger.error(ppp("Unexpected or invalid packet "
619                                      "(outside network):", packet))
620                raise
621
622    def verify_capture_in_with_icmp_errors(self, capture, in_if, icmp_type=11):
623        """
624        Verify captured packets with ICMP errors on inside network
625
626        :param capture: Captured packets
627        :param in_if: Inside interface
628        :param icmp_type: Type of error ICMP packet
629                          we are expecting (Default 11)
630        """
631        for packet in capture:
632            try:
633                self.assertEqual(packet[IP].dst, in_if.remote_ip4)
634                self.assertEqual(packet.haslayer(ICMP), 1)
635                icmp = packet[ICMP]
636                self.assertEqual(icmp.type, icmp_type)
637                self.assertTrue(icmp.haslayer(IPerror))
638                inner_ip = icmp[IPerror]
639                if inner_ip.haslayer(TCPerror):
640                    self.assertEqual(inner_ip[TCPerror].sport,
641                                     self.tcp_port_in)
642                elif inner_ip.haslayer(UDPerror):
643                    self.assertEqual(inner_ip[UDPerror].sport,
644                                     self.udp_port_in)
645                else:
646                    self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
647            except:
648                self.logger.error(ppp("Unexpected or invalid packet "
649                                      "(inside network):", packet))
650                raise
651
652    def create_stream_frag(self, src_if, dst, sport, dport, data,
653                           proto=IP_PROTOS.tcp, echo_reply=False):
654        """
655        Create fragmented packet stream
656
657        :param src_if: Source interface
658        :param dst: Destination IPv4 address
659        :param sport: Source port
660        :param dport: Destination port
661        :param data: Payload data
662        :param proto: protocol (TCP, UDP, ICMP)
663        :param echo_reply: use echo_reply if protocol is ICMP
664        :returns: Fragments
665        """
666        if proto == IP_PROTOS.tcp:
667            p = (IP(src=src_if.remote_ip4, dst=dst) /
668                 TCP(sport=sport, dport=dport) /
669                 Raw(data))
670            p = p.__class__(scapy.compat.raw(p))
671            chksum = p[TCP].chksum
672            proto_header = TCP(sport=sport, dport=dport, chksum=chksum)
673        elif proto == IP_PROTOS.udp:
674            proto_header = UDP(sport=sport, dport=dport)
675        elif proto == IP_PROTOS.icmp:
676            if not echo_reply:
677                proto_header = ICMP(id=sport, type='echo-request')
678            else:
679                proto_header = ICMP(id=sport, type='echo-reply')
680        else:
681            raise Exception("Unsupported protocol")
682        id = random.randint(0, 65535)
683        pkts = []
684        if proto == IP_PROTOS.tcp:
685            raw = Raw(data[0:4])
686        else:
687            raw = Raw(data[0:16])
688        p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
689             IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
690             proto_header /
691             raw)
692        pkts.append(p)
693        if proto == IP_PROTOS.tcp:
694            raw = Raw(data[4:20])
695        else:
696            raw = Raw(data[16:32])
697        p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
698             IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
699                proto=proto) /
700             raw)
701        pkts.append(p)
702        if proto == IP_PROTOS.tcp:
703            raw = Raw(data[20:])
704        else:
705            raw = Raw(data[32:])
706        p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
707             IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto,
708                id=id) /
709             raw)
710        pkts.append(p)
711        return pkts
712
713    def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
714                               pref=None, plen=0, frag_size=128):
715        """
716        Create fragmented packet stream
717
718        :param src_if: Source interface
719        :param dst: Destination IPv4 address
720        :param sport: Source TCP port
721        :param dport: Destination TCP port
722        :param data: Payload data
723        :param pref: NAT64 prefix
724        :param plen: NAT64 prefix length
725        :param fragsize: size of fragments
726        :returns: Fragments
727        """
728        if pref is None:
729            dst_ip6 = ''.join(['64:ff9b::', dst])
730        else:
731            dst_ip6 = self.compose_ip6(dst, pref, plen)
732
733        p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
734             IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
735             IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
736             TCP(sport=sport, dport=dport) /
737             Raw(data))
738
739        return fragment6(p, frag_size)
740
741    def reass_frags_and_verify(self, frags, src, dst):
742        """
743        Reassemble and verify fragmented packet
744
745        :param frags: Captured fragments
746        :param src: Source IPv4 address to verify
747        :param dst: Destination IPv4 address to verify
748
749        :returns: Reassembled IPv4 packet
750        """
751        buffer = BytesIO()
752        for p in frags:
753            self.assertEqual(p[IP].src, src)
754            self.assertEqual(p[IP].dst, dst)
755            self.assert_ip_checksum_valid(p)
756            buffer.seek(p[IP].frag * 8)
757            buffer.write(bytes(p[IP].payload))
758        ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
759                proto=frags[0][IP].proto)
760        if ip.proto == IP_PROTOS.tcp:
761            p = (ip / TCP(buffer.getvalue()))
762            self.logger.debug(ppp("Reassembled:", p))
763            self.assert_tcp_checksum_valid(p)
764        elif ip.proto == IP_PROTOS.udp:
765            p = (ip / UDP(buffer.getvalue()[:8]) /
766                 Raw(buffer.getvalue()[8:]))
767        elif ip.proto == IP_PROTOS.icmp:
768            p = (ip / ICMP(buffer.getvalue()))
769        return p
770
771    def reass_frags_and_verify_ip6(self, frags, src, dst):
772        """
773        Reassemble and verify fragmented packet
774
775        :param frags: Captured fragments
776        :param src: Source IPv6 address to verify
777        :param dst: Destination IPv6 address to verify
778
779        :returns: Reassembled IPv6 packet
780        """
781        buffer = BytesIO()
782        for p in frags:
783            self.assertEqual(p[IPv6].src, src)
784            self.assertEqual(p[IPv6].dst, dst)
785            buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
786            buffer.write(bytes(p[IPv6ExtHdrFragment].payload))
787        ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
788                  nh=frags[0][IPv6ExtHdrFragment].nh)
789        if ip.nh == IP_PROTOS.tcp:
790            p = (ip / TCP(buffer.getvalue()))
791        elif ip.nh == IP_PROTOS.udp:
792            p = (ip / UDP(buffer.getvalue()))
793        self.logger.debug(ppp("Reassembled:", p))
794        self.assert_packet_checksums_valid(p)
795        return p
796
797    def initiate_tcp_session(self, in_if, out_if):
798        """
799        Initiates TCP session
800
801        :param in_if: Inside interface
802        :param out_if: Outside interface
803        """
804        try:
805            # SYN packet in->out
806            p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
807                 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
808                 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
809                     flags="S"))
810            in_if.add_stream(p)
811            self.pg_enable_capture(self.pg_interfaces)
812            self.pg_start()
813            capture = out_if.get_capture(1)
814            p = capture[0]
815            self.tcp_port_out = p[TCP].sport
816
817            # SYN + ACK packet out->in
818            p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
819                 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
820                 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
821                     flags="SA"))
822            out_if.add_stream(p)
823            self.pg_enable_capture(self.pg_interfaces)
824            self.pg_start()
825            in_if.get_capture(1)
826
827            # ACK packet in->out
828            p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
829                 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
830                 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
831                     flags="A"))
832            in_if.add_stream(p)
833            self.pg_enable_capture(self.pg_interfaces)
834            self.pg_start()
835            out_if.get_capture(1)
836
837        except:
838            self.logger.error("TCP 3 way handshake failed")
839            raise
840
841    def verify_ipfix_nat44_ses(self, data):
842        """
843        Verify IPFIX NAT44 session create/delete event
844
845        :param data: Decoded IPFIX data records
846        """
847        nat44_ses_create_num = 0
848        nat44_ses_delete_num = 0
849        self.assertEqual(6, len(data))
850        for record in data:
851            # natEvent
852            self.assertIn(scapy.compat.orb(record[230]), [4, 5])
853            if scapy.compat.orb(record[230]) == 4:
854                nat44_ses_create_num += 1
855            else:
856                nat44_ses_delete_num += 1
857            # sourceIPv4Address
858            self.assertEqual(self.pg0.remote_ip4n, record[8])
859            # postNATSourceIPv4Address
860            self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
861                             record[225])
862            # ingressVRFID
863            self.assertEqual(struct.pack("!I", 0), record[234])
864            # protocolIdentifier/sourceTransportPort
865            # /postNAPTSourceTransportPort
866            if IP_PROTOS.icmp == scapy.compat.orb(record[4]):
867                self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
868                self.assertEqual(struct.pack("!H", self.icmp_id_out),
869                                 record[227])
870            elif IP_PROTOS.tcp == scapy.compat.orb(record[4]):
871                self.assertEqual(struct.pack("!H", self.tcp_port_in),
872                                 record[7])
873                self.assertEqual(struct.pack("!H", self.tcp_port_out),
874                                 record[227])
875            elif IP_PROTOS.udp == scapy.compat.orb(record[4]):
876                self.assertEqual(struct.pack("!H", self.udp_port_in),
877                                 record[7])
878                self.assertEqual(struct.pack("!H", self.udp_port_out),
879                                 record[227])
880            else:
881                self.fail("Invalid protocol")
882        self.assertEqual(3, nat44_ses_create_num)
883        self.assertEqual(3, nat44_ses_delete_num)
884
885    def verify_ipfix_addr_exhausted(self, data):
886        """
887        Verify IPFIX NAT addresses event
888
889        :param data: Decoded IPFIX data records
890        """
891        self.assertEqual(1, len(data))
892        record = data[0]
893        # natEvent
894        self.assertEqual(scapy.compat.orb(record[230]), 3)
895        # natPoolID
896        self.assertEqual(struct.pack("!I", 0), record[283])
897
898    def verify_ipfix_max_sessions(self, data, limit):
899        """
900        Verify IPFIX maximum session entries exceeded event
901
902        :param data: Decoded IPFIX data records
903        :param limit: Number of maximum session entries that can be created.
904        """
905        self.assertEqual(1, len(data))
906        record = data[0]
907        # natEvent
908        self.assertEqual(scapy.compat.orb(record[230]), 13)
909        # natQuotaExceededEvent
910        self.assertEqual(struct.pack("I", 1), record[466])
911        # maxSessionEntries
912        self.assertEqual(struct.pack("I", limit), record[471])
913
914    def verify_ipfix_max_bibs(self, data, limit):
915        """
916        Verify IPFIX maximum BIB entries exceeded event
917
918        :param data: Decoded IPFIX data records
919        :param limit: Number of maximum BIB entries that can be created.
920        """
921        self.assertEqual(1, len(data))
922        record = data[0]
923        # natEvent
924        self.assertEqual(scapy.compat.orb(record[230]), 13)
925        # natQuotaExceededEvent
926        self.assertEqual(struct.pack("I", 2), record[466])
927        # maxBIBEntries
928        self.assertEqual(struct.pack("I", limit), record[472])
929
930    def verify_ipfix_max_fragments_ip6(self, data, limit, src_addr):
931        """
932        Verify IPFIX maximum IPv6 fragments pending reassembly exceeded event
933
934        :param data: Decoded IPFIX data records
935        :param limit: Number of maximum fragments pending reassembly
936        :param src_addr: IPv6 source address
937        """
938        self.assertEqual(1, len(data))
939        record = data[0]
940        # natEvent
941        self.assertEqual(scapy.compat.orb(record[230]), 13)
942        # natQuotaExceededEvent
943        self.assertEqual(struct.pack("I", 5), record[466])
944        # maxFragmentsPendingReassembly
945        self.assertEqual(struct.pack("I", limit), record[475])
946        # sourceIPv6Address
947        self.assertEqual(src_addr, record[27])
948
949    def verify_ipfix_max_fragments_ip4(self, data, limit, src_addr):
950        """
951        Verify IPFIX maximum IPv4 fragments pending reassembly exceeded event
952
953        :param data: Decoded IPFIX data records
954        :param limit: Number of maximum fragments pending reassembly
955        :param src_addr: IPv4 source address
956        """
957        self.assertEqual(1, len(data))
958        record = data[0]
959        # natEvent
960        self.assertEqual(scapy.compat.orb(record[230]), 13)
961        # natQuotaExceededEvent
962        self.assertEqual(struct.pack("I", 5), record[466])
963        # maxFragmentsPendingReassembly
964        self.assertEqual(struct.pack("I", limit), record[475])
965        # sourceIPv4Address
966        self.assertEqual(src_addr, record[8])
967
968    def verify_ipfix_bib(self, data, is_create, src_addr):
969        """
970        Verify IPFIX NAT64 BIB create and delete events
971
972        :param data: Decoded IPFIX data records
973        :param is_create: Create event if nonzero value otherwise delete event
974        :param src_addr: IPv6 source address
975        """
976        self.assertEqual(1, len(data))
977        record = data[0]
978        # natEvent
979        if is_create:
980            self.assertEqual(scapy.compat.orb(record[230]), 10)
981        else:
982            self.assertEqual(scapy.compat.orb(record[230]), 11)
983        # sourceIPv6Address
984        self.assertEqual(src_addr, record[27])
985        # postNATSourceIPv4Address
986        self.assertEqual(self.nat_addr_n, record[225])
987        # protocolIdentifier
988        self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4]))
989        # ingressVRFID
990        self.assertEqual(struct.pack("!I", 0), record[234])
991        # sourceTransportPort
992        self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
993        # postNAPTSourceTransportPort
994        self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
995
996    def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
997                               dst_port):
998        """
999        Verify IPFIX NAT64 session create and delete events
1000
1001        :param data: Decoded IPFIX data records
1002        :param is_create: Create event if nonzero value otherwise delete event
1003        :param src_addr: IPv6 source address
1004        :param dst_addr: IPv4 destination address
1005        :param dst_port: destination TCP port
1006        """
1007        self.assertEqual(1, len(data))
1008        record = data[0]
1009        # natEvent
1010        if is_create:
1011            self.assertEqual(scapy.compat.orb(record[230]), 6)
1012        else:
1013            self.assertEqual(scapy.compat.orb(record[230]), 7)
1014        # sourceIPv6Address
1015        self.assertEqual(src_addr, record[27])
1016        # destinationIPv6Address
1017        self.assertEqual(socket.inet_pton(socket.AF_INET6,
1018                                          self.compose_ip6(dst_addr,
1019                                                           '64:ff9b::',
1020                                                           96)),
1021                         record[28])
1022        # postNATSourceIPv4Address
1023        self.assertEqual(self.nat_addr_n, record[225])
1024        # postNATDestinationIPv4Address
1025        self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
1026                         record[226])
1027        # protocolIdentifier
1028        self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4]))
1029        # ingressVRFID
1030        self.assertEqual(struct.pack("!I", 0), record[234])
1031        # sourceTransportPort
1032        self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
1033        # postNAPTSourceTransportPort
1034        self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
1035        # destinationTransportPort
1036        self.assertEqual(struct.pack("!H", dst_port), record[11])
1037        # postNAPTDestinationTransportPort
1038        self.assertEqual(struct.pack("!H", dst_port), record[228])
1039
1040    def verify_no_nat44_user(self):
1041        """ Verify that there is no NAT44 user """
1042        users = self.vapi.nat44_user_dump()
1043        self.assertEqual(len(users), 0)
1044        users = self.statistics.get_counter('/nat44/total-users')
1045        self.assertEqual(users[0][0], 0)
1046        sessions = self.statistics.get_counter('/nat44/total-sessions')
1047        self.assertEqual(sessions[0][0], 0)
1048
1049    def verify_ipfix_max_entries_per_user(self, data, limit, src_addr):
1050        """
1051        Verify IPFIX maximum entries per user exceeded event
1052
1053        :param data: Decoded IPFIX data records
1054        :param limit: Number of maximum entries per user
1055        :param src_addr: IPv4 source address
1056        """
1057        self.assertEqual(1, len(data))
1058        record = data[0]
1059        # natEvent
1060        self.assertEqual(scapy.compat.orb(record[230]), 13)
1061        # natQuotaExceededEvent
1062        self.assertEqual(struct.pack("I", 3), record[466])
1063        # maxEntriesPerUser
1064        self.assertEqual(struct.pack("I", limit), record[473])
1065        # sourceIPv4Address
1066        self.assertEqual(socket.inet_pton(socket.AF_INET, src_addr), record[8])
1067
1068    def verify_syslog_apmap(self, data, is_add=True):
1069        message = data.decode('utf-8')
1070        try:
1071            message = SyslogMessage.parse(message)
1072        except ParseError as e:
1073            self.logger.error(e)
1074            raise
1075        else:
1076            self.assertEqual(message.severity, SyslogSeverity.info)
1077            self.assertEqual(message.appname, 'NAT')
1078            self.assertEqual(message.msgid, 'APMADD' if is_add else 'APMDEL')
1079            sd_params = message.sd.get('napmap')
1080            self.assertTrue(sd_params is not None)
1081            self.assertEqual(sd_params.get('IATYP'), 'IPv4')
1082            self.assertEqual(sd_params.get('ISADDR'), self.pg0.remote_ip4)
1083            self.assertEqual(sd_params.get('ISPORT'), "%d" % self.tcp_port_in)
1084            self.assertEqual(sd_params.get('XATYP'), 'IPv4')
1085            self.assertEqual(sd_params.get('XSADDR'), self.nat_addr)
1086            self.assertEqual(sd_params.get('XSPORT'), "%d" % self.tcp_port_out)
1087            self.assertEqual(sd_params.get('PROTO'), "%d" % IP_PROTOS.tcp)
1088            self.assertTrue(sd_params.get('SSUBIX') is not None)
1089            self.assertEqual(sd_params.get('SVLAN'), '0')
1090
1091    def verify_syslog_sess(self, data, is_add=True, is_ip6=False):
1092        message = data.decode('utf-8')
1093        try:
1094            message = SyslogMessage.parse(message)
1095        except ParseError as e:
1096            self.logger.error(e)
1097            raise
1098        else:
1099            self.assertEqual(message.severity, SyslogSeverity.info)
1100            self.assertEqual(message.appname, 'NAT')
1101            self.assertEqual(message.msgid, 'SADD' if is_add else 'SDEL')
1102            sd_params = message.sd.get('nsess')
1103            self.assertTrue(sd_params is not None)
1104            if is_ip6:
1105                self.assertEqual(sd_params.get('IATYP'), 'IPv6')
1106                self.assertEqual(sd_params.get('ISADDR'), self.pg0.remote_ip6)
1107            else:
1108                self.assertEqual(sd_params.get('IATYP'), 'IPv4')
1109                self.assertEqual(sd_params.get('ISADDR'), self.pg0.remote_ip4)
1110                self.assertTrue(sd_params.get('SSUBIX') is not None)
1111            self.assertEqual(sd_params.get('ISPORT'), "%d" % self.tcp_port_in)
1112            self.assertEqual(sd_params.get('XATYP'), 'IPv4')
1113            self.assertEqual(sd_params.get('XSADDR'), self.nat_addr)
1114            self.assertEqual(sd_params.get('XSPORT'), "%d" % self.tcp_port_out)
1115            self.assertEqual(sd_params.get('PROTO'), "%d" % IP_PROTOS.tcp)
1116            self.assertEqual(sd_params.get('SVLAN'), '0')
1117            self.assertEqual(sd_params.get('XDADDR'), self.pg1.remote_ip4)
1118            self.assertEqual(sd_params.get('XDPORT'),
1119                             "%d" % self.tcp_external_port)
1120
1121    def verify_mss_value(self, pkt, mss):
1122        """
1123        Verify TCP MSS value
1124
1125        :param pkt:
1126        :param mss:
1127        """
1128        if not pkt.haslayer(IP) or not pkt.haslayer(TCP):
1129            raise TypeError("Not a TCP/IP packet")
1130
1131        for option in pkt[TCP].options:
1132            if option[0] == 'MSS':
1133                self.assertEqual(option[1], mss)
1134                self.assert_tcp_checksum_valid(pkt)
1135
1136    @staticmethod
1137    def proto2layer(proto):
1138        if proto == IP_PROTOS.tcp:
1139            return TCP
1140        elif proto == IP_PROTOS.udp:
1141            return UDP
1142        elif proto == IP_PROTOS.icmp:
1143            return ICMP
1144        else:
1145            raise Exception("Unsupported protocol")
1146
1147    def frag_in_order(self, proto=IP_PROTOS.tcp, dont_translate=False):
1148        layer = self.proto2layer(proto)
1149
1150        if proto == IP_PROTOS.tcp:
1151            data = b"A" * 4 + b"B" * 16 + b"C" * 3
1152        else:
1153            data = b"A" * 16 + b"B" * 16 + b"C" * 3
1154        self.port_in = random.randint(1025, 65535)
1155
1156        # in2out
1157        pkts = self.create_stream_frag(self.pg0,
1158                                       self.pg1.remote_ip4,
1159                                       self.port_in,
1160                                       20,
1161                                       data,
1162                                       proto)
1163        self.pg0.add_stream(pkts)
1164        self.pg_enable_capture(self.pg_interfaces)
1165        self.pg_start()
1166        frags = self.pg1.get_capture(len(pkts))
1167        if not dont_translate:
1168            p = self.reass_frags_and_verify(frags,
1169                                            self.nat_addr,
1170                                            self.pg1.remote_ip4)
1171        else:
1172            p = self.reass_frags_and_verify(frags,
1173                                            self.pg0.remote_ip4,
1174                                            self.pg1.remote_ip4)
1175        if proto != IP_PROTOS.icmp:
1176            if not dont_translate:
1177                self.assertEqual(p[layer].dport, 20)
1178                self.assertNotEqual(p[layer].sport, self.port_in)
1179            else:
1180                self.assertEqual(p[layer].sport, self.port_in)
1181        else:
1182            if not dont_translate:
1183                self.assertNotEqual(p[layer].id, self.port_in)
1184            else:
1185                self.assertEqual(p[layer].id, self.port_in)
1186        self.assertEqual(data, p[Raw].load)
1187
1188        # out2in
1189        if not dont_translate:
1190            dst_addr = self.nat_addr
1191        else:
1192            dst_addr = self.pg0.remote_ip4
1193        if proto != IP_PROTOS.icmp:
1194            sport = 20
1195            dport = p[layer].sport
1196        else:
1197            sport = p[layer].id
1198            dport = 0
1199        pkts = self.create_stream_frag(self.pg1,
1200                                       dst_addr,
1201                                       sport,
1202                                       dport,
1203                                       data,
1204                                       proto,
1205                                       echo_reply=True)
1206        self.pg1.add_stream(pkts)
1207        self.pg_enable_capture(self.pg_interfaces)
1208        self.pg_start()
1209        frags = self.pg0.get_capture(len(pkts))
1210        p = self.reass_frags_and_verify(frags,
1211                                        self.pg1.remote_ip4,
1212                                        self.pg0.remote_ip4)
1213        if proto != IP_PROTOS.icmp:
1214            self.assertEqual(p[layer].sport, 20)
1215            self.assertEqual(p[layer].dport, self.port_in)
1216        else:
1217            self.assertEqual(p[layer].id, self.port_in)
1218        self.assertEqual(data, p[Raw].load)
1219
1220    def frag_in_order_in_plus_out(self, proto=IP_PROTOS.tcp):
1221        layer = self.proto2layer(proto)
1222
1223        if proto == IP_PROTOS.tcp:
1224            data = b"A" * 4 + b"B" * 16 + b"C" * 3
1225        else:
1226            data = b"A" * 16 + b"B" * 16 + b"C" * 3
1227        self.port_in = random.randint(1025, 65535)
1228
1229        for i in range(2):
1230            # out2in
1231            pkts = self.create_stream_frag(self.pg0,
1232                                           self.server_out_addr,
1233                                           self.port_in,
1234                                           self.server_out_port,
1235                                           data,
1236                                           proto)
1237            self.pg0.add_stream(pkts)
1238            self.pg_enable_capture(self.pg_interfaces)
1239            self.pg_start()
1240            frags = self.pg1.get_capture(len(pkts))
1241            p = self.reass_frags_and_verify(frags,
1242                                            self.pg0.remote_ip4,
1243                                            self.server_in_addr)
1244            if proto != IP_PROTOS.icmp:
1245                self.assertEqual(p[layer].sport, self.port_in)
1246                self.assertEqual(p[layer].dport, self.server_in_port)
1247            else:
1248                self.assertEqual(p[layer].id, self.port_in)
1249            self.assertEqual(data, p[Raw].load)
1250
1251            # in2out
1252            if proto != IP_PROTOS.icmp:
1253                pkts = self.create_stream_frag(self.pg1,
1254                                               self.pg0.remote_ip4,
1255                                               self.server_in_port,
1256                                               p[layer].sport,
1257                                               data,
1258                                               proto)
1259            else:
1260                pkts = self.create_stream_frag(self.pg1,
1261                                               self.pg0.remote_ip4,
1262                                               p[layer].id,
1263                                               0,
1264                                               data,
1265                                               proto,
1266                                               echo_reply=True)
1267            self.pg1.add_stream(pkts)
1268            self.pg_enable_capture(self.pg_interfaces)
1269            self.pg_start()
1270            frags = self.pg0.get_capture(len(pkts))
1271            p = self.reass_frags_and_verify(frags,
1272                                            self.server_out_addr,
1273                                            self.pg0.remote_ip4)
1274            if proto != IP_PROTOS.icmp:
1275                self.assertEqual(p[layer].sport, self.server_out_port)
1276                self.assertEqual(p[layer].dport, self.port_in)
1277            else:
1278                self.assertEqual(p[layer].id, self.port_in)
1279            self.assertEqual(data, p[Raw].load)
1280
1281    def reass_hairpinning(self, proto=IP_PROTOS.tcp):
1282        layer = self.proto2layer(proto)
1283
1284        if proto == IP_PROTOS.tcp:
1285            data = b"A" * 4 + b"B" * 16 + b"C" * 3
1286        else:
1287            data = b"A" * 16 + b"B" * 16 + b"C" * 3
1288
1289        # send packet from host to server
1290        pkts = self.create_stream_frag(self.pg0,
1291                                       self.nat_addr,
1292                                       self.host_in_port,
1293                                       self.server_out_port,
1294                                       data,
1295                                       proto)
1296        self.pg0.add_stream(pkts)
1297        self.pg_enable_capture(self.pg_interfaces)
1298        self.pg_start()
1299        frags = self.pg0.get_capture(len(pkts))
1300        p = self.reass_frags_and_verify(frags,
1301                                        self.nat_addr,
1302                                        self.server.ip4)
1303        if proto != IP_PROTOS.icmp:
1304            self.assertNotEqual(p[layer].sport, self.host_in_port)
1305            self.assertEqual(p[layer].dport, self.server_in_port)
1306        else:
1307            self.assertNotEqual(p[layer].id, self.host_in_port)
1308        self.assertEqual(data, p[Raw].load)
1309
1310    def frag_out_of_order(self, proto=IP_PROTOS.tcp, dont_translate=False):
1311        layer = self.proto2layer(proto)
1312
1313        if proto == IP_PROTOS.tcp:
1314            data = b"A" * 4 + b"B" * 16 + b"C" * 3
1315        else:
1316            data = b"A" * 16 + b"B" * 16 + b"C" * 3
1317        self.port_in = random.randint(1025, 65535)
1318
1319        for i in range(2):
1320            # in2out
1321            pkts = self.create_stream_frag(self.pg0,
1322                                           self.pg1.remote_ip4,
1323                                           self.port_in,
1324                                           20,
1325                                           data,
1326                                           proto)
1327            pkts.reverse()
1328            self.pg0.add_stream(pkts)
1329            self.pg_enable_capture(self.pg_interfaces)
1330            self.pg_start()
1331            frags = self.pg1.get_capture(len(pkts))
1332            if not dont_translate:
1333                p = self.reass_frags_and_verify(frags,
1334                                                self.nat_addr,
1335                                                self.pg1.remote_ip4)
1336            else:
1337                p = self.reass_frags_and_verify(frags,
1338                                                self.pg0.remote_ip4,
1339                                                self.pg1.remote_ip4)
1340            if proto != IP_PROTOS.icmp:
1341                if not dont_translate:
1342                    self.assertEqual(p[layer].dport, 20)
1343                    self.assertNotEqual(p[layer].sport, self.port_in)
1344                else:
1345                    self.assertEqual(p[layer].sport, self.port_in)
1346            else:
1347                if not dont_translate:
1348                    self.assertNotEqual(p[layer].id, self.port_in)
1349                else:
1350                    self.assertEqual(p[layer].id, self.port_in)
1351            self.assertEqual(data, p[Raw].load)
1352
1353            # out2in
1354            if not dont_translate:
1355                dst_addr = self.nat_addr
1356            else:
1357                dst_addr = self.pg0.remote_ip4
1358            if proto != IP_PROTOS.icmp:
1359                sport = 20
1360                dport = p[layer].sport
1361            else:
1362                sport = p[layer].id
1363                dport = 0
1364            pkts = self.create_stream_frag(self.pg1,
1365                                           dst_addr,
1366                                           sport,
1367                                           dport,
1368                                           data,
1369                                           proto,
1370                                           echo_reply=True)
1371            pkts.reverse()
1372            self.pg1.add_stream(pkts)
1373            self.pg_enable_capture(self.pg_interfaces)
1374            self.pg_start()
1375            frags = self.pg0.get_capture(len(pkts))
1376            p = self.reass_frags_and_verify(frags,
1377                                            self.pg1.remote_ip4,
1378                                            self.pg0.remote_ip4)
1379            if proto != IP_PROTOS.icmp:
1380                self.assertEqual(p[layer].sport, 20)
1381                self.assertEqual(p[layer].dport, self.port_in)
1382            else:
1383                self.assertEqual(p[layer].id, self.port_in)
1384            self.assertEqual(data, p[Raw].load)
1385
1386    def frag_out_of_order_in_plus_out(self, proto=IP_PROTOS.tcp):
1387        layer = self.proto2layer(proto)
1388
1389        if proto == IP_PROTOS.tcp:
1390            data = b"A" * 4 + b"B" * 16 + b"C" * 3
1391        else:
1392            data = b"A" * 16 + b"B" * 16 + b"C" * 3
1393        self.port_in = random.randint(1025, 65535)
1394
1395        for i in range(2):
1396            # out2in
1397            pkts = self.create_stream_frag(self.pg0,
1398                                           self.server_out_addr,
1399                                           self.port_in,
1400                                           self.server_out_port,
1401                                           data,
1402                                           proto)
1403            pkts.reverse()
1404            self.pg0.add_stream(pkts)
1405            self.pg_enable_capture(self.pg_interfaces)
1406            self.pg_start()
1407            frags = self.pg1.get_capture(len(pkts))
1408            p = self.reass_frags_and_verify(frags,
1409                                            self.pg0.remote_ip4,
1410                                            self.server_in_addr)
1411            if proto != IP_PROTOS.icmp:
1412                self.assertEqual(p[layer].dport, self.server_in_port)
1413                self.assertEqual(p[layer].sport, self.port_in)
1414                self.assertEqual(p[layer].dport, self.server_in_port)
1415            else:
1416                self.assertEqual(p[layer].id, self.port_in)
1417            self.assertEqual(data, p[Raw].load)
1418
1419            # in2out
1420            if proto != IP_PROTOS.icmp:
1421                pkts = self.create_stream_frag(self.pg1,
1422                                               self.pg0.remote_ip4,
1423                                               self.server_in_port,
1424                                               p[layer].sport,
1425                                               data,
1426                                               proto)
1427            else:
1428                pkts = self.create_stream_frag(self.pg1,
1429                                               self.pg0.remote_ip4,
1430                                               p[layer].id,
1431                                               0,
1432                                               data,
1433                                               proto,
1434                                               echo_reply=True)
1435            pkts.reverse()
1436            self.pg1.add_stream(pkts)
1437            self.pg_enable_capture(self.pg_interfaces)
1438            self.pg_start()
1439            frags = self.pg0.get_capture(len(pkts))
1440            p = self.reass_frags_and_verify(frags,
1441                                            self.server_out_addr,
1442                                            self.pg0.remote_ip4)
1443            if proto != IP_PROTOS.icmp:
1444                self.assertEqual(p[layer].sport, self.server_out_port)
1445                self.assertEqual(p[layer].dport, self.port_in)
1446            else:
1447                self.assertEqual(p[layer].id, self.port_in)
1448            self.assertEqual(data, p[Raw].load)
1449
1450
1451class TestNAT44(MethodHolder):
1452    """ NAT44 Test Cases """
1453
1454    @classmethod
1455    def setUpClass(cls):
1456        super(TestNAT44, cls).setUpClass()
1457        cls.vapi.cli("set log class nat level debug")
1458
1459        try:
1460            cls.tcp_port_in = 6303
1461            cls.tcp_port_out = 6303
1462            cls.udp_port_in = 6304
1463            cls.udp_port_out = 6304
1464            cls.icmp_id_in = 6305
1465            cls.icmp_id_out = 6305
1466            cls.nat_addr = '10.0.0.3'
1467            cls.ipfix_src_port = 4739
1468            cls.ipfix_domain_id = 1
1469            cls.tcp_external_port = 80
1470            cls.udp_external_port = 69
1471
1472            cls.create_pg_interfaces(range(10))
1473            cls.interfaces = list(cls.pg_interfaces[0:4])
1474
1475            for i in cls.interfaces:
1476                i.admin_up()
1477                i.config_ip4()
1478                i.resolve_arp()
1479
1480            cls.pg0.generate_remote_hosts(3)
1481            cls.pg0.configure_ipv4_neighbors()
1482
1483            cls.pg1.generate_remote_hosts(1)
1484            cls.pg1.configure_ipv4_neighbors()
1485
1486            cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
1487            cls.vapi.ip_table_add_del(is_add=1, table={'table_id': 10})
1488            cls.vapi.ip_table_add_del(is_add=1, table={'table_id': 20})
1489
1490            cls.pg4._local_ip4 = "172.16.255.1"
1491            cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
1492            cls.pg4.set_table_ip4(10)
1493            cls.pg5._local_ip4 = "172.17.255.3"
1494            cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
1495            cls.pg5.set_table_ip4(10)
1496            cls.pg6._local_ip4 = "172.16.255.1"
1497            cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
1498            cls.pg6.set_table_ip4(20)
1499            for i in cls.overlapping_interfaces:
1500                i.config_ip4()
1501                i.admin_up()
1502                i.resolve_arp()
1503
1504            cls.pg7.admin_up()
1505            cls.pg8.admin_up()
1506
1507            cls.pg9.generate_remote_hosts(2)
1508            cls.pg9.config_ip4()
1509            cls.vapi.sw_interface_add_del_address(
1510                sw_if_index=cls.pg9.sw_if_index,
1511                prefix="10.0.0.1/24")
1512
1513            cls.pg9.admin_up()
1514            cls.pg9.resolve_arp()
1515            cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
1516            cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
1517            cls.pg9.resolve_arp()
1518
1519        except Exception:
1520            super(TestNAT44, cls).tearDownClass()
1521            raise
1522
1523    @classmethod
1524    def tearDownClass(cls):
1525        super(TestNAT44, cls).tearDownClass()
1526
1527    def test_dynamic(self):
1528        """ NAT44 dynamic translation test """
1529        self.nat44_add_address(self.nat_addr)
1530        flags = self.config_flags.NAT_IS_INSIDE
1531        self.vapi.nat44_interface_add_del_feature(
1532            sw_if_index=self.pg0.sw_if_index,
1533            flags=flags, is_add=1)
1534        self.vapi.nat44_interface_add_del_feature(
1535            sw_if_index=self.pg1.sw_if_index,
1536            is_add=1)
1537
1538        # in2out
1539        tcpn = self.statistics.get_err_counter(
1540            '/err/nat44-in2out-slowpath/TCP packets')
1541        udpn = self.statistics.get_err_counter(
1542            '/err/nat44-in2out-slowpath/UDP packets')
1543        icmpn = self.statistics.get_err_counter(
1544            '/err/nat44-in2out-slowpath/ICMP packets')
1545        totaln = self.statistics.get_err_counter(
1546            '/err/nat44-in2out-slowpath/good in2out packets processed')
1547
1548        pkts = self.create_stream_in(self.pg0, self.pg1)
1549        self.pg0.add_stream(pkts)
1550        self.pg_enable_capture(self.pg_interfaces)
1551        self.pg_start()
1552        capture = self.pg1.get_capture(len(pkts))
1553        self.verify_capture_out(capture)
1554
1555        err = self.statistics.get_err_counter(
1556            '/err/nat44-in2out-slowpath/TCP packets')
1557        self.assertEqual(err - tcpn, 2)
1558        err = self.statistics.get_err_counter(
1559            '/err/nat44-in2out-slowpath/UDP packets')
1560        self.assertEqual(err - udpn, 1)
1561        err = self.statistics.get_err_counter(
1562            '/err/nat44-in2out-slowpath/ICMP packets')
1563        self.assertEqual(err - icmpn, 1)
1564        err = self.statistics.get_err_counter(
1565            '/err/nat44-in2out-slowpath/good in2out packets processed')
1566        self.assertEqual(err - totaln, 4)
1567
1568        # out2in
1569        tcpn = self.statistics.get_err_counter('/err/nat44-out2in/TCP packets')
1570        udpn = self.statistics.get_err_counter('/err/nat44-out2in/UDP packets')
1571        icmpn = self.statistics.get_err_counter(
1572            '/err/nat44-out2in/ICMP packets')
1573        totaln = self.statistics.get_err_counter(
1574            '/err/nat44-out2in/good out2in packets processed')
1575
1576        pkts = self.create_stream_out(self.pg1)
1577        self.pg1.add_stream(pkts)
1578        self.pg_enable_capture(self.pg_interfaces)
1579        self.pg_start()
1580        capture = self.pg0.get_capture(len(pkts))
1581        self.verify_capture_in(capture, self.pg0)
1582
1583        err = self.statistics.get_err_counter('/err/nat44-out2in/TCP packets')
1584        self.assertEqual(err - tcpn, 2)
1585        err = self.statistics.get_err_counter('/err/nat44-out2in/UDP packets')
1586        self.assertEqual(err - udpn, 1)
1587        err = self.statistics.get_err_counter('/err/nat44-out2in/ICMP packets')
1588        self.assertEqual(err - icmpn, 1)
1589        err = self.statistics.get_err_counter(
1590            '/err/nat44-out2in/good out2in packets processed')
1591        self.assertEqual(err - totaln, 4)
1592
1593        users = self.statistics.get_counter('/nat44/total-users')
1594        self.assertEqual(users[0][0], 1)
1595        sessions = self.statistics.get_counter('/nat44/total-sessions')
1596        self.assertEqual(sessions[0][0], 3)
1597
1598    def test_dynamic_icmp_errors_in2out_ttl_1(self):
1599        """ NAT44 handling of client packets with TTL=1 """
1600
1601        self.nat44_add_address(self.nat_addr)
1602        flags = self.config_flags.NAT_IS_INSIDE
1603        self.vapi.nat44_interface_add_del_feature(
1604            sw_if_index=self.pg0.sw_if_index,
1605            flags=flags, is_add=1)
1606        self.vapi.nat44_interface_add_del_feature(
1607            sw_if_index=self.pg1.sw_if_index,
1608            is_add=1)
1609
1610        # Client side - generate traffic
1611        pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1612        self.pg0.add_stream(pkts)
1613        self.pg_enable_capture(self.pg_interfaces)
1614        self.pg_start()
1615
1616        # Client side - verify ICMP type 11 packets
1617        capture = self.pg0.get_capture(len(pkts))
1618        self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1619
1620    def test_dynamic_icmp_errors_out2in_ttl_1(self):
1621        """ NAT44 handling of server packets with TTL=1 """
1622
1623        self.nat44_add_address(self.nat_addr)
1624        flags = self.config_flags.NAT_IS_INSIDE
1625        self.vapi.nat44_interface_add_del_feature(
1626            sw_if_index=self.pg0.sw_if_index,
1627            flags=flags, is_add=1)
1628        self.vapi.nat44_interface_add_del_feature(
1629            sw_if_index=self.pg1.sw_if_index,
1630            is_add=1)
1631
1632        # Client side - create sessions
1633        pkts = self.create_stream_in(self.pg0, self.pg1)
1634        self.pg0.add_stream(pkts)
1635        self.pg_enable_capture(self.pg_interfaces)
1636        self.pg_start()
1637
1638        # Server side - generate traffic
1639        capture = self.pg1.get_capture(len(pkts))
1640        self.verify_capture_out(capture)
1641        pkts = self.create_stream_out(self.pg1, ttl=1)
1642        self.pg1.add_stream(pkts)
1643        self.pg_enable_capture(self.pg_interfaces)
1644        self.pg_start()
1645
1646        # Server side - verify ICMP type 11 packets
1647        capture = self.pg1.get_capture(len(pkts))
1648        self.verify_capture_out_with_icmp_errors(capture,
1649                                                 src_ip=self.pg1.local_ip4)
1650
1651    def test_dynamic_icmp_errors_in2out_ttl_2(self):
1652        """ NAT44 handling of error responses to client packets with TTL=2 """
1653
1654        self.nat44_add_address(self.nat_addr)
1655        flags = self.config_flags.NAT_IS_INSIDE
1656        self.vapi.nat44_interface_add_del_feature(
1657            sw_if_index=self.pg0.sw_if_index,
1658            flags=flags, is_add=1)
1659        self.vapi.nat44_interface_add_del_feature(
1660            sw_if_index=self.pg1.sw_if_index,
1661            is_add=1)
1662
1663        # Client side - generate traffic
1664        pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1665        self.pg0.add_stream(pkts)
1666        self.pg_enable_capture(self.pg_interfaces)
1667        self.pg_start()
1668
1669        # Server side - simulate ICMP type 11 response
1670        capture = self.pg1.get_capture(len(pkts))
1671        pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1672                IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1673                ICMP(type=11) / packet[IP] for packet in capture]
1674        self.pg1.add_stream(pkts)
1675        self.pg_enable_capture(self.pg_interfaces)
1676        self.pg_start()
1677
1678        # Client side - verify ICMP type 11 packets
1679        capture = self.pg0.get_capture(len(pkts))
1680        self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1681
1682    def test_dynamic_icmp_errors_out2in_ttl_2(self):
1683        """ NAT44 handling of error responses to server packets with TTL=2 """
1684
1685        self.nat44_add_address(self.nat_addr)
1686        flags = self.config_flags.NAT_IS_INSIDE
1687        self.vapi.nat44_interface_add_del_feature(
1688            sw_if_index=self.pg0.sw_if_index,
1689            flags=flags, is_add=1)
1690        self.vapi.nat44_interface_add_del_feature(
1691            sw_if_index=self.pg1.sw_if_index,
1692            is_add=1)
1693
1694        # Client side - create sessions
1695        pkts = self.create_stream_in(self.pg0, self.pg1)
1696        self.pg0.add_stream(pkts)
1697        self.pg_enable_capture(self.pg_interfaces)
1698        self.pg_start()
1699
1700        # Server side - generate traffic
1701        capture = self.pg1.get_capture(len(pkts))
1702        self.verify_capture_out(capture)
1703        pkts = self.create_stream_out(self.pg1, ttl=2)
1704        self.pg1.add_stream(pkts)
1705        self.pg_enable_capture(self.pg_interfaces)
1706        self.pg_start()
1707
1708        # Client side - simulate ICMP type 11 response
1709        capture = self.pg0.get_capture(len(pkts))
1710        pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1711                IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1712                ICMP(type=11) / packet[IP] for packet in capture]
1713        self.pg0.add_stream(pkts)
1714        self.pg_enable_capture(self.pg_interfaces)
1715        self.pg_start()
1716
1717        # Server side - verify ICMP type 11 packets
1718        capture = self.pg1.get_capture(len(pkts))
1719        self.verify_capture_out_with_icmp_errors(capture)
1720
1721    def test_ping_out_interface_from_outside(self):
1722        """ Ping NAT44 out interface from outside network """
1723
1724        self.nat44_add_address(self.nat_addr)
1725        flags = self.config_flags.NAT_IS_INSIDE
1726        self.vapi.nat44_interface_add_del_feature(
1727            sw_if_index=self.pg0.sw_if_index,
1728            flags=flags, is_add=1)
1729        self.vapi.nat44_interface_add_del_feature(
1730            sw_if_index=self.pg1.sw_if_index,
1731            is_add=1)
1732
1733        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1734             IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
1735             ICMP(id=self.icmp_id_out, type='echo-request'))
1736        pkts = [p]
1737        self.pg1.add_stream(pkts)
1738        self.pg_enable_capture(self.pg_interfaces)
1739        self.pg_start()
1740        capture = self.pg1.get_capture(len(pkts))
1741        packet = capture[0]
1742        try:
1743            self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1744            self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1745            self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1746            self.assertEqual(packet[ICMP].type, 0)  # echo reply
1747        except:
1748            self.logger.error(ppp("Unexpected or invalid packet "
1749                                  "(outside network):", packet))
1750            raise
1751
1752    def test_ping_internal_host_from_outside(self):
1753        """ Ping internal host from outside network """
1754
1755        self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1756        flags = self.config_flags.NAT_IS_INSIDE
1757        self.vapi.nat44_interface_add_del_feature(
1758            sw_if_index=self.pg0.sw_if_index,
1759            flags=flags, is_add=1)
1760        self.vapi.nat44_interface_add_del_feature(
1761            sw_if_index=self.pg1.sw_if_index,
1762            is_add=1)
1763
1764        # out2in
1765        pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1766               IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1767               ICMP(id=self.icmp_id_out, type='echo-request'))
1768        self.pg1.add_stream(pkt)
1769        self.pg_enable_capture(self.pg_interfaces)
1770        self.pg_start()
1771        capture = self.pg0.get_capture(1)
1772        self.verify_capture_in(capture, self.pg0)
1773        self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1774
1775        # in2out
1776        pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1777               IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1778               ICMP(id=self.icmp_id_in, type='echo-reply'))
1779        self.pg0.add_stream(pkt)
1780        self.pg_enable_capture(self.pg_interfaces)
1781        self.pg_start()
1782        capture = self.pg1.get_capture(1)
1783        self.verify_capture_out(capture, same_port=True)
1784        self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1785
1786    def test_forwarding(self):
1787        """ NAT44 forwarding test """
1788
1789        flags = self.config_flags.NAT_IS_INSIDE
1790        self.vapi.nat44_interface_add_del_feature(
1791            sw_if_index=self.pg0.sw_if_index,
1792            flags=flags, is_add=1)
1793        self.vapi.nat44_interface_add_del_feature(
1794            sw_if_index=self.pg1.sw_if_index,
1795            is_add=1)
1796        self.vapi.nat44_forwarding_enable_disable(enable=1)
1797
1798        real_ip = self.pg0.remote_ip4
1799        alias_ip = self.nat_addr
1800        flags = self.config_flags.NAT_IS_ADDR_ONLY
1801        self.vapi.nat44_add_del_static_mapping(is_add=1,
1802                                               local_ip_address=real_ip,
1803                                               external_ip_address=alias_ip,
1804                                               external_sw_if_index=0xFFFFFFFF,
1805                                               flags=flags)
1806
1807        try:
1808            # static mapping match
1809
1810            pkts = self.create_stream_out(self.pg1)
1811            self.pg1.add_stream(pkts)
1812            self.pg_enable_capture(self.pg_interfaces)
1813            self.pg_start()
1814            capture = self.pg0.get_capture(len(pkts))
1815            self.verify_capture_in(capture, self.pg0)
1816
1817            pkts = self.create_stream_in(self.pg0, self.pg1)
1818            self.pg0.add_stream(pkts)
1819            self.pg_enable_capture(self.pg_interfaces)
1820            self.pg_start()
1821            capture = self.pg1.get_capture(len(pkts))
1822            self.verify_capture_out(capture, same_port=True)
1823
1824            # no static mapping match
1825
1826            host0 = self.pg0.remote_hosts[0]
1827            self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1828            try:
1829                pkts = self.create_stream_out(self.pg1,
1830                                              dst_ip=self.pg0.remote_ip4,
1831                                              use_inside_ports=True)
1832                self.pg1.add_stream(pkts)
1833                self.pg_enable_capture(self.pg_interfaces)
1834                self.pg_start()
1835                capture = self.pg0.get_capture(len(pkts))
1836                self.verify_capture_in(capture, self.pg0)
1837
1838                pkts = self.create_stream_in(self.pg0, self.pg1)
1839                self.pg0.add_stream(pkts)
1840                self.pg_enable_capture(self.pg_interfaces)
1841                self.pg_start()
1842                capture = self.pg1.get_capture(len(pkts))
1843                self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
1844                                        same_port=True)
1845            finally:
1846                self.pg0.remote_hosts[0] = host0
1847
1848        finally:
1849            self.vapi.nat44_forwarding_enable_disable(enable=0)
1850            flags = self.config_flags.NAT_IS_ADDR_ONLY
1851            self.vapi.nat44_add_del_static_mapping(
1852                is_add=0,
1853                local_ip_address=real_ip,
1854                external_ip_address=alias_ip,
1855                external_sw_if_index=0xFFFFFFFF,
1856                flags=flags)
1857
1858    def test_static_in(self):
1859        """ 1:1 NAT initialized from inside network """
1860
1861        nat_ip = "10.0.0.10"
1862        self.tcp_port_out = 6303
1863        self.udp_port_out = 6304
1864        self.icmp_id_out = 6305
1865
1866        self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1867        flags = self.config_flags.NAT_IS_INSIDE
1868        self.vapi.nat44_interface_add_del_feature(
1869            sw_if_index=self.pg0.sw_if_index,
1870            flags=flags, is_add=1)
1871        self.vapi.nat44_interface_add_del_feature(
1872            sw_if_index=self.pg1.sw_if_index,
1873            is_add=1)
1874        sm = self.vapi.nat44_static_mapping_dump()
1875        self.assertEqual(len(sm), 1)
1876        self.assertEqual(sm[0].tag, '')
1877        self.assertEqual(sm[0].protocol, 0)
1878        self.assertEqual(sm[0].local_port, 0)
1879        self.assertEqual(sm[0].external_port, 0)
1880
1881        # in2out
1882        pkts = self.create_stream_in(self.pg0, self.pg1)
1883        self.pg0.add_stream(pkts)
1884        self.pg_enable_capture(self.pg_interfaces)
1885        self.pg_start()
1886        capture = self.pg1.get_capture(len(pkts))
1887        self.verify_capture_out(capture, nat_ip, True)
1888
1889        # out2in
1890        pkts = self.create_stream_out(self.pg1, nat_ip)
1891        self.pg1.add_stream(pkts)
1892        self.pg_enable_capture(self.pg_interfaces)
1893        self.pg_start()
1894        capture = self.pg0.get_capture(len(pkts))
1895        self.verify_capture_in(capture, self.pg0)
1896
1897    def test_static_out(self):
1898        """ 1:1 NAT initialized from outside network """
1899
1900        nat_ip = "10.0.0.20"
1901        self.tcp_port_out = 6303
1902        self.udp_port_out = 6304
1903        self.icmp_id_out = 6305
1904        tag = "testTAG"
1905
1906        self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
1907        flags = self.config_flags.NAT_IS_INSIDE
1908        self.vapi.nat44_interface_add_del_feature(
1909            sw_if_index=self.pg0.sw_if_index,
1910            flags=flags, is_add=1)
1911        self.vapi.nat44_interface_add_del_feature(
1912            sw_if_index=self.pg1.sw_if_index,
1913            is_add=1)
1914        sm = self.vapi.nat44_static_mapping_dump()
1915        self.assertEqual(len(sm), 1)
1916        self.assertEqual(sm[0].tag, tag)
1917
1918        # out2in
1919        pkts = self.create_stream_out(self.pg1, nat_ip)
1920        self.pg1.add_stream(pkts)
1921        self.pg_enable_capture(self.pg_interfaces)
1922        self.pg_start()
1923        capture = self.pg0.get_capture(len(pkts))
1924        self.verify_capture_in(capture, self.pg0)
1925
1926        # in2out
1927        pkts = self.create_stream_in(self.pg0, self.pg1)
1928        self.pg0.add_stream(pkts)
1929        self.pg_enable_capture(self.pg_interfaces)
1930        self.pg_start()
1931        capture = self.pg1.get_capture(len(pkts))
1932        self.verify_capture_out(capture, nat_ip, True)
1933
1934    def test_static_with_port_in(self):
1935        """ 1:1 NAPT initialized from inside network """
1936
1937        self.tcp_port_out = 3606
1938        self.udp_port_out = 3607
1939        self.icmp_id_out = 3608
1940
1941        self.nat44_add_address(self.nat_addr)
1942        self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1943                                      self.tcp_port_in, self.tcp_port_out,
1944                                      proto=IP_PROTOS.tcp)
1945        self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1946                                      self.udp_port_in, self.udp_port_out,
1947                                      proto=IP_PROTOS.udp)
1948        self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1949                                      self.icmp_id_in, self.icmp_id_out,
1950                                      proto=IP_PROTOS.icmp)
1951        flags = self.config_flags.NAT_IS_INSIDE
1952        self.vapi.nat44_interface_add_del_feature(
1953            sw_if_index=self.pg0.sw_if_index,
1954            flags=flags, is_add=1)
1955        self.vapi.nat44_interface_add_del_feature(
1956            sw_if_index=self.pg1.sw_if_index,
1957            is_add=1)
1958
1959        # in2out
1960        pkts = self.create_stream_in(self.pg0, self.pg1)
1961        self.pg0.add_stream(pkts)
1962        self.pg_enable_capture(self.pg_interfaces)
1963        self.pg_start()
1964        capture = self.pg1.get_capture(len(pkts))
1965        self.verify_capture_out(capture)
1966
1967        # out2in
1968        pkts = self.create_stream_out(self.pg1)
1969        self.pg1.add_stream(pkts)
1970        self.pg_enable_capture(self.pg_interfaces)
1971        self.pg_start()
1972        capture = self.pg0.get_capture(len(pkts))
1973        self.verify_capture_in(capture, self.pg0)
1974
1975    def test_static_with_port_out(self):
1976        """ 1:1 NAPT initialized from outside network """
1977
1978        self.tcp_port_out = 30606
1979        self.udp_port_out = 30607
1980        self.icmp_id_out = 30608
1981
1982        self.nat44_add_address(self.nat_addr)
1983        self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1984                                      self.tcp_port_in, self.tcp_port_out,
1985                                      proto=IP_PROTOS.tcp)
1986        self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1987                                      self.udp_port_in, self.udp_port_out,
1988                                      proto=IP_PROTOS.udp)
1989        self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1990                                      self.icmp_id_in, self.icmp_id_out,
1991                                      proto=IP_PROTOS.icmp)
1992        flags = self.config_flags.NAT_IS_INSIDE
1993        self.vapi.nat44_interface_add_del_feature(
1994            sw_if_index=self.pg0.sw_if_index,
1995            flags=flags, is_add=1)
1996        self.vapi.nat44_interface_add_del_feature(
1997            sw_if_index=self.pg1.sw_if_index,
1998            is_add=1)
1999
2000        # out2in
2001        pkts = self.create_stream_out(self.pg1)
2002        self.pg1.add_stream(pkts)
2003        self.pg_enable_capture(self.pg_interfaces)
2004        self.pg_start()
2005        capture = self.pg0.get_capture(len(pkts))
2006        self.verify_capture_in(capture, self.pg0)
2007
2008        # in2out
2009        pkts = self.create_stream_in(self.pg0, self.pg1)
2010        self.pg0.add_stream(pkts)
2011        self.pg_enable_capture(self.pg_interfaces)
2012        self.pg_start()
2013        capture = self.pg1.get_capture(len(pkts))
2014        self.verify_capture_out(capture)
2015
2016    def test_static_vrf_aware(self):
2017        """ 1:1 NAT VRF awareness """
2018
2019        nat_ip1 = "10.0.0.30"
2020        nat_ip2 = "10.0.0.40"
2021        self.tcp_port_out = 6303
2022        self.udp_port_out = 6304
2023        self.icmp_id_out = 6305
2024
2025        self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
2026                                      vrf_id=10)
2027        self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
2028                                      vrf_id=10)
2029        flags = self.config_flags.NAT_IS_INSIDE
2030        self.vapi.nat44_interface_add_del_feature(
2031            sw_if_index=self.pg3.sw_if_index,
2032            is_add=1)
2033        self.vapi.nat44_interface_add_del_feature(
2034            sw_if_index=self.pg0.sw_if_index,
2035            flags=flags, is_add=1)
2036        self.vapi.nat44_interface_add_del_feature(
2037            sw_if_index=self.pg4.sw_if_index,
2038            flags=flags, is_add=1)
2039
2040        # inside interface VRF match NAT44 static mapping VRF
2041        pkts = self.create_stream_in(self.pg4, self.pg3)
2042        self.pg4.add_stream(pkts)
2043        self.pg_enable_capture(self.pg_interfaces)
2044        self.pg_start()
2045        capture = self.pg3.get_capture(len(pkts))
2046        self.verify_capture_out(capture, nat_ip1, True)
2047
2048        # inside interface VRF don't match NAT44 static mapping VRF (packets
2049        # are dropped)
2050        pkts = self.create_stream_in(self.pg0, self.pg3)
2051        self.pg0.add_stream(pkts)
2052        self.pg_enable_capture(self.pg_interfaces)
2053        self.pg_start()
2054        self.pg3.assert_nothing_captured()
2055
2056    def test_dynamic_to_static(self):
2057        """ Switch from dynamic translation to 1:1NAT """
2058        nat_ip = "10.0.0.10"
2059        self.tcp_port_out = 6303
2060        self.udp_port_out = 6304
2061        self.icmp_id_out = 6305
2062
2063        self.nat44_add_address(self.nat_addr)
2064        flags = self.config_flags.NAT_IS_INSIDE
2065        self.vapi.nat44_interface_add_del_feature(
2066            sw_if_index=self.pg0.sw_if_index,
2067            flags=flags, is_add=1)
2068        self.vapi.nat44_interface_add_del_feature(
2069            sw_if_index=self.pg1.sw_if_index,
2070            is_add=1)
2071
2072        # dynamic
2073        pkts = self.create_stream_in(self.pg0, self.pg1)
2074        self.pg0.add_stream(pkts)
2075        self.pg_enable_capture(self.pg_interfaces)
2076        self.pg_start()
2077        capture = self.pg1.get_capture(len(pkts))
2078        self.verify_capture_out(capture)
2079
2080        # 1:1NAT
2081        self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2082        sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
2083        self.assertEqual(len(sessions), 0)
2084        pkts = self.create_stream_in(self.pg0, self.pg1)
2085        self.pg0.add_stream(pkts)
2086        self.pg_enable_capture(self.pg_interfaces)
2087        self.pg_start()
2088        capture = self.pg1.get_capture(len(pkts))
2089        self.verify_capture_out(capture, nat_ip, True)
2090
2091    def test_identity_nat(self):
2092        """ Identity NAT """
2093        flags = self.config_flags.NAT_IS_ADDR_ONLY
2094        self.vapi.nat44_add_del_identity_mapping(
2095            ip_address=self.pg0.remote_ip4, sw_if_index=0xFFFFFFFF,
2096            flags=flags, is_add=1)
2097        flags = self.config_flags.NAT_IS_INSIDE
2098        self.vapi.nat44_interface_add_del_feature(
2099            sw_if_index=self.pg0.sw_if_index,
2100            flags=flags, is_add=1)
2101        self.vapi.nat44_interface_add_del_feature(
2102            sw_if_index=self.pg1.sw_if_index,
2103            is_add=1)
2104
2105        p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2106             IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
2107             TCP(sport=12345, dport=56789))
2108        self.pg1.add_stream(p)
2109        self.pg_enable_capture(self.pg_interfaces)
2110        self.pg_start()
2111        capture = self.pg0.get_capture(1)
2112        p = capture[0]
2113        try:
2114            ip = p[IP]
2115            tcp = p[TCP]
2116            self.assertEqual(ip.dst, self.pg0.remote_ip4)
2117            self.assertEqual(ip.src, self.pg1.remote_ip4)
2118            self.assertEqual(tcp.dport, 56789)
2119            self.assertEqual(tcp.sport, 12345)
2120            self.assert_packet_checksums_valid(p)
2121        except:
2122            self.logger.error(ppp("Unexpected or invalid packet:", p))
2123            raise
2124
2125        sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
2126        self.assertEqual(len(sessions), 0)
2127        flags = self.config_flags.NAT_IS_ADDR_ONLY
2128        self.vapi.nat44_add_del_identity_mapping(
2129            ip_address=self.pg0.remote_ip4, sw_if_index=0xFFFFFFFF,
2130            flags=flags, vrf_id=1, is_add=1)
2131        identity_mappings = self.vapi.nat44_identity_mapping_dump()
2132        self.assertEqual(len(identity_mappings), 2)
2133
2134    def test_multiple_inside_interfaces(self):
2135        """ NAT44 multiple non-overlapping address space inside interfaces """
2136
2137        self.nat44_add_address(self.nat_addr)
2138        flags = self.config_flags.NAT_IS_INSIDE
2139        self.vapi.nat44_interface_add_del_feature(
2140            sw_if_index=self.pg0.sw_if_index,
2141            flags=flags, is_add=1)
2142        self.vapi.nat44_interface_add_del_feature(
2143            sw_if_index=self.pg1.sw_if_index,
2144            flags=flags, is_add=1)
2145        self.vapi.nat44_interface_add_del_feature(
2146            sw_if_index=self.pg3.sw_if_index,
2147            is_add=1)
2148
2149        # between two NAT44 inside interfaces (no translation)
2150        pkts = self.create_stream_in(self.pg0, self.pg1)
2151        self.pg0.add_stream(pkts)
2152        self.pg_enable_capture(self.pg_interfaces)
2153        self.pg_start()
2154        capture = self.pg1.get_capture(len(pkts))
2155        self.verify_capture_no_translation(capture, self.pg0, self.pg1)
2156
2157        # from NAT44 inside to interface without NAT44 feature (no translation)
2158        pkts = self.create_stream_in(self.pg0, self.pg2)
2159        self.pg0.add_stream(pkts)
2160        self.pg_enable_capture(self.pg_interfaces)
2161        self.pg_start()
2162        capture = self.pg2.get_capture(len(pkts))
2163        self.verify_capture_no_translation(capture, self.pg0, self.pg2)
2164
2165        # in2out 1st interface
2166        pkts = self.create_stream_in(self.pg0, self.pg3)
2167        self.pg0.add_stream(pkts)
2168        self.pg_enable_capture(self.pg_interfaces)
2169        self.pg_start()
2170        capture = self.pg3.get_capture(len(pkts))
2171        self.verify_capture_out(capture)
2172
2173        # out2in 1st interface
2174        pkts = self.create_stream_out(self.pg3)
2175        self.pg3.add_stream(pkts)
2176        self.pg_enable_capture(self.pg_interfaces)
2177        self.pg_start()
2178        capture = self.pg0.get_capture(len(pkts))
2179        self.verify_capture_in(capture, self.pg0)
2180
2181        # in2out 2nd interface
2182        pkts = self.create_stream_in(self.pg1, self.pg3)
2183        self.pg1.add_stream(pkts)
2184        self.pg_enable_capture(self.pg_interfaces)
2185        self.pg_start()
2186        capture = self.pg3.get_capture(len(pkts))
2187        self.verify_capture_out(capture)
2188
2189        # out2in 2nd interface
2190        pkts = self.create_stream_out(self.pg3)
2191        self.pg3.add_stream(pkts)
2192        self.pg_enable_capture(self.pg_interfaces)
2193        self.pg_start()
2194        capture = self.pg1.get_capture(len(pkts))
2195        self.verify_capture_in(capture, self.pg1)
2196
2197    def test_inside_overlapping_interfaces(self):
2198        """ NAT44 multiple inside interfaces with overlapping address space """
2199
2200        static_nat_ip = "10.0.0.10"
2201        self.nat44_add_address(self.nat_addr)
2202        flags = self.config_flags.NAT_IS_INSIDE
2203        self.vapi.nat44_interface_add_del_feature(
2204            sw_if_index=self.pg3.sw_if_index,
2205            is_add=1)
2206        self.vapi.nat44_interface_add_del_feature(
2207            sw_if_index=self.pg4.sw_if_index,
2208            flags=flags, is_add=1)
2209        self.vapi.nat44_interface_add_del_feature(
2210            sw_if_index=self.pg5.sw_if_index,
2211            flags=flags, is_add=1)
2212        self.vapi.nat44_interface_add_del_feature(
2213            sw_if_index=self.pg6.sw_if_index,
2214            flags=flags, is_add=1)
2215        self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
2216                                      vrf_id=20)
2217
2218        # between NAT44 inside interfaces with same VRF (no translation)
2219        pkts = self.create_stream_in(self.pg4, self.pg5)
2220        self.pg4.add_stream(pkts)
2221        self.pg_enable_capture(self.pg_interfaces)
2222        self.pg_start()
2223        capture = self.pg5.get_capture(len(pkts))
2224        self.verify_capture_no_translation(capture, self.pg4, self.pg5)
2225
2226        # between NAT44 inside interfaces with different VRF (hairpinning)
2227        p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
2228             IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
2229             TCP(sport=1234, dport=5678))
2230        self.pg4.add_stream(p)
2231        self.pg_enable_capture(self.pg_interfaces)
2232        self.pg_start()
2233        capture = self.pg6.get_capture(1)
2234        p = capture[0]
2235        try:
2236            ip = p[IP]
2237            tcp = p[TCP]
2238            self.assertEqual(ip.src, self.nat_addr)
2239            self.assertEqual(ip.dst, self.pg6.remote_ip4)
2240            self.assertNotEqual(tcp.sport, 1234)
2241            self.assertEqual(tcp.dport, 5678)
2242        except:
2243            self.logger.error(ppp("Unexpected or invalid packet:", p))
2244            raise
2245
2246        # in2out 1st interface
2247        pkts = self.create_stream_in(self.pg4, self.pg3)
2248        self.pg4.add_stream(pkts)
2249        self.pg_enable_capture(self.pg_interfaces)
2250        self.pg_start()
2251        capture = self.pg3.get_capture(len(pkts))
2252        self.verify_capture_out(capture)
2253
2254        # out2in 1st interface
2255        pkts = self.create_stream_out(self.pg3)
2256        self.pg3.add_stream(pkts)
2257        self.pg_enable_capture(self.pg_interfaces)
2258        self.pg_start()
2259        capture = self.pg4.get_capture(len(pkts))
2260        self.verify_capture_in(capture, self.pg4)
2261
2262        # in2out 2nd interface
2263        pkts = self.create_stream_in(self.pg5, self.pg3)
2264        self.pg5.add_stream(pkts)
2265        self.pg_enable_capture(self.pg_interfaces)
2266        self.pg_start()
2267        capture = self.pg3.get_capture(len(pkts))
2268        self.verify_capture_out(capture)
2269
2270        # out2in 2nd interface
2271        pkts = self.create_stream_out(self.pg3)
2272        self.pg3.add_stream(pkts)
2273        self.pg_enable_capture(self.pg_interfaces)
2274        self.pg_start()
2275        capture = self.pg5.get_capture(len(pkts))
2276        self.verify_capture_in(capture, self.pg5)
2277
2278        # pg5 session dump
2279        addresses = self.vapi.nat44_address_dump()
2280        self.assertEqual(len(addresses), 1)
2281        sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4, 10)
2282        self.assertEqual(len(sessions), 3)
2283        for session in sessions:
2284            self.assertFalse(session.flags & self.config_flags.NAT_IS_STATIC)
2285            self.assertEqual(str(session.inside_ip_address),
2286                             self.pg5.remote_ip4)
2287            self.assertEqual(session.outside_ip_address,
2288                             addresses[0].ip_address)
2289        self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
2290        self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
2291        self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
2292        self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
2293        self.assertEqual(sessions[1].inside_port, self.udp_port_in)
2294        self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
2295        self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
2296        self.assertEqual(sessions[1].outside_port, self.udp_port_out)
2297        self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
2298
2299        # in2out 3rd interface
2300        pkts = self.create_stream_in(self.pg6, self.pg3)
2301        self.pg6.add_stream(pkts)
2302        self.pg_enable_capture(self.pg_interfaces)
2303        self.pg_start()
2304        capture = self.pg3.get_capture(len(pkts))
2305        self.verify_capture_out(capture, static_nat_ip, True)
2306
2307        # out2in 3rd interface
2308        pkts = self.create_stream_out(self.pg3, static_nat_ip)
2309        self.pg3.add_stream(pkts)
2310        self.pg_enable_capture(self.pg_interfaces)
2311        self.pg_start()
2312        capture = self.pg6.get_capture(len(pkts))
2313        self.verify_capture_in(capture, self.pg6)
2314
2315        # general user and session dump verifications
2316        users = self.vapi.nat44_user_dump()
2317        self.assertGreaterEqual(len(users), 3)
2318        addresses = self.vapi.nat44_address_dump()
2319        self.assertEqual(len(addresses), 1)
2320        for user in users:
2321            sessions = self.vapi.nat44_user_session_dump(user.ip_address,
2322                                                         user.vrf_id)
2323            for session in sessions:
2324                self.assertEqual(user.ip_address, session.inside_ip_address)
2325                self.assertTrue(session.total_bytes > session.total_pkts > 0)
2326                self.assertTrue(session.protocol in
2327                                [IP_PROTOS.tcp, IP_PROTOS.udp,
2328                                 IP_PROTOS.icmp])
2329                self.assertFalse(session.flags &
2330                                 self.config_flags.NAT_IS_EXT_HOST_VALID)
2331
2332        # pg4 session dump
2333        sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4, 10)
2334        self.assertGreaterEqual(len(sessions), 4)
2335        for session in sessions:
2336            self.assertFalse(session.flags & self.config_flags.NAT_IS_STATIC)
2337            self.assertEqual(str(session.inside_ip_address),
2338                             self.pg4.remote_ip4)
2339            self.assertEqual(session.outside_ip_address,
2340                             addresses[0].ip_address)
2341
2342        # pg6 session dump
2343        sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4, 20)
2344        self.assertGreaterEqual(len(sessions), 3)
2345        for session in sessions:
2346            self.assertTrue(session.flags & self.config_flags.NAT_IS_STATIC)
2347            self.assertEqual(str(session.inside_ip_address),
2348                             self.pg6.remote_ip4)
2349            self.assertEqual(str(session.outside_ip_address),
2350                             static_nat_ip)
2351            self.assertTrue(session.inside_port in
2352                            [self.tcp_port_in, self.udp_port_in,
2353                             self.icmp_id_in])
2354
2355    def test_hairpinning(self):
2356        """ NAT44 hairpinning - 1:1 NAPT """
2357
2358        host = self.pg0.remote_hosts[0]
2359        server = self.pg0.remote_hosts[1]
2360        host_in_port = 1234
2361        host_out_port = 0
2362        server_in_port = 5678
2363        server_out_port = 8765
2364
2365        self.nat44_add_address(self.nat_addr)
2366        flags = self.config_flags.NAT_IS_INSIDE
2367        self.vapi.nat44_interface_add_del_feature(
2368            sw_if_index=self.pg0.sw_if_index,
2369            flags=flags, is_add=1)
2370        self.vapi.nat44_interface_add_del_feature(
2371            sw_if_index=self.pg1.sw_if_index,
2372            is_add=1)
2373
2374        # add static mapping for server
2375        self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2376                                      server_in_port, server_out_port,
2377                                      proto=IP_PROTOS.tcp)
2378
2379        # send packet from host to server
2380        p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2381             IP(src=host.ip4, dst=self.nat_addr) /
2382             TCP(sport=host_in_port, dport=server_out_port))
2383        self.pg0.add_stream(p)
2384        self.pg_enable_capture(self.pg_interfaces)
2385        self.pg_start()
2386        capture = self.pg0.get_capture(1)
2387        p = capture[0]
2388        try:
2389            ip = p[IP]
2390            tcp = p[TCP]
2391            self.assertEqual(ip.src, self.nat_addr)
2392            self.assertEqual(ip.dst, server.ip4)
2393            self.assertNotEqual(tcp.sport, host_in_port)
2394            self.assertEqual(tcp.dport, server_in_port)
2395            self.assert_packet_checksums_valid(p)
2396            host_out_port = tcp.sport
2397        except:
2398            self.logger.error(ppp("Unexpected or invalid packet:", p))
2399            raise
2400
2401        # send reply from server to host
2402        p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2403             IP(src=server.ip4, dst=self.nat_addr) /
2404             TCP(sport=server_in_port, dport=host_out_port))
2405        self.pg0.add_stream(p)
2406        self.pg_enable_capture(self.pg_interfaces)
2407        self.pg_start()
2408        capture = self.pg0.get_capture(1)
2409        p = capture[0]
2410        try:
2411            ip = p[IP]
2412            tcp = p[TCP]
2413            self.assertEqual(ip.src, self.nat_addr)
2414            self.assertEqual(ip.dst, host.ip4)
2415            self.assertEqual(tcp.sport, server_out_port)
2416            self.assertEqual(tcp.dport, host_in_port)
2417            self.assert_packet_checksums_valid(p)
2418        except:
2419            self.logger.error(ppp("Unexpected or invalid packet:", p))
2420            raise
2421
2422    def test_hairpinning2(self):
2423        """ NAT44 hairpinning - 1:1 NAT"""
2424
2425        server1_nat_ip = "10.0.0.10"
2426        server2_nat_ip = "10.0.0.11"
2427        host = self.pg0.remote_hosts[0]
2428        server1 = self.pg0.remote_hosts[1]
2429        server2 = self.pg0.remote_hosts[2]
2430        server_tcp_port = 22
2431        server_udp_port = 20
2432
2433        self.nat44_add_address(self.nat_addr)
2434        flags = self.config_flags.NAT_IS_INSIDE
2435        self.vapi.nat44_interface_add_del_feature(
2436            sw_if_index=self.pg0.sw_if_index,
2437            flags=flags, is_add=1)
2438        self.vapi.nat44_interface_add_del_feature(
2439            sw_if_index=self.pg1.sw_if_index,
2440            is_add=1)
2441
2442        # add static mapping for servers
2443        self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
2444        self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
2445
2446        # host to server1
2447        pkts = []
2448        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2449             IP(src=host.ip4, dst=server1_nat_ip) /
2450             TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2451        pkts.append(p)
2452        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2453             IP(src=host.ip4, dst=server1_nat_ip) /
2454             UDP(sport=self.udp_port_in, dport=server_udp_port))
2455        pkts.append(p)
2456        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2457             IP(src=host.ip4, dst=server1_nat_ip) /
2458             ICMP(id=self.icmp_id_in, type='echo-request'))
2459        pkts.append(p)
2460        self.pg0.add_stream(pkts)
2461        self.pg_enable_capture(self.pg_interfaces)
2462        self.pg_start()
2463        capture = self.pg0.get_capture(len(pkts))
2464        for packet in capture:
2465            try:
2466                self.assertEqual(packet[IP].src, self.nat_addr)
2467                self.assertEqual(packet[IP].dst, server1.ip4)
2468                if packet.haslayer(TCP):
2469                    self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
2470                    self.assertEqual(packet[TCP].dport, server_tcp_port)
2471                    self.tcp_port_out = packet[TCP].sport
2472                    self.assert_packet_checksums_valid(packet)
2473                elif packet.haslayer(UDP):
2474                    self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
2475                    self.assertEqual(packet[UDP].dport, server_udp_port)
2476                    self.udp_port_out = packet[UDP].sport
2477                else:
2478                    self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
2479                    self.icmp_id_out = packet[ICMP].id
2480            except:
2481                self.logger.error(ppp("Unexpected or invalid packet:", packet))
2482                raise
2483
2484        # server1 to host
2485        pkts = []
2486        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2487             IP(src=server1.ip4, dst=self.nat_addr) /
2488             TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2489        pkts.append(p)
2490        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2491             IP(src=server1.ip4, dst=self.nat_addr) /
2492             UDP(sport=server_udp_port, dport=self.udp_port_out))
2493        pkts.append(p)
2494        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2495             IP(src=server1.ip4, dst=self.nat_addr) /
2496             ICMP(id=self.icmp_id_out, type='echo-reply'))
2497        pkts.append(p)
2498        self.pg0.add_stream(pkts)
2499        self.pg_enable_capture(self.pg_interfaces)
2500        self.pg_start()
2501        capture = self.pg0.get_capture(len(pkts))
2502        for packet in capture:
2503            try:
2504                self.assertEqual(packet[IP].src, server1_nat_ip)
2505                self.assertEqual(packet[IP].dst, host.ip4)
2506                if packet.haslayer(TCP):
2507                    self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2508                    self.assertEqual(packet[TCP].sport, server_tcp_port)
2509                    self.assert_packet_checksums_valid(packet)
2510                elif packet.haslayer(UDP):
2511                    self.assertEqual(packet[UDP].dport, self.udp_port_in)
2512                    self.assertEqual(packet[UDP].sport, server_udp_port)
2513                else:
2514                    self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2515            except:
2516                self.logger.error(ppp("Unexpected or invalid packet:", packet))
2517                raise
2518
2519        # server2 to server1
2520        pkts = []
2521        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2522             IP(src=server2.ip4, dst=server1_nat_ip) /
2523             TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2524        pkts.append(p)
2525        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2526             IP(src=server2.ip4, dst=server1_nat_ip) /
2527             UDP(sport=self.udp_port_in, dport=server_udp_port))
2528        pkts.append(p)
2529        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2530             IP(src=server2.ip4, dst=server1_nat_ip) /
2531             ICMP(id=self.icmp_id_in, type='echo-request'))
2532        pkts.append(p)
2533        self.pg0.add_stream(pkts)
2534        self.pg_enable_capture(self.pg_interfaces)
2535        self.pg_start()
2536        capture = self.pg0.get_capture(len(pkts))
2537        for packet in capture:
2538            try:
2539                self.assertEqual(packet[IP].src, server2_nat_ip)
2540                self.assertEqual(packet[IP].dst, server1.ip4)
2541                if packet.haslayer(TCP):
2542                    self.assertEqual(packet[TCP].sport, self.tcp_port_in)
2543                    self.assertEqual(packet[TCP].dport, server_tcp_port)
2544                    self.tcp_port_out = packet[TCP].sport
2545                    self.assert_packet_checksums_valid(packet)
2546                elif packet.haslayer(UDP):
2547                    self.assertEqual(packet[UDP].sport, self.udp_port_in)
2548                    self.assertEqual(packet[UDP].dport, server_udp_port)
2549                    self.udp_port_out = packet[UDP].sport
2550                else:
2551                    self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2552                    self.icmp_id_out = packet[ICMP].id
2553            except:
2554                self.logger.error(ppp("Unexpected or invalid packet:", packet))
2555                raise
2556
2557        # server1 to server2
2558        pkts = []
2559        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2560             IP(src=server1.ip4, dst=server2_nat_ip) /
2561             TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2562        pkts.append(p)
2563        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2564             IP(src=server1.ip4, dst=server2_nat_ip) /
2565             UDP(sport=server_udp_port, dport=self.udp_port_out))
2566        pkts.append(p)
2567        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2568             IP(src=server1.ip4, dst=server2_nat_ip) /
2569             ICMP(id=self.icmp_id_out, type='echo-reply'))
2570        pkts.append(p)
2571        self.pg0.add_stream(pkts)
2572        self.pg_enable_capture(self.pg_interfaces)
2573        self.pg_start()
2574        capture = self.pg0.get_capture(len(pkts))
2575        for packet in capture:
2576            try:
2577                self.assertEqual(packet[IP].src, server1_nat_ip)
2578                self.assertEqual(packet[IP].dst, server2.ip4)
2579                if packet.haslayer(TCP):
2580                    self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2581                    self.assertEqual(packet[TCP].sport, server_tcp_port)
2582                    self.assert_packet_checksums_valid(packet)
2583                elif packet.haslayer(UDP):
2584                    self.assertEqual(packet[UDP].dport, self.udp_port_in)
2585                    self.assertEqual(packet[UDP].sport, server_udp_port)
2586                else:
2587                    self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2588            except:
2589                self.logger.error(ppp("Unexpected or invalid packet:", packet))
2590                raise
2591
2592    def test_interface_addr(self):
2593        """ Acquire NAT44 addresses from interface """
2594        self.vapi.nat44_add_del_interface_addr(
2595            is_add=1,
2596            sw_if_index=self.pg7.sw_if_index)
2597
2598        # no address in NAT pool
2599        addresses = self.vapi.nat44_address_dump()
2600        self.assertEqual(0, len(addresses))
2601
2602        # configure interface address and check NAT address pool
2603        self.pg7.config_ip4()
2604        addresses = self.vapi.nat44_address_dump()
2605        self.assertEqual(1, len(addresses))
2606        self.assertEqual(str(addresses[0].ip_address), self.pg7.local_ip4)
2607
2608        # remove interface address and check NAT address pool
2609        self.pg7.unconfig_ip4()
2610        addresses = self.vapi.nat44_address_dump()
2611        self.assertEqual(0, len(addresses))
2612
2613    def test_interface_addr_static_mapping(self):
2614        """ Static mapping with addresses from interface """
2615        tag = "testTAG"
2616
2617        self.vapi.nat44_add_del_interface_addr(
2618            is_add=1,
2619            sw_if_index=self.pg7.sw_if_index)
2620        self.nat44_add_static_mapping(
2621            '1.2.3.4',
2622            external_sw_if_index=self.pg7.sw_if_index,
2623            tag=tag)
2624
2625        # static mappings with external interface
2626        static_mappings = self.vapi.nat44_static_mapping_dump()
2627        self.assertEqual(1, len(static_mappings))
2628        self.assertEqual(self.pg7.sw_if_index,
2629                         static_mappings[0].external_sw_if_index)
2630        self.assertEqual(static_mappings[0].tag, tag)
2631
2632        # configure interface address and check static mappings
2633        self.pg7.config_ip4()
2634        static_mappings = self.vapi.nat44_static_mapping_dump()
2635        self.assertEqual(2, len(static_mappings))
2636        resolved = False
2637        for sm in static_mappings:
2638            if sm.external_sw_if_index == 0xFFFFFFFF:
2639                self.assertEqual(str(sm.external_ip_address),
2640                                 self.pg7.local_ip4)
2641                self.assertEqual(sm.tag, tag)
2642                resolved = True
2643        self.assertTrue(resolved)
2644
2645        # remove interface address and check static mappings
2646        self.pg7.unconfig_ip4()
2647        static_mappings = self.vapi.nat44_static_mapping_dump()
2648        self.assertEqual(1, len(static_mappings))
2649        self.assertEqual(self.pg7.sw_if_index,
2650                         static_mappings[0].external_sw_if_index)
2651        self.assertEqual(static_mappings[0].tag, tag)
2652
2653        # configure interface address again and check static mappings
2654        self.pg7.config_ip4()
2655        static_mappings = self.vapi.nat44_static_mapping_dump()
2656        self.assertEqual(2, len(static_mappings))
2657        resolved = False
2658        for sm in static_mappings:
2659            if sm.external_sw_if_index == 0xFFFFFFFF:
2660                self.assertEqual(str(sm.external_ip_address),
2661                                 self.pg7.local_ip4)
2662                self.assertEqual(sm.tag, tag)
2663                resolved = True
2664        self.assertTrue(resolved)
2665
2666        # remove static mapping
2667        self.nat44_add_static_mapping(
2668            '1.2.3.4',
2669            external_sw_if_index=self.pg7.sw_if_index,
2670            tag=tag,
2671            is_add=0)
2672        static_mappings = self.vapi.nat44_static_mapping_dump()
2673        self.assertEqual(0, len(static_mappings))
2674
2675    def test_interface_addr_identity_nat(self):
2676        """ Identity NAT with addresses from interface """
2677
2678        port = 53053
2679        self.vapi.nat44_add_del_interface_addr(
2680            is_add=1,
2681            sw_if_index=self.pg7.sw_if_index)
2682        self.vapi.nat44_add_del_identity_mapping(
2683            ip_address=b'0',
2684            sw_if_index=self.pg7.sw_if_index,
2685            port=port,
2686            protocol=IP_PROTOS.tcp,
2687            is_add=1)
2688
2689        # identity mappings with external interface
2690        identity_mappings = self.vapi.nat44_identity_mapping_dump()
2691        self.assertEqual(1, len(identity_mappings))
2692        self.assertEqual(self.pg7.sw_if_index,
2693                         identity_mappings[0].sw_if_index)
2694
2695        # configure interface address and check identity mappings
2696        self.pg7.config_ip4()
2697        identity_mappings = self.vapi.nat44_identity_mapping_dump()
2698        resolved = False
2699        self.assertEqual(2, len(identity_mappings))
2700        for sm in identity_mappings:
2701            if sm.sw_if_index == 0xFFFFFFFF:
2702                self.assertEqual(str(identity_mappings[0].ip_address),
2703                                 self.pg7.local_ip4)
2704                self.assertEqual(port, identity_mappings[0].port)
2705                self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2706                resolved = True
2707        self.assertTrue(resolved)
2708
2709        # remove interface address and check identity mappings
2710        self.pg7.unconfig_ip4()
2711        identity_mappings = self.vapi.nat44_identity_mapping_dump()
2712        self.assertEqual(1, len(identity_mappings))
2713        self.assertEqual(self.pg7.sw_if_index,
2714                         identity_mappings[0].sw_if_index)
2715
2716    def test_ipfix_nat44_sess(self):
2717        """ IPFIX logging NAT44 session created/deleted """
2718        self.ipfix_domain_id = 10
2719        self.ipfix_src_port = 20202
2720        collector_port = 30303
2721        bind_layers(UDP, IPFIX, dport=30303)
2722        self.nat44_add_address(self.nat_addr)
2723        flags = self.config_flags.NAT_IS_INSIDE
2724        self.vapi.nat44_interface_add_del_feature(
2725            sw_if_index=self.pg0.sw_if_index,
2726            flags=flags, is_add=1)
2727        self.vapi.nat44_interface_add_del_feature(
2728            sw_if_index=self.pg1.sw_if_index,
2729            is_add=1)
2730        self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
2731                                     src_address=self.pg3.local_ip4,
2732                                     path_mtu=512,
2733                                     template_interval=10,
2734                                     collector_port=collector_port)
2735        self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
2736                                           src_port=self.ipfix_src_port,
2737                                           enable=1)
2738
2739        pkts = self.create_stream_in(self.pg0, self.pg1)
2740        self.pg0.add_stream(pkts)
2741        self.pg_enable_capture(self.pg_interfaces)
2742        self.pg_start()
2743        capture = self.pg1.get_capture(len(pkts))
2744        self.verify_capture_out(capture)
2745        self.nat44_add_address(self.nat_addr, is_add=0)
2746        self.vapi.ipfix_flush()
2747        capture = self.pg3.get_capture(9)
2748        ipfix = IPFIXDecoder()
2749        # first load template
2750        for p in capture:
2751            self.assertTrue(p.haslayer(IPFIX))
2752            self.assertEqual(p[IP].src, self.pg3.local_ip4)
2753            self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2754            self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2755            self.assertEqual(p[UDP].dport, collector_port)
2756            self.assertEqual(p[IPFIX].observationDomainID,
2757                             self.ipfix_domain_id)
2758            if p.haslayer(Template):
2759                ipfix.add_template(p.getlayer(Template))
2760        # verify events in data set
2761        for p in capture:
2762            if p.haslayer(Data):
2763                data = ipfix.decode_data_set(p.getlayer(Set))
2764                self.verify_ipfix_nat44_ses(data)
2765
2766    def test_ipfix_addr_exhausted(self):
2767        """ IPFIX logging NAT addresses exhausted """
2768        flags = self.config_flags.NAT_IS_INSIDE
2769        self.vapi.nat44_interface_add_del_feature(
2770            sw_if_index=self.pg0.sw_if_index,
2771            flags=flags, is_add=1)
2772        self.vapi.nat44_interface_add_del_feature(
2773            sw_if_index=self.pg1.sw_if_index,
2774            is_add=1)
2775        self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
2776                                     src_address=self.pg3.local_ip4,
2777                                     path_mtu=512,
2778                                     template_interval=10)
2779        self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
2780                                           src_port=self.ipfix_src_port,
2781                                           enable=1)
2782
2783        p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2784             IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2785             TCP(sport=3025))
2786        self.pg0.add_stream(p)
2787        self.pg_enable_capture(self.pg_interfaces)
2788        self.pg_start()
2789        self.pg1.assert_nothing_captured()
2790        sleep(1)
2791        self.vapi.ipfix_flush()
2792        capture = self.pg3.get_capture(9)
2793        ipfix = IPFIXDecoder()
2794        # first load template
2795        for p in capture:
2796            self.assertTrue(p.haslayer(IPFIX))
2797            self.assertEqual(p[IP].src, self.pg3.local_ip4)
2798            self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2799            self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2800            self.assertEqual(p[UDP].dport, 4739)
2801            self.assertEqual(p[IPFIX].observationDomainID,
2802                             self.ipfix_domain_id)
2803            if p.haslayer(Template):
2804                ipfix.add_template(p.getlayer(Template))
2805        # verify events in data set
2806        for p in capture:
2807            if p.haslayer(Data):
2808                data = ipfix.decode_data_set(p.getlayer(Set))
2809                self.verify_ipfix_addr_exhausted(data)
2810
2811    @unittest.skipUnless(running_extended_tests, "part of extended tests")
2812    def test_ipfix_max_sessions(self):
2813        """ IPFIX logging maximum session entries exceeded """
2814        self.nat44_add_address(self.nat_addr)
2815        flags = self.config_flags.NAT_IS_INSIDE
2816        self.vapi.nat44_interface_add_del_feature(
2817            sw_if_index=self.pg0.sw_if_index,
2818            flags=flags, is_add=1)
2819        self.vapi.nat44_interface_add_del_feature(
2820            sw_if_index=self.pg1.sw_if_index,
2821            is_add=1)
2822
2823        nat44_config = self.vapi.nat_show_config()
2824        max_sessions = 10 * nat44_config.translation_buckets
2825
2826        pkts = []
2827        for i in range(0, max_sessions):
2828            src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2829            p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2830                 IP(src=src, dst=self.pg1.remote_ip4) /
2831                 TCP(sport=1025))
2832            pkts.append(p)
2833        self.pg0.add_stream(pkts)
2834        self.pg_enable_capture(self.pg_interfaces)
2835        self.pg_start()
2836
2837        self.pg1.get_capture(max_sessions)
2838        self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
2839                                     src_address=self.pg3.local_ip4,
2840                                     path_mtu=512,
2841                                     template_interval=10)
2842        self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
2843                                           src_port=self.ipfix_src_port,
2844                                           enable=1)
2845
2846        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2847             IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2848             TCP(sport=1025))
2849        self.pg0.add_stream(p)
2850        self.pg_enable_capture(self.pg_interfaces)
2851        self.pg_start()
2852        self.pg1.assert_nothing_captured()
2853        sleep(1)
2854        self.vapi.ipfix_flush()
2855        capture = self.pg3.get_capture(9)
2856        ipfix = IPFIXDecoder()
2857        # first load template
2858        for p in capture:
2859            self.assertTrue(p.haslayer(IPFIX))
2860            self.assertEqual(p[IP].src, self.pg3.local_ip4)
2861            self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2862            self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2863            self.assertEqual(p[UDP].dport, 4739)
2864            self.assertEqual(p[IPFIX].observationDomainID,
2865                             self.ipfix_domain_id)
2866            if p.haslayer(Template):
2867                ipfix.add_template(p.getlayer(Template))
2868        # verify events in data set
2869        for p in capture:
2870            if p.haslayer(Data):
2871                data = ipfix.decode_data_set(p.getlayer(Set))
2872                self.verify_ipfix_max_sessions(data, max_sessions)
2873
2874    def test_syslog_apmap(self):
2875        """ Test syslog address and port mapping creation and deletion """
2876        self.vapi.syslog_set_filter(
2877            self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
2878        self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)
2879        self.nat44_add_address(self.nat_addr)
2880        flags = self.config_flags.NAT_IS_INSIDE
2881        self.vapi.nat44_interface_add_del_feature(
2882            sw_if_index=self.pg0.sw_if_index,
2883            flags=flags, is_add=1)
2884        self.vapi.nat44_interface_add_del_feature(
2885            sw_if_index=self.pg1.sw_if_index,
2886            is_add=1)
2887
2888        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2889             IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2890             TCP(sport=self.tcp_port_in, dport=20))
2891        self.pg0.add_stream(p)
2892        self.pg_enable_capture(self.pg_interfaces)
2893        self.pg_start()
2894        capture = self.pg1.get_capture(1)
2895        self.tcp_port_out = capture[0][TCP].sport
2896        capture = self.pg3.get_capture(1)
2897        self.verify_syslog_apmap(capture[0][Raw].load)
2898
2899        self.pg_enable_capture(self.pg_interfaces)
2900        self.pg_start()
2901        self.nat44_add_address(self.nat_addr, is_add=0)
2902        capture = self.pg3.get_capture(1)
2903        self.verify_syslog_apmap(capture[0][Raw].load, False)
2904
2905    def test_pool_addr_fib(self):
2906        """ NAT44 add pool addresses to FIB """
2907        static_addr = '10.0.0.10'
2908        self.nat44_add_address(self.nat_addr)
2909        flags = self.config_flags.NAT_IS_INSIDE
2910        self.vapi.nat44_interface_add_del_feature(
2911            sw_if_index=self.pg0.sw_if_index,
2912            flags=flags, is_add=1)
2913        self.vapi.nat44_interface_add_del_feature(
2914            sw_if_index=self.pg1.sw_if_index,
2915            is_add=1)
2916        self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
2917
2918        # NAT44 address
2919        p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2920             ARP(op=ARP.who_has, pdst=self.nat_addr,
2921                 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2922        self.pg1.add_stream(p)
2923        self.pg_enable_capture(self.pg_interfaces)
2924        self.pg_start()
2925        capture = self.pg1.get_capture(1)
2926        self.assertTrue(capture[0].haslayer(ARP))
2927        self.assertTrue(capture[0][ARP].op, ARP.is_at)
2928
2929        # 1:1 NAT address
2930        p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2931             ARP(op=ARP.who_has, pdst=static_addr,
2932                 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2933        self.pg1.add_stream(p)
2934        self.pg_enable_capture(self.pg_interfaces)
2935        self.pg_start()
2936        capture = self.pg1.get_capture(1)
2937        self.assertTrue(capture[0].haslayer(ARP))
2938        self.assertTrue(capture[0][ARP].op, ARP.is_at)
2939
2940        # send ARP to non-NAT44 interface
2941        p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2942             ARP(op=ARP.who_has, pdst=self.nat_addr,
2943                 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
2944        self.pg2.add_stream(p)
2945        self.pg_enable_capture(self.pg_interfaces)
2946        self.pg_start()
2947        self.pg1.assert_nothing_captured()
2948
2949        # remove addresses and verify
2950        self.nat44_add_address(self.nat_addr, is_add=0)
2951        self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
2952                                      is_add=0)
2953
2954        p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2955             ARP(op=ARP.who_has, pdst=self.nat_addr,
2956                 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2957        self.pg1.add_stream(p)
2958        self.pg_enable_capture(self.pg_interfaces)
2959        self.pg_start()
2960        self.pg1.assert_nothing_captured()
2961
2962        p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2963             ARP(op=ARP.who_has, pdst=static_addr,
2964                 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2965        self.pg1.add_stream(p)
2966        self.pg_enable_capture(self.pg_interfaces)
2967        self.pg_start()
2968        self.pg1.assert_nothing_captured()
2969
2970    def test_vrf_mode(self):
2971        """ NAT44 tenant VRF aware address pool mode """
2972
2973        vrf_id1 = 1
2974        vrf_id2 = 2
2975        nat_ip1 = "10.0.0.10"
2976        nat_ip2 = "10.0.0.11"
2977
2978        self.pg0.unconfig_ip4()
2979        self.pg1.unconfig_ip4()
2980        self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id1})
2981        self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id2})
2982        self.pg0.set_table_ip4(vrf_id1)
2983        self.pg1.set_table_ip4(vrf_id2)
2984        self.pg0.config_ip4()
2985        self.pg1.config_ip4()
2986        self.pg0.resolve_arp()
2987        self.pg1.resolve_arp()
2988
2989        self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2990        self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2991        flags = self.config_flags.NAT_IS_INSIDE
2992        self.vapi.nat44_interface_add_del_feature(
2993            sw_if_index=self.pg0.sw_if_index,
2994            flags=flags, is_add=1)
2995        self.vapi.nat44_interface_add_del_feature(
2996            sw_if_index=self.pg1.sw_if_index,
2997            flags=flags, is_add=1)
2998        self.vapi.nat44_interface_add_del_feature(
2999            sw_if_index=self.pg2.sw_if_index,
3000            is_add=1)
3001
3002        try:
3003            # first VRF
3004            pkts = self.create_stream_in(self.pg0, self.pg2)
3005            self.pg0.add_stream(pkts)
3006            self.pg_enable_capture(self.pg_interfaces)
3007            self.pg_start()
3008            capture = self.pg2.get_capture(len(pkts))
3009            self.verify_capture_out(capture, nat_ip1)
3010
3011            # second VRF
3012            pkts = self.create_stream_in(self.pg1, self.pg2)
3013            self.pg1.add_stream(pkts)
3014            self.pg_enable_capture(self.pg_interfaces)
3015            self.pg_start()
3016            capture = self.pg2.get_capture(len(pkts))
3017            self.verify_capture_out(capture, nat_ip2)
3018
3019        finally:
3020            self.pg0.unconfig_ip4()
3021            self.pg1.unconfig_ip4()
3022            self.pg0.set_table_ip4(0)
3023            self.pg1.set_table_ip4(0)
3024            self.pg0.config_ip4()
3025            self.pg1.config_ip4()
3026            self.pg0.resolve_arp()
3027            self.pg1.resolve_arp()
3028            self.vapi.ip_table_add_del(is_add=0, table={'table_id': vrf_id1})
3029            self.vapi.ip_table_add_del(is_add=0, table={'table_id': vrf_id2})
3030
3031    def test_vrf_feature_independent(self):
3032        """ NAT44 tenant VRF independent address pool mode """
3033
3034        nat_ip1 = "10.0.0.10"
3035        nat_ip2 = "10.0.0.11"
3036
3037        self.nat44_add_address(nat_ip1)
3038        self.nat44_add_address(nat_ip2, vrf_id=99)
3039        flags = self.config_flags.NAT_IS_INSIDE
3040        self.vapi.nat44_interface_add_del_feature(
3041            sw_if_index=self.pg0.sw_if_index,
3042            flags=flags, is_add=1)
3043        self.vapi.nat44_interface_add_del_feature(
3044            sw_if_index=self.pg1.sw_if_index,
3045            flags=flags, is_add=1)
3046        self.vapi.nat44_interface_add_del_feature(
3047            sw_if_index=self.pg2.sw_if_index,
3048            is_add=1)
3049
3050        # first VRF
3051        pkts = self.create_stream_in(self.pg0, self.pg2)
3052        self.pg0.add_stream(pkts)
3053        self.pg_enable_capture(self.pg_interfaces)
3054        self.pg_start()
3055        capture = self.pg2.get_capture(len(pkts))
3056        self.verify_capture_out(capture, nat_ip1)
3057
3058        # second VRF
3059        pkts = self.create_stream_in(self.pg1, self.pg2)
3060        self.pg1.add_stream(pkts)
3061        self.pg_enable_capture(self.pg_interfaces)
3062        self.pg_start()
3063        capture = self.pg2.get_capture(len(pkts))
3064        self.verify_capture_out(capture, nat_ip1)
3065
3066    def create_routes_and_neigbors(self):
3067        r1 = VppIpRoute(self, self.pg7.remote_ip4, 32,
3068                        [VppRoutePath(self.pg7.remote_ip4,
3069                                      self.pg7.sw_if_index)])
3070        r2 = VppIpRoute(self, self.pg8.remote_ip4, 32,
3071                        [VppRoutePath(self.pg8.remote_ip4,
3072                                      self.pg8.sw_if_index)])
3073        r1.add_vpp_config()
3074        r2.add_vpp_config()
3075
3076        n1 = VppNeighbor(self,
3077                         self.pg7.sw_if_index,
3078                         self.pg7.remote_mac,
3079                         self.pg7.remote_ip4,
3080                         is_static=1)
3081        n2 = VppNeighbor(self,
3082                         self.pg8.sw_if_index,
3083                         self.pg8.remote_mac,
3084                         self.pg8.remote_ip4,
3085                         is_static=1)
3086        n1.add_vpp_config()
3087        n2.add_vpp_config()
3088
3089    def test_dynamic_ipless_interfaces(self):
3090        """ NAT44 interfaces without configured IP address """
3091        self.create_routes_and_neigbors()
3092        self.nat44_add_address(self.nat_addr)
3093        flags = self.config_flags.NAT_IS_INSIDE
3094        self.vapi.nat44_interface_add_del_feature(
3095            sw_if_index=self.pg7.sw_if_index,
3096            flags=flags, is_add=1)
3097        self.vapi.nat44_interface_add_del_feature(
3098            sw_if_index=self.pg8.sw_if_index,
3099            is_add=1)
3100
3101        # in2out
3102        pkts = self.create_stream_in(self.pg7, self.pg8)
3103        self.pg7.add_stream(pkts)
3104        self.pg_enable_capture(self.pg_interfaces)
3105        self.pg_start()
3106        capture = self.pg8.get_capture(len(pkts))
3107        self.verify_capture_out(capture)
3108
3109        # out2in
3110        pkts = self.create_stream_out(self.pg8, self.nat_addr)
3111        self.pg8.add_stream(pkts)
3112        self.pg_enable_capture(self.pg_interfaces)
3113        self.pg_start()
3114        capture = self.pg7.get_capture(len(pkts))
3115        self.verify_capture_in(capture, self.pg7)
3116
3117    def test_static_ipless_interfaces(self):
3118        """ NAT44 interfaces without configured IP address - 1:1 NAT """
3119
3120        self.create_routes_and_neigbors()
3121        self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
3122        flags = self.config_flags.NAT_IS_INSIDE
3123        self.vapi.nat44_interface_add_del_feature(
3124            sw_if_index=self.pg7.sw_if_index,
3125            flags=flags, is_add=1)
3126        self.vapi.nat44_interface_add_del_feature(
3127            sw_if_index=self.pg8.sw_if_index,
3128            is_add=1)
3129
3130        # out2in
3131        pkts = self.create_stream_out(self.pg8)
3132        self.pg8.add_stream(pkts)
3133        self.pg_enable_capture(self.pg_interfaces)
3134        self.pg_start()
3135        capture = self.pg7.get_capture(len(pkts))
3136        self.verify_capture_in(capture, self.pg7)
3137
3138        # in2out
3139        pkts = self.create_stream_in(self.pg7, self.pg8)
3140        self.pg7.add_stream(pkts)
3141        self.pg_enable_capture(self.pg_interfaces)
3142        self.pg_start()
3143        capture = self.pg8.get_capture(len(pkts))
3144        self.verify_capture_out(capture, self.nat_addr, True)
3145
3146    def test_static_with_port_ipless_interfaces(self):
3147        """ NAT44 interfaces without configured IP address - 1:1 NAPT """
3148
3149        self.tcp_port_out = 30606
3150        self.udp_port_out = 30607
3151        self.icmp_id_out = 30608
3152
3153        self.create_routes_and_neigbors()
3154        self.nat44_add_address(self.nat_addr)
3155        self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
3156                                      self.tcp_port_in, self.tcp_port_out,
3157                                      proto=IP_PROTOS.tcp)
3158        self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
3159                                      self.udp_port_in, self.udp_port_out,
3160                                      proto=IP_PROTOS.udp)
3161        self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
3162                                      self.icmp_id_in, self.icmp_id_out,
3163                                      proto=IP_PROTOS.icmp)
3164        flags = self.config_flags.NAT_IS_INSIDE
3165        self.vapi.nat44_interface_add_del_feature(
3166            sw_if_index=self.pg7.sw_if_index,
3167            flags=flags, is_add=1)
3168        self.vapi.nat44_interface_add_del_feature(
3169            sw_if_index=self.pg8.sw_if_index,
3170            is_add=1)
3171
3172        # out2in
3173        pkts = self.create_stream_out(self.pg8)
3174        self.pg8.add_stream(pkts)
3175        self.pg_enable_capture(self.pg_interfaces)
3176        self.pg_start()
3177        capture = self.pg7.get_capture(len(pkts))
3178        self.verify_capture_in(capture, self.pg7)
3179
3180        # in2out
3181        pkts = self.create_stream_in(self.pg7, self.pg8)
3182        self.pg7.add_stream(pkts)
3183        self.pg_enable_capture(self.pg_interfaces)
3184        self.pg_start()
3185        capture = self.pg8.get_capture(len(pkts))
3186        self.verify_capture_out(capture)
3187
3188    def test_static_unknown_proto(self):
3189        """ 1:1 NAT translate packet with unknown protocol """
3190        nat_ip = "10.0.0.10"
3191        self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
3192        flags = self.config_flags.NAT_IS_INSIDE
3193        self.vapi.nat44_interface_add_del_feature(
3194            sw_if_index=self.pg0.sw_if_index,
3195            flags=flags, is_add=1)
3196        self.vapi.nat44_interface_add_del_feature(
3197            sw_if_index=self.pg1.sw_if_index,
3198            is_add=1)
3199
3200        # in2out
3201        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3202             IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3203             GRE() /
3204             IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
3205             TCP(sport=1234, dport=1234))
3206        self.pg0.add_stream(p)
3207        self.pg_enable_capture(self.pg_interfaces)
3208        self.pg_start()
3209        p = self.pg1.get_capture(1)
3210        packet = p[0]
3211        try:
3212            self.assertEqual(packet[IP].src, nat_ip)
3213            self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3214            self.assertEqual(packet.haslayer(GRE), 1)
3215            self.assert_packet_checksums_valid(packet)
3216        except:
3217            self.logger.error(ppp("Unexpected or invalid packet:", packet))
3218            raise
3219
3220        # out2in
3221        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3222             IP(src=self.pg1.remote_ip4, dst=nat_ip) /
3223             GRE() /
3224             IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3225             TCP(sport=1234, dport=1234))
3226        self.pg1.add_stream(p)
3227        self.pg_enable_capture(self.pg_interfaces)
3228        self.pg_start()
3229        p = self.pg0.get_capture(1)
3230        packet = p[0]
3231        try:
3232            self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
3233            self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
3234            self.assertEqual(packet.haslayer(GRE), 1)
3235            self.assert_packet_checksums_valid(packet)
3236        except:
3237            self.logger.error(ppp("Unexpected or invalid packet:", packet))
3238            raise
3239
3240    def test_hairpinning_static_unknown_proto(self):
3241        """ 1:1 NAT translate packet with unknown protocol - hairpinning """
3242
3243        host = self.pg0.remote_hosts[0]
3244        server = self.pg0.remote_hosts[1]
3245
3246        host_nat_ip = "10.0.0.10"
3247        server_nat_ip = "10.0.0.11"
3248
3249        self.nat44_add_static_mapping(host.ip4, host_nat_ip)
3250        self.nat44_add_static_mapping(server.ip4, server_nat_ip)
3251        flags = self.config_flags.NAT_IS_INSIDE
3252        self.vapi.nat44_interface_add_del_feature(
3253            sw_if_index=self.pg0.sw_if_index,
3254            flags=flags, is_add=1)
3255        self.vapi.nat44_interface_add_del_feature(
3256            sw_if_index=self.pg1.sw_if_index,
3257            is_add=1)
3258
3259        # host to server
3260        p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
3261             IP(src=host.ip4, dst=server_nat_ip) /
3262             GRE() /
3263             IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
3264             TCP(sport=1234, dport=1234))
3265        self.pg0.add_stream(p)
3266        self.pg_enable_capture(self.pg_interfaces)
3267        self.pg_start()
3268        p = self.pg0.get_capture(1)
3269        packet = p[0]
3270        try:
3271            self.assertEqual(packet[IP].src, host_nat_ip)
3272            self.assertEqual(packet[IP].dst, server.ip4)
3273            self.assertEqual(packet.haslayer(GRE), 1)
3274            self.assert_packet_checksums_valid(packet)
3275        except:
3276            self.logger.error(ppp("Unexpected or invalid packet:", packet))
3277            raise
3278
3279        # server to host
3280        p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
3281             IP(src=server.ip4, dst=host_nat_ip) /
3282             GRE() /
3283             IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3284             TCP(sport=1234, dport=1234))
3285        self.pg0.add_stream(p)
3286        self.pg_enable_capture(self.pg_interfaces)
3287        self.pg_start()
3288        p = self.pg0.get_capture(1)
3289        packet = p[0]
3290        try:
3291            self.assertEqual(packet[IP].src, server_nat_ip)
3292            self.assertEqual(packet[IP].dst, host.ip4)
3293            self.assertEqual(packet.haslayer(GRE), 1)
3294            self.assert_packet_checksums_valid(packet)
3295        except:
3296            self.logger.error(ppp("Unexpected or invalid packet:", packet))
3297            raise
3298
3299    def test_output_feature(self):
3300        """ NAT44 interface output feature (in2out postrouting) """
3301        self.nat44_add_address(self.nat_addr)
3302        flags = self.config_flags.NAT_IS_INSIDE
3303        self.vapi.nat44_interface_add_del_output_feature(
3304            is_add=1, flags=flags,
3305            sw_if_index=self.pg0.sw_if_index)
3306        self.vapi.nat44_interface_add_del_output_feature(
3307            is_add=1, flags=flags,
3308            sw_if_index=self.pg1.sw_if_index)
3309        self.vapi.nat44_interface_add_del_output_feature(
3310            is_add=1,
3311            sw_if_index=self.pg3.sw_if_index)
3312
3313        # in2out
3314        pkts = self.create_stream_in(self.pg0, self.pg3)
3315        self.pg0.add_stream(pkts)
3316        self.pg_enable_capture(self.pg_interfaces)
3317        self.pg_start()
3318        capture = self.pg3.get_capture(len(pkts))
3319        self.verify_capture_out(capture)
3320
3321        # out2in
3322        pkts = self.create_stream_out(self.pg3)
3323        self.pg3.add_stream(pkts)
3324        self.pg_enable_capture(self.pg_interfaces)
3325        self.pg_start()
3326        capture = self.pg0.get_capture(len(pkts))
3327        self.verify_capture_in(capture, self.pg0)
3328
3329        # from non-NAT interface to NAT inside interface
3330        pkts = self.create_stream_in(self.pg2, self.pg0)
3331        self.pg2.add_stream(pkts)
3332        self.pg_enable_capture(self.pg_interfaces)
3333        self.pg_start()
3334        capture = self.pg0.get_capture(len(pkts))
3335        self.verify_capture_no_translation(capture, self.pg2, self.pg0)
3336
3337    def test_output_feature_vrf_aware(self):
3338        """ NAT44 interface output feature VRF aware (in2out postrouting) """
3339        nat_ip_vrf10 = "10.0.0.10"
3340        nat_ip_vrf20 = "10.0.0.20"
3341
3342        r1 = VppIpRoute(self, self.pg3.remote_ip4, 32,
3343                        [VppRoutePath(self.pg3.remote_ip4,
3344                                      self.pg3.sw_if_index)],
3345                        table_id=10)
3346        r2 = VppIpRoute(self, self.pg3.remote_ip4, 32,
3347                        [VppRoutePath(self.pg3.remote_ip4,
3348                                      self.pg3.sw_if_index)],
3349                        table_id=20)
3350        r1.add_vpp_config()
3351        r2.add_vpp_config()
3352
3353        self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
3354        self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
3355        flags = self.config_flags.NAT_IS_INSIDE
3356        self.vapi.nat44_interface_add_del_output_feature(
3357            is_add=1, flags=flags,
3358            sw_if_index=self.pg4.sw_if_index)
3359        self.vapi.nat44_interface_add_del_output_feature(
3360            is_add=1, flags=flags,
3361            sw_if_index=self.pg6.sw_if_index)
3362        self.vapi.nat44_interface_add_del_output_feature(
3363            is_add=1,
3364            sw_if_index=self.pg3.sw_if_index)
3365
3366        # in2out VRF 10
3367        pkts = self.create_stream_in(self.pg4, self.pg3)
3368        self.pg4.add_stream(pkts)
3369        self.pg_enable_capture(self.pg_interfaces)
3370        self.pg_start()
3371        capture = self.pg3.get_capture(len(pkts))
3372        self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
3373
3374        # out2in VRF 10
3375        pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
3376        self.pg3.add_stream(pkts)
3377        self.pg_enable_capture(self.pg_interfaces)
3378        self.pg_start()
3379        capture = self.pg4.get_capture(len(pkts))
3380        self.verify_capture_in(capture, self.pg4)
3381
3382        # in2out VRF 20
3383        pkts = self.create_stream_in(self.pg6, self.pg3)
3384        self.pg6.add_stream(pkts)
3385        self.pg_enable_capture(self.pg_interfaces)
3386        self.pg_start()
3387        capture = self.pg3.get_capture(len(pkts))
3388        self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
3389
3390        # out2in VRF 20
3391        pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
3392        self.pg3.add_stream(pkts)
3393        self.pg_enable_capture(self.pg_interfaces)
3394        self.pg_start()
3395        capture = self.pg6.get_capture(len(pkts))
3396        self.verify_capture_in(capture, self.pg6)
3397
3398    def test_output_feature_hairpinning(self):
3399        """ NAT44 interface output feature hairpinning (in2out postrouting) """
3400        host = self.pg0.remote_hosts[0]
3401        server = self.pg0.remote_hosts[1]
3402        host_in_port = 1234
3403        host_out_port = 0
3404        server_in_port = 5678
3405        server_out_port = 8765
3406
3407        self.nat44_add_address(self.nat_addr)
3408        flags = self.config_flags.NAT_IS_INSIDE
3409        self.vapi.nat44_interface_add_del_output_feature(
3410            is_add=1, flags=flags,
3411            sw_if_index=self.pg0.sw_if_index)
3412        self.vapi.nat44_interface_add_del_output_feature(
3413            is_add=1,
3414            sw_if_index=self.pg1.sw_if_index)
3415
3416        # add static mapping for server
3417        self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3418                                      server_in_port, server_out_port,
3419                                      proto=IP_PROTOS.tcp)
3420
3421        # send packet from host to server
3422        p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3423             IP(src=host.ip4, dst=self.nat_addr) /
3424             TCP(sport=host_in_port, dport=server_out_port))
3425        self.pg0.add_stream(p)
3426        self.pg_enable_capture(self.pg_interfaces)
3427        self.pg_start()
3428        capture = self.pg0.get_capture(1)
3429        p = capture[0]
3430        try:
3431            ip = p[IP]
3432            tcp = p[TCP]
3433            self.assertEqual(ip.src, self.nat_addr)
3434            self.assertEqual(ip.dst, server.ip4)
3435            self.assertNotEqual(tcp.sport, host_in_port)
3436            self.assertEqual(tcp.dport, server_in_port)
3437            self.assert_packet_checksums_valid(p)
3438            host_out_port = tcp.sport
3439        except:
3440            self.logger.error(ppp("Unexpected or invalid packet:", p))
3441            raise
3442
3443        # send reply from server to host
3444        p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3445             IP(src=server.ip4, dst=self.nat_addr) /
3446             TCP(sport=server_in_port, dport=host_out_port))
3447        self.pg0.add_stream(p)
3448        self.pg_enable_capture(self.pg_interfaces)
3449        self.pg_start()
3450        capture = self.pg0.get_capture(1)
3451        p = capture[0]
3452        try:
3453            ip = p[IP]
3454            tcp = p[TCP]
3455            self.assertEqual(ip.src, self.nat_addr)
3456            self.assertEqual(ip.dst, host.ip4)
3457            self.assertEqual(tcp.sport, server_out_port)
3458            self.assertEqual(tcp.dport, host_in_port)
3459            self.assert_packet_checksums_valid(p)
3460        except:
3461            self.logger.error(ppp("Unexpected or invalid packet:", p))
3462            raise
3463
3464    def test_one_armed_nat44(self):
3465        """ One armed NAT44 """
3466        remote_host = self.pg9.remote_hosts[0]
3467        local_host = self.pg9.remote_hosts[1]
3468        external_port = 0
3469
3470        self.nat44_add_address(self.nat_addr)
3471        flags = self.config_flags.NAT_IS_INSIDE
3472        self.vapi.nat44_interface_add_del_feature(
3473            sw_if_index=self.pg9.sw_if_index,
3474            is_add=1)
3475        self.vapi.nat44_interface_add_del_feature(
3476            sw_if_index=self.pg9.sw_if_index,
3477            flags=flags, is_add=1)
3478
3479        # in2out
3480        p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3481             IP(src=local_host.ip4, dst=remote_host.ip4) /
3482             TCP(sport=12345, dport=80))
3483        self.pg9.add_stream(p)
3484        self.pg_enable_capture(self.pg_interfaces)
3485        self.pg_start()
3486        capture = self.pg9.get_capture(1)
3487        p = capture[0]
3488        try:
3489            ip = p[IP]
3490            tcp = p[TCP]
3491            self.assertEqual(ip.src, self.nat_addr)
3492            self.assertEqual(ip.dst, remote_host.ip4)
3493            self.assertNotEqual(tcp.sport, 12345)
3494            external_port = tcp.sport
3495            self.assertEqual(tcp.dport, 80)
3496            self.assert_packet_checksums_valid(p)
3497        except:
3498            self.logger.error(ppp("Unexpected or invalid packet:", p))
3499            raise
3500
3501        # out2in
3502        p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3503             IP(src=remote_host.ip4, dst=self.nat_addr) /
3504             TCP(sport=80, dport=external_port))
3505        self.pg9.add_stream(p)
3506        self.pg_enable_capture(self.pg_interfaces)
3507        self.pg_start()
3508        capture = self.pg9.get_capture(1)
3509        p = capture[0]
3510        try:
3511            ip = p[IP]
3512            tcp = p[TCP]
3513            self.assertEqual(ip.src, remote_host.ip4)
3514            self.assertEqual(ip.dst, local_host.ip4)
3515            self.assertEqual(tcp.sport, 80)
3516            self.assertEqual(tcp.dport, 12345)
3517            self.assert_packet_checksums_valid(p)
3518        except:
3519            self.logger.error(ppp("Unexpected or invalid packet:", p))
3520            raise
3521
3522        err = self.statistics.get_err_counter(
3523            '/err/nat44-classify/next in2out')
3524        self.assertEqual(err, 1)
3525        err = self.statistics.get_err_counter(
3526            '/err/nat44-classify/next out2in')
3527        self.assertEqual(err, 1)
3528
3529    def test_del_session(self):
3530        """ Delete NAT44 session """
3531        self.nat44_add_address(self.nat_addr)
3532        flags = self.config_flags.NAT_IS_INSIDE
3533        self.vapi.nat44_interface_add_del_feature(
3534            sw_if_index=self.pg0.sw_if_index,
3535            flags=flags, is_add=1)
3536        self.vapi.nat44_interface_add_del_feature(
3537            sw_if_index=self.pg1.sw_if_index,
3538            is_add=1)
3539
3540        pkts = self.create_stream_in(self.pg0, self.pg1)
3541        self.pg0.add_stream(pkts)
3542        self.pg_enable_capture(self.pg_interfaces)
3543        self.pg_start()
3544        self.pg1.get_capture(len(pkts))
3545
3546        sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3547        nsessions = len(sessions)
3548
3549        self.vapi.nat44_del_session(address=sessions[0].inside_ip_address,
3550                                    port=sessions[0].inside_port,
3551                                    protocol=sessions[0].protocol,
3552                                    flags=self.config_flags.NAT_IS_INSIDE)
3553        self.vapi.nat44_del_session(address=sessions[1].outside_ip_address,
3554                                    port=sessions[1].outside_port,
3555                                    protocol=sessions[1].protocol)
3556
3557        sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3558        self.assertEqual(nsessions - len(sessions), 2)
3559
3560        self.vapi.nat44_del_session(address=sessions[0].inside_ip_address,
3561                                    port=sessions[0].inside_port,
3562                                    protocol=sessions[0].protocol,
3563                                    flags=self.config_flags.NAT_IS_INSIDE)
3564
3565        self.verify_no_nat44_user()
3566
3567    def test_frag_in_order(self):
3568        """ NAT44 translate fragments arriving in order """
3569
3570        self.nat44_add_address(self.nat_addr)
3571        flags = self.config_flags.NAT_IS_INSIDE
3572        self.vapi.nat44_interface_add_del_feature(
3573            sw_if_index=self.pg0.sw_if_index,
3574            flags=flags, is_add=1)
3575        self.vapi.nat44_interface_add_del_feature(
3576            sw_if_index=self.pg1.sw_if_index,
3577            is_add=1)
3578
3579        self.frag_in_order(proto=IP_PROTOS.tcp)
3580        self.frag_in_order(proto=IP_PROTOS.udp)
3581        self.frag_in_order(proto=IP_PROTOS.icmp)
3582
3583    def test_frag_forwarding(self):
3584        """ NAT44 forwarding fragment test """
3585        self.vapi.nat44_add_del_interface_addr(
3586            is_add=1,
3587            sw_if_index=self.pg1.sw_if_index)
3588        flags = self.config_flags.NAT_IS_INSIDE
3589        self.vapi.nat44_interface_add_del_feature(
3590            sw_if_index=self.pg0.sw_if_index,
3591            flags=flags, is_add=1)
3592        self.vapi.nat44_interface_add_del_feature(
3593            sw_if_index=self.pg1.sw_if_index,
3594            is_add=1)
3595        self.vapi.nat44_forwarding_enable_disable(enable=1)
3596
3597        data = b"A" * 16 + b"B" * 16 + b"C" * 3
3598        pkts = self.create_stream_frag(self.pg1,
3599                                       self.pg0.remote_ip4,
3600                                       4789,
3601                                       4789,
3602                                       data,
3603                                       proto=IP_PROTOS.udp)
3604        self.pg1.add_stream(pkts)
3605        self.pg_enable_capture(self.pg_interfaces)
3606        self.pg_start()
3607        frags = self.pg0.get_capture(len(pkts))
3608        p = self.reass_frags_and_verify(frags,
3609                                        self.pg1.remote_ip4,
3610                                        self.pg0.remote_ip4)
3611        self.assertEqual(p[UDP].sport, 4789)
3612        self.assertEqual(p[UDP].dport, 4789)
3613        self.assertEqual(data, p[Raw].load)
3614
3615    def test_reass_hairpinning(self):
3616        """ NAT44 fragments hairpinning """
3617
3618        self.server = self.pg0.remote_hosts[1]
3619        self.host_in_port = random.randint(1025, 65535)
3620        self.server_in_port = random.randint(1025, 65535)
3621        self.server_out_port = random.randint(1025, 65535)
3622
3623        self.nat44_add_address(self.nat_addr)
3624        flags = self.config_flags.NAT_IS_INSIDE
3625        self.vapi.nat44_interface_add_del_feature(
3626            sw_if_index=self.pg0.sw_if_index,
3627            flags=flags, is_add=1)
3628        self.vapi.nat44_interface_add_del_feature(
3629            sw_if_index=self.pg1.sw_if_index,
3630            is_add=1)
3631        # add static mapping for server
3632        self.nat44_add_static_mapping(self.server.ip4, self.nat_addr,
3633                                      self.server_in_port,
3634                                      self.server_out_port,
3635                                      proto=IP_PROTOS.tcp)
3636        self.nat44_add_static_mapping(self.server.ip4, self.nat_addr,
3637                                      self.server_in_port,
3638                                      self.server_out_port,
3639                                      proto=IP_PROTOS.udp)
3640        self.nat44_add_static_mapping(self.server.ip4, self.nat_addr)
3641
3642        self.reass_hairpinning(proto=IP_PROTOS.tcp)
3643        self.reass_hairpinning(proto=IP_PROTOS.udp)
3644        self.reass_hairpinning(proto=IP_PROTOS.icmp)
3645
3646    def test_frag_out_of_order(self):
3647        """ NAT44 translate fragments arriving out of order """
3648
3649        self.nat44_add_address(self.nat_addr)
3650        flags = self.config_flags.NAT_IS_INSIDE
3651        self.vapi.nat44_interface_add_del_feature(
3652            sw_if_index=self.pg0.sw_if_index,
3653            flags=flags, is_add=1)
3654        self.vapi.nat44_interface_add_del_feature(
3655            sw_if_index=self.pg1.sw_if_index,
3656            is_add=1)
3657
3658        self.frag_out_of_order(proto=IP_PROTOS.tcp)
3659        self.frag_out_of_order(proto=IP_PROTOS.udp)
3660        self.frag_out_of_order(proto=IP_PROTOS.icmp)
3661
3662    def test_port_restricted(self):
3663        """ Port restricted NAT44 (MAP-E CE) """
3664        self.nat44_add_address(self.nat_addr)
3665        flags = self.config_flags.NAT_IS_INSIDE
3666        self.vapi.nat44_interface_add_del_feature(
3667            sw_if_index=self.pg0.sw_if_index,
3668            flags=flags, is_add=1)
3669        self.vapi.nat44_interface_add_del_feature(
3670            sw_if_index=self.pg1.sw_if_index,
3671            is_add=1)
3672        self.vapi.nat_set_addr_and_port_alloc_alg(alg=1,
3673                                                  psid_offset=6,
3674                                                  psid_length=6,
3675                                                  psid=10)
3676
3677        p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3678             IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3679             TCP(sport=4567, dport=22))
3680        self.pg0.add_stream(p)
3681        self.pg_enable_capture(self.pg_interfaces)
3682        self.pg_start()
3683        capture = self.pg1.get_capture(1)
3684        p = capture[0]
3685        try:
3686            ip = p[IP]
3687            tcp = p[TCP]
3688            self.assertEqual(ip.dst, self.pg1.remote_ip4)
3689            self.assertEqual(ip.src, self.nat_addr)
3690            self.assertEqual(tcp.dport, 22)
3691            self.assertNotEqual(tcp.sport, 4567)
3692            self.assertEqual((tcp.sport >> 6) & 63, 10)
3693            self.assert_packet_checksums_valid(p)
3694        except:
3695            self.logger.error(ppp("Unexpected or invalid packet:", p))
3696            raise
3697
3698    def test_port_range(self):
3699        """ External address port range """
3700        self.nat44_add_address(self.nat_addr)
3701        flags = self.config_flags.NAT_IS_INSIDE
3702        self.vapi.nat44_interface_add_del_feature(
3703            sw_if_index=self.pg0.sw_if_index,
3704            flags=flags, is_add=1)
3705        self.vapi.nat44_interface_add_del_feature(
3706            sw_if_index=self.pg1.sw_if_index,
3707            is_add=1)
3708        self.vapi.nat_set_addr_and_port_alloc_alg(alg=2,
3709                                                  start_port=1025,
3710                                                  end_port=1027)
3711
3712        pkts = []
3713        for port in range(0, 5):
3714            p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3715                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3716                 TCP(sport=1125 + port))
3717            pkts.append(p)
3718        self.pg0.add_stream(pkts)
3719        self.pg_enable_capture(self.pg_interfaces)
3720        self.pg_start()
3721        capture = self.pg1.get_capture(3)
3722        for p in capture:
3723            tcp = p[TCP]
3724            self.assertGreaterEqual(tcp.sport, 1025)
3725            self.assertLessEqual(tcp.sport, 1027)
3726
3727    def test_multiple_outside_vrf(self):
3728        """ Multiple outside VRF """
3729        vrf_id1 = 1
3730        vrf_id2 = 2
3731
3732        self.pg1.unconfig_ip4()
3733        self.pg2.unconfig_ip4()
3734        self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id1})
3735        self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id2})
3736        self.pg1.set_table_ip4(vrf_id1)
3737        self.pg2.set_table_ip4(vrf_id2)
3738        self.pg1.config_ip4()
3739        self.pg2.config_ip4()
3740        self.pg1.resolve_arp()
3741        self.pg2.resolve_arp()
3742
3743        self.nat44_add_address(self.nat_addr)
3744        flags = self.config_flags.NAT_IS_INSIDE
3745        self.vapi.nat44_interface_add_del_feature(
3746            sw_if_index=self.pg0.sw_if_index,
3747            flags=flags, is_add=1)
3748        self.vapi.nat44_interface_add_del_feature(
3749            sw_if_index=self.pg1.sw_if_index,
3750            is_add=1)
3751        self.vapi.nat44_interface_add_del_feature(
3752            sw_if_index=self.pg2.sw_if_index,
3753            is_add=1)
3754
3755        try:
3756            # first VRF
3757            pkts = self.create_stream_in(self.pg0, self.pg1)
3758            self.pg0.add_stream(pkts)
3759            self.pg_enable_capture(self.pg_interfaces)
3760            self.pg_start()
3761            capture = self.pg1.get_capture(len(pkts))
3762            self.verify_capture_out(capture, self.nat_addr)
3763
3764            pkts = self.create_stream_out(self.pg1, self.nat_addr)
3765            self.pg1.add_stream(pkts)
3766            self.pg_enable_capture(self.pg_interfaces)
3767            self.pg_start()
3768            capture = self.pg0.get_capture(len(pkts))
3769            self.verify_capture_in(capture, self.pg0)
3770
3771            self.tcp_port_in = 60303
3772            self.udp_port_in = 60304
3773            self.icmp_id_in = 60305
3774
3775            # second VRF
3776            pkts = self.create_stream_in(self.pg0, self.pg2)
3777            self.pg0.add_stream(pkts)
3778            self.pg_enable_capture(self.pg_interfaces)
3779            self.pg_start()
3780            capture = self.pg2.get_capture(len(pkts))
3781            self.verify_capture_out(capture, self.nat_addr)
3782
3783            pkts = self.create_stream_out(self.pg2, self.nat_addr)
3784            self.pg2.add_stream(pkts)
3785            self.pg_enable_capture(self.pg_interfaces)
3786            self.pg_start()
3787            capture = self.pg0.get_capture(len(pkts))
3788            self.verify_capture_in(capture, self.pg0)
3789
3790        finally:
3791            self.nat44_add_address(self.nat_addr, is_add=0)
3792            self.pg1.unconfig_ip4()
3793            self.pg2.unconfig_ip4()
3794            self.pg1.set_table_ip4(0)
3795            self.pg2.set_table_ip4(0)
3796            self.pg1.config_ip4()
3797            self.pg2.config_ip4()
3798            self.pg1.resolve_arp()
3799            self.pg2.resolve_arp()
3800
3801    @unittest.skipUnless(running_extended_tests, "part of extended tests")
3802    def test_session_timeout(self):
3803        """ NAT44 session timeouts """
3804        self.nat44_add_address(self.nat_addr)
3805        flags = self.config_flags.NAT_IS_INSIDE
3806        self.vapi.nat44_interface_add_del_feature(
3807            sw_if_index=self.pg0.sw_if_index,
3808            flags=flags, is_add=1)
3809        self.vapi.nat44_interface_add_del_feature(
3810            sw_if_index=self.pg1.sw_if_index,
3811            is_add=1)
3812        self.vapi.nat_set_timeouts(udp=5, tcp_established=7440,
3813                                   tcp_transitory=240, icmp=60)
3814
3815        max_sessions = 1000
3816        pkts = []
3817        for i in range(0, max_sessions):
3818            src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
3819            p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3820                 IP(src=src, dst=self.pg1.remote_ip4) /
3821                 UDP(sport=1025, dport=53))
3822            pkts.append(p)
3823        self.pg0.add_stream(pkts)
3824        self.pg_enable_capture(self.pg_interfaces)
3825        self.pg_start()
3826        self.pg1.get_capture(max_sessions)
3827
3828        sleep(6)
3829
3830        pkts = []
3831        for i in range(0, max_sessions):
3832            src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
3833            p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3834                 IP(src=src, dst=self.pg1.remote_ip4) /
3835                 UDP(sport=1026, dport=53))
3836            pkts.append(p)
3837        self.pg0.add_stream(pkts)
3838        self.pg_enable_capture(self.pg_interfaces)
3839        self.pg_start()
3840        self.pg1.get_capture(max_sessions)
3841
3842        nsessions = 0
3843        users = self.vapi.nat44_user_dump()
3844        for user in users:
3845            nsessions = nsessions + user.nsessions
3846        self.assertLess(nsessions, 2 * max_sessions)
3847
3848    def test_mss_clamping(self):
3849        """ TCP MSS clamping """
3850        self.nat44_add_address(self.nat_addr)
3851        flags = self.config_flags.NAT_IS_INSIDE
3852        self.vapi.nat44_interface_add_del_feature(
3853            sw_if_index=self.pg0.sw_if_index,
3854            flags=flags, is_add=1)
3855        self.vapi.nat44_interface_add_del_feature(
3856            sw_if_index=self.pg1.sw_if_index,
3857            is_add=1)
3858
3859        p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3860             IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3861             TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3862                 flags="S", options=[('MSS', 1400)]))
3863
3864        self.vapi.nat_set_mss_clamping(enable=1, mss_value=1000)
3865        self.pg0.add_stream(p)
3866        self.pg_enable_capture(self.pg_interfaces)
3867        self.pg_start()
3868        capture = self.pg1.get_capture(1)
3869        # Negotiated MSS value greater than configured - changed
3870        self.verify_mss_value(capture[0], 1000)
3871
3872        self.vapi.nat_set_mss_clamping(enable=0, mss_value=1500)
3873        self.pg0.add_stream(p)
3874        self.pg_enable_capture(self.pg_interfaces)
3875        self.pg_start()
3876        capture = self.pg1.get_capture(1)
3877        # MSS clamping disabled - negotiated MSS unchanged
3878        self.verify_mss_value(capture[0], 1400)
3879
3880        self.vapi.nat_set_mss_clamping(enable=1, mss_value=1500)
3881        self.pg0.add_stream(p)
3882        self.pg_enable_capture(self.pg_interfaces)
3883        self.pg_start()
3884        capture = self.pg1.get_capture(1)
3885        # Negotiated MSS value smaller than configured - unchanged
3886        self.verify_mss_value(capture[0], 1400)
3887
3888    @unittest.skipUnless(running_extended_tests, "part of extended tests")
3889    def test_ha_send(self):
3890        """ Send HA session synchronization events (active) """
3891        self.nat44_add_address(self.nat_addr)
3892        flags = self.config_flags.NAT_IS_INSIDE
3893        self.vapi.nat44_interface_add_del_feature(
3894            sw_if_index=self.pg0.sw_if_index,
3895            flags=flags, is_add=1)
3896        self.vapi.nat44_interface_add_del_feature(
3897            sw_if_index=self.pg1.sw_if_index,
3898            is_add=1)
3899        self.vapi.nat_ha_set_listener(ip_address=self.pg3.local_ip4,
3900                                      port=12345,
3901                                      path_mtu=512)
3902        self.vapi.nat_ha_set_failover(ip_address=self.pg3.remote_ip4,
3903                                      port=12346, session_refresh_interval=10)
3904        bind_layers(UDP, HANATStateSync, sport=12345)
3905
3906        # create sessions
3907        pkts = self.create_stream_in(self.pg0, self.pg1)
3908        self.pg0.add_stream(pkts)
3909        self.pg_enable_capture(self.pg_interfaces)
3910        self.pg_start()
3911        capture = self.pg1.get_capture(len(pkts))
3912        self.verify_capture_out(capture)
3913        # active send HA events
3914        self.vapi.nat_ha_flush()
3915        stats = self.statistics.get_counter('/nat44/ha/add-event-send')
3916        self.assertEqual(stats[0][0], 3)
3917        capture = self.pg3.get_capture(1)
3918        p = capture[0]
3919        self.assert_packet_checksums_valid(p)
3920        try:
3921            ip = p[IP]
3922            udp = p[UDP]
3923            hanat = p[HANATStateSync]
3924        except IndexError:
3925            self.logger.error(ppp("Invalid packet:", p))
3926            raise
3927        else:
3928            self.assertEqual(ip.src, self.pg3.local_ip4)
3929            self.assertEqual(ip.dst, self.pg3.remote_ip4)
3930            self.assertEqual(udp.sport, 12345)
3931            self.assertEqual(udp.dport, 12346)
3932            self.assertEqual(hanat.version, 1)
3933            self.assertEqual(hanat.thread_index, 0)
3934            self.assertEqual(hanat.count, 3)
3935            seq = hanat.sequence_number
3936            for event in hanat.events:
3937                self.assertEqual(event.event_type, 1)
3938                self.assertEqual(event.in_addr, self.pg0.remote_ip4)
3939                self.assertEqual(event.out_addr, self.nat_addr)
3940                self.assertEqual(event.fib_index, 0)
3941
3942        # ACK received events
3943        ack = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
3944               IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
3945               UDP(sport=12346, dport=12345) /
3946               HANATStateSync(sequence_number=seq, flags='ACK'))
3947        self.pg3.add_stream(ack)
3948        self.pg_start()
3949        stats = self.statistics.get_counter('/nat44/ha/ack-recv')
3950        self.assertEqual(stats[0][0], 1)
3951
3952        # delete one session
3953        self.pg_enable_capture(self.pg_interfaces)
3954        self.vapi.nat44_del_session(address=self.pg0.remote_ip4,
3955                                    port=self.tcp_port_in,
3956                                    protocol=IP_PROTOS.tcp,
3957                                    flags=self.config_flags.NAT_IS_INSIDE)
3958        self.vapi.nat_ha_flush()
3959        stats = self.statistics.get_counter('/nat44/ha/del-event-send')
3960        self.assertEqual(stats[0][0], 1)
3961        capture = self.pg3.get_capture(1)
3962        p = capture[0]
3963        try:
3964            hanat = p[HANATStateSync]
3965        except IndexError:
3966            self.logger.error(ppp("Invalid packet:", p))
3967            raise
3968        else:
3969            self.assertGreater(hanat.sequence_number, seq)
3970
3971        # do not send ACK, active retry send HA event again
3972        self.pg_enable_capture(self.pg_interfaces)
3973        sleep(12)
3974        stats = self.statistics.get_counter('/nat44/ha/retry-count')
3975        self.assertEqual(stats[0][0], 3)
3976        stats = self.statistics.get_counter('/nat44/ha/missed-count')
3977        self.assertEqual(stats[0][0], 1)
3978        capture = self.pg3.get_capture(3)
3979        for packet in capture:
3980            self.assertEqual(packet, p)
3981
3982        # session counters refresh
3983        pkts = self.create_stream_out(self.pg1)
3984        self.pg1.add_stream(pkts)
3985        self.pg_enable_capture(self.pg_interfaces)
3986        self.pg_start()
3987        self.pg0.get_capture(2)
3988        self.vapi.nat_ha_flush()
3989        stats = self.statistics.get_counter('/nat44/ha/refresh-event-send')
3990        self.assertEqual(stats[0][0], 2)
3991        capture = self.pg3.get_capture(1)
3992        p = capture[0]
3993        self.assert_packet_checksums_valid(p)
3994        try:
3995            ip = p[IP]
3996            udp = p[UDP]
3997            hanat = p[HANATStateSync]
3998        except IndexError:
3999            self.logger.error(ppp("Invalid packet:", p))
4000            raise
4001        else:
4002            self.assertEqual(ip.src, self.pg3.local_ip4)
4003            self.assertEqual(ip.dst, self.pg3.remote_ip4)
4004            self.assertEqual(udp.sport, 12345)
4005            self.assertEqual(udp.dport, 12346)
4006            self.assertEqual(hanat.version, 1)
4007            self.assertEqual(hanat.count, 2)
4008            seq = hanat.sequence_number
4009            for event in hanat.events:
4010                self.assertEqual(event.event_type, 3)
4011                self.assertEqual(event.out_addr, self.nat_addr)
4012                self.assertEqual(event.fib_index, 0)
4013                self.assertEqual(event.total_pkts, 2)
4014                self.assertGreater(event.total_bytes, 0)
4015
4016        ack = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
4017               IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
4018               UDP(sport=12346, dport=12345) /
4019               HANATStateSync(sequence_number=seq, flags='ACK'))
4020        self.pg3.add_stream(ack)
4021        self.pg_start()
4022        stats = self.statistics.get_counter('/nat44/ha/ack-recv')
4023        self.assertEqual(stats[0][0], 2)
4024
4025    def test_ha_recv(self):
4026        """ Receive HA session synchronization events (passive) """
4027        self.nat44_add_address(self.nat_addr)
4028        flags = self.config_flags.NAT_IS_INSIDE
4029        self.vapi.nat44_interface_add_del_feature(
4030            sw_if_index=self.pg0.sw_if_index,
4031            flags=flags, is_add=1)
4032        self.vapi.nat44_interface_add_del_feature(
4033            sw_if_index=self.pg1.sw_if_index,
4034            is_add=1)
4035        self.vapi.nat_ha_set_listener(ip_address=self.pg3.local_ip4,
4036                                      port=12345,
4037                                      path_mtu=512)
4038        bind_layers(UDP, HANATStateSync, sport=12345)
4039
4040        self.tcp_port_out = random.randint(1025, 65535)
4041        self.udp_port_out = random.randint(1025, 65535)
4042
4043        # send HA session add events to failover/passive
4044        p = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
4045             IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
4046             UDP(sport=12346, dport=12345) /
4047             HANATStateSync(sequence_number=1, events=[
4048                 Event(event_type='add', protocol='tcp',
4049                       in_addr=self.pg0.remote_ip4, out_addr=self.nat_addr,
4050                       in_port=self.tcp_port_in, out_port=self.tcp_port_out,
4051                       eh_addr=self.pg1.remote_ip4,
4052                       ehn_addr=self.pg1.remote_ip4,
4053                       eh_port=self.tcp_external_port,
4054                       ehn_port=self.tcp_external_port, fib_index=0),
4055                 Event(event_type='add', protocol='udp',
4056                       in_addr=self.pg0.remote_ip4, out_addr=self.nat_addr,
4057                       in_port=self.udp_port_in, out_port=self.udp_port_out,
4058                       eh_addr=self.pg1.remote_ip4,
4059                       ehn_addr=self.pg1.remote_ip4,
4060                       eh_port=self.udp_external_port,
4061                       ehn_port=self.udp_external_port, fib_index=0)]))
4062
4063        self.pg3.add_stream(p)
4064        self.pg_enable_capture(self.pg_interfaces)
4065        self.pg_start()
4066        # receive ACK
4067        capture = self.pg3.get_capture(1)
4068        p = capture[0]
4069        try:
4070            hanat = p[HANATStateSync]
4071        except IndexError:
4072            self.logger.error(ppp("Invalid packet:", p))
4073            raise
4074        else:
4075            self.assertEqual(hanat.sequence_number, 1)
4076            self.assertEqual(hanat.flags, 'ACK')
4077            self.assertEqual(hanat.version, 1)
4078            self.assertEqual(hanat.thread_index, 0)
4079        stats = self.statistics.get_counter('/nat44/ha/ack-send')
4080        self.assertEqual(stats[0][0], 1)
4081        stats = self.statistics.get_counter('/nat44/ha/add-event-recv')
4082        self.assertEqual(stats[0][0], 2)
4083        users = self.statistics.get_counter('/nat44/total-users')
4084        self.assertEqual(users[0][0], 1)
4085        sessions = self.statistics.get_counter('/nat44/total-sessions')
4086        self.assertEqual(sessions[0][0], 2)
4087        users = self.vapi.nat44_user_dump()
4088        self.assertEqual(len(users), 1)
4089        self.assertEqual(str(users[0].ip_address),
4090                         self.pg0.remote_ip4)
4091        # there should be 2 sessions created by HA
4092        sessions = self.vapi.nat44_user_session_dump(users[0].ip_address,
4093                                                     users[0].vrf_id)
4094        self.assertEqual(len(sessions), 2)
4095        for session in sessions:
4096            self.assertEqual(str(session.inside_ip_address),
4097                             self.pg0.remote_ip4)
4098            self.assertEqual(str(session.outside_ip_address),
4099                             self.nat_addr)
4100            self.assertIn(session.inside_port,
4101                          [self.tcp_port_in, self.udp_port_in])
4102            self.assertIn(session.outside_port,
4103                          [self.tcp_port_out, self.udp_port_out])
4104            self.assertIn(session.protocol, [IP_PROTOS.tcp, IP_PROTOS.udp])
4105
4106        # send HA session delete event to failover/passive
4107        p = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
4108             IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
4109             UDP(sport=12346, dport=12345) /
4110             HANATStateSync(sequence_number=2, events=[
4111                 Event(event_type='del', protocol='udp',
4112                       in_addr=self.pg0.remote_ip4, out_addr=self.nat_addr,
4113                       in_port=self.udp_port_in, out_port=self.udp_port_out,
4114                       eh_addr=self.pg1.remote_ip4,
4115                       ehn_addr=self.pg1.remote_ip4,
4116                       eh_port=self.udp_external_port,
4117                       ehn_port=self.udp_external_port, fib_index=0)]))
4118
4119        self.pg3.add_stream(p)
4120        self.pg_enable_capture(self.pg_interfaces)
4121        self.pg_start()
4122        # receive ACK
4123        capture = self.pg3.get_capture(1)
4124        p = capture[0]
4125        try:
4126            hanat = p[HANATStateSync]
4127        except IndexError:
4128            self.logger.error(ppp("Invalid packet:", p))
4129            raise
4130        else:
4131            self.assertEqual(hanat.sequence_number, 2)
4132            self.assertEqual(hanat.flags, 'ACK')
4133            self.assertEqual(hanat.version, 1)
4134        users = self.vapi.nat44_user_dump()
4135        self.assertEqual(len(users), 1)
4136        self.assertEqual(str(users[0].ip_address),
4137                         self.pg0.remote_ip4)
4138        # now we should have only 1 session, 1 deleted by HA
4139        sessions = self.vapi.nat44_user_session_dump(users[0].ip_address,
4140                                                     users[0].vrf_id)
4141        self.assertEqual(len(sessions), 1)
4142        stats = self.statistics.get_counter('/nat44/ha/del-event-recv')
4143        self.assertEqual(stats[0][0], 1)
4144
4145        stats = self.statistics.get_err_counter('/err/nat-ha/pkts-processed')
4146        self.assertEqual(stats, 2)
4147
4148        # send HA session refresh event to failover/passive
4149        p = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
4150             IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
4151             UDP(sport=12346, dport=12345) /
4152             HANATStateSync(sequence_number=3, events=[
4153                 Event(event_type='refresh', protocol='tcp',
4154                       in_addr=self.pg0.remote_ip4, out_addr=self.nat_addr,
4155                       in_port=self.tcp_port_in, out_port=self.tcp_port_out,
4156                       eh_addr=self.pg1.remote_ip4,
4157                       ehn_addr=self.pg1.remote_ip4,
4158                       eh_port=self.tcp_external_port,
4159                       ehn_port=self.tcp_external_port, fib_index=0,
4160                       total_bytes=1024, total_pkts=2)]))
4161        self.pg3.add_stream(p)
4162        self.pg_enable_capture(self.pg_interfaces)
4163        self.pg_start()
4164        # receive ACK
4165        capture = self.pg3.get_capture(1)
4166        p = capture[0]
4167        try:
4168            hanat = p[HANATStateSync]
4169        except IndexError:
4170            self.logger.error(ppp("Invalid packet:", p))
4171            raise
4172        else:
4173            self.assertEqual(hanat.sequence_number, 3)
4174            self.assertEqual(hanat.flags, 'ACK')
4175            self.assertEqual(hanat.version, 1)
4176        users = self.vapi.nat44_user_dump()
4177        self.assertEqual(len(users), 1)
4178        self.assertEqual(str(users[0].ip_address),
4179                         self.pg0.remote_ip4)
4180        sessions = self.vapi.nat44_user_session_dump(users[0].ip_address,
4181                                                     users[0].vrf_id)
4182        self.assertEqual(len(sessions), 1)
4183        session = sessions[0]
4184        self.assertEqual(session.total_bytes, 1024)
4185        self.assertEqual(session.total_pkts, 2)
4186        stats = self.statistics.get_counter('/nat44/ha/refresh-event-recv')
4187        self.assertEqual(stats[0][0], 1)
4188
4189        stats = self.statistics.get_err_counter('/err/nat-ha/pkts-processed')
4190        self.assertEqual(stats, 3)
4191
4192        # send packet to test session created by HA
4193        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4194             IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4195             TCP(sport=self.tcp_external_port, dport=self.tcp_port_out))
4196        self.pg1.add_stream(p)
4197        self.pg_enable_capture(self.pg_interfaces)
4198        self.pg_start()
4199        capture = self.pg0.get_capture(1)
4200        p = capture[0]
4201        try:
4202            ip = p[IP]
4203            tcp = p[TCP]
4204        except IndexError:
4205            self.logger.error(ppp("Invalid packet:", p))
4206            raise
4207        else:
4208            self.assertEqual(ip.src, self.pg1.remote_ip4)
4209            self.assertEqual(ip.dst, self.pg0.remote_ip4)
4210            self.assertEqual(tcp.sport, self.tcp_external_port)
4211            self.assertEqual(tcp.dport, self.tcp_port_in)
4212
4213    def tearDown(self):
4214        super(TestNAT44, self).tearDown()
4215        self.clear_nat44()
4216        self.vapi.cli("clear logging")
4217
4218    def show_commands_at_teardown(self):
4219        self.logger.info(self.vapi.cli("show nat44 addresses"))
4220        self.logger.info(self.vapi.cli("show nat44 interfaces"))
4221        self.logger.info(self.vapi.cli("show nat44 static mappings"))
4222        self.logger.info(self.vapi.cli("show nat44 interface address"))
4223        self.logger.info(self.vapi.cli("show nat44 sessions detail"))
4224        self.logger.info(self.vapi.cli("show nat44 hash tables detail"))
4225        self.logger.info(self.vapi.cli("show nat timeouts"))
4226        self.logger.info(
4227            self.vapi.cli("show nat addr-port-assignment-alg"))
4228        self.logger.info(self.vapi.cli("show nat ha"))
4229
4230
4231class TestNAT44EndpointDependent2(MethodHolder):
4232    """ Endpoint-Dependent session test cases """
4233
4234    icmp_timeout = 2
4235
4236    @classmethod
4237    def setUpConstants(cls):
4238        super(TestNAT44EndpointDependent2, cls).setUpConstants()
4239        cls.vpp_cmdline.extend(["nat", "{", "endpoint-dependent",
4240                                "translation", "hash", "buckets", "1",
4241                                "icmp", "timeout", str(cls.icmp_timeout), "}"])
4242
4243    @classmethod
4244    def setUpClass(cls):
4245        super(TestNAT44EndpointDependent2, cls).setUpClass()
4246        try:
4247            translation_buckets = 1
4248            cls.max_translations = 10 * translation_buckets
4249
4250            cls.create_pg_interfaces(range(2))
4251            cls.interfaces = list(cls.pg_interfaces[0:2])
4252
4253            for i in cls.interfaces:
4254                i.admin_up()
4255                i.config_ip4()
4256                i.resolve_arp()
4257
4258            cls.pg0.generate_remote_hosts(1)
4259            cls.pg0.configure_ipv4_neighbors()
4260
4261            cls.pg1.generate_remote_hosts(1)
4262            cls.pg1.configure_ip