test_map.py revision 5d4b8912
1#!/usr/bin/env python3
2
3import ipaddress
4import unittest
5
6from framework import VppTestCase, VppTestRunner
7from vpp_ip import DpoProto
8from vpp_ip_route import VppIpRoute, VppRoutePath
9from util import fragment_rfc791, fragment_rfc8200
10
11import scapy.compat
12from scapy.layers.l2 import Ether
13from scapy.packet import Raw
14from scapy.layers.inet import IP, UDP, ICMP, TCP, fragment
15from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded
16
17
18class TestMAP(VppTestCase):
19    """ MAP Test Case """
20
    @classmethod
    def setUpClass(cls):
        """Run the parent class's one-time setup; nothing is added here."""
        super(TestMAP, cls).setUpClass()
24
    @classmethod
    def tearDownClass(cls):
        """Run the parent class's one-time teardown; nothing is added here."""
        super(TestMAP, cls).tearDownClass()
28
29    def setUp(self):
30        super(TestMAP, self).setUp()
31
32        # create 2 pg interfaces
33        self.create_pg_interfaces(range(4))
34
35        # pg0 is 'inside' IPv4
36        self.pg0.admin_up()
37        self.pg0.config_ip4()
38        self.pg0.resolve_arp()
39
40        # pg1 is 'outside' IPv6
41        self.pg1.admin_up()
42        self.pg1.config_ip6()
43        self.pg1.generate_remote_hosts(4)
44        self.pg1.configure_ipv6_neighbors()
45
46    def tearDown(self):
47        super(TestMAP, self).tearDown()
48        for i in self.pg_interfaces:
49            i.unconfig_ip4()
50            i.unconfig_ip6()
51            i.admin_down()
52
53    def send_and_assert_encapped(self, packets, ip6_src, ip6_dst, dmac=None):
54        if not dmac:
55            dmac = self.pg1.remote_mac
56
57        self.pg0.add_stream(packets)
58
59        self.pg_enable_capture(self.pg_interfaces)
60        self.pg_start()
61
62        capture = self.pg1.get_capture(len(packets))
63        for rx, tx in zip(capture, packets):
64            self.assertEqual(rx[Ether].dst, dmac)
65            self.assertEqual(rx[IP].src, tx[IP].src)
66            self.assertEqual(rx[IPv6].src, ip6_src)
67            self.assertEqual(rx[IPv6].dst, ip6_dst)
68
69    def send_and_assert_encapped_one(self, packet, ip6_src, ip6_dst,
70                                     dmac=None):
71        return self.send_and_assert_encapped([packet], ip6_src, ip6_dst, dmac)
72
73    def test_api_map_domain_dump(self):
74        map_dst = '2001::/64'
75        map_src = '3000::1/128'
76        client_pfx = '192.168.0.0/16'
77        tag = 'MAP-E tag.'
78        index = self.vapi.map_add_domain(ip4_prefix=client_pfx,
79                                         ip6_prefix=map_dst,
80                                         ip6_src=map_src,
81                                         tag=tag).index
82        rv = self.vapi.map_domain_dump()
83
84        # restore the state early so as to not impact subsequent tests.
85        # If an assert fails, we will not get the chance to do it at the end.
86        self.vapi.map_del_domain(index=index)
87
88        self.assertGreater(len(rv), 0,
89                           "Expected output from 'map_domain_dump'")
90
91        # typedefs are returned as ipaddress objects.
92        # wrap results in str() ugh! to avoid the need to call unicode.
93        self.assertEqual(str(rv[0].ip4_prefix), client_pfx)
94        self.assertEqual(str(rv[0].ip6_prefix), map_dst)
95        self.assertEqual(str(rv[0].ip6_src), map_src)
96
97        self.assertEqual(rv[0].tag, tag,
98                         "output produced incorrect tag value.")
99
100    def test_map_e_udp(self):
101        """ MAP-E UDP"""
102
103        #
104        # Add a route to the MAP-BR
105        #
106        map_br_pfx = "2001::"
107        map_br_pfx_len = 32
108        map_route = VppIpRoute(self,
109                               map_br_pfx,
110                               map_br_pfx_len,
111                               [VppRoutePath(self.pg1.remote_ip6,
112                                             self.pg1.sw_if_index)])
113        map_route.add_vpp_config()
114
115        #
116        # Add a domain that maps from pg0 to pg1
117        #
118        map_dst = '2001::/32'
119        map_src = '3000::1/128'
120        client_pfx = '192.168.0.0/16'
121        map_translated_addr = '2001:0:101:7000:0:c0a8:101:7'
122        tag = 'MAP-E tag.'
123        self.vapi.map_add_domain(ip4_prefix=client_pfx,
124                                 ip6_prefix=map_dst,
125                                 ip6_src=map_src,
126                                 ea_bits_len=20,
127                                 psid_offset=4,
128                                 psid_length=4,
129                                 tag=tag)
130
131        self.vapi.map_param_set_security_check(enable=1, fragments=1)
132
133        # Enable MAP on interface.
134        self.vapi.map_if_enable_disable(is_enable=1,
135                                        sw_if_index=self.pg0.sw_if_index,
136                                        is_translation=0)
137
138        # Ensure MAP doesn't steal all packets!
139        v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
140              IP(src=self.pg0.remote_ip4, dst=self.pg0.remote_ip4) /
141              UDP(sport=20000, dport=10000) /
142              Raw(b'\xa5' * 100))
143        rx = self.send_and_expect(self.pg0, v4 * 4, self.pg0)
144        v4_reply = v4[1]
145        v4_reply.ttl -= 1
146        for p in rx:
147            self.validate(p[1], v4_reply)
148
149        #
150        # Fire in a v4 packet that will be encapped to the BR
151        #
152        v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
153              IP(src=self.pg0.remote_ip4, dst='192.168.1.1') /
154              UDP(sport=20000, dport=10000) /
155              Raw(b'\xa5' * 100))
156
157        self.send_and_assert_encapped(v4 * 4, "3000::1", map_translated_addr)
158
159        #
160        # Verify reordered fragments are able to pass as well
161        #
162        v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
163              IP(id=1, src=self.pg0.remote_ip4, dst='192.168.1.1') /
164              UDP(sport=20000, dport=10000) /
165              Raw(b'\xa5' * 1000))
166
167        frags = fragment_rfc791(v4, 400)
168        frags.reverse()
169
170        self.send_and_assert_encapped(frags, "3000::1", map_translated_addr)
171
172        # Enable MAP on interface.
173        self.vapi.map_if_enable_disable(is_enable=1,
174                                        sw_if_index=self.pg1.sw_if_index,
175                                        is_translation=0)
176
177        # Ensure MAP doesn't steal all packets
178        v6 = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
179              IPv6(src=self.pg1.remote_ip6, dst=self.pg1.remote_ip6) /
180              UDP(sport=20000, dport=10000) /
181              Raw(b'\xa5' * 100))
182        rx = self.send_and_expect(self.pg1, v6*1, self.pg1)
183        v6_reply = v6[1]
184        v6_reply.hlim -= 1
185        for p in rx:
186            self.validate(p[1], v6_reply)
187
188        #
189        # Fire in a V6 encapped packet.
190        # expect a decapped packet on the inside ip4 link
191        #
192        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
193             IPv6(dst='3000::1', src=map_translated_addr) /
194             IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
195             UDP(sport=10000, dport=20000) /
196             Raw(b'\xa5' * 100))
197
198        self.pg1.add_stream(p)
199
200        self.pg_enable_capture(self.pg_interfaces)
201        self.pg_start()
202
203        rx = self.pg0.get_capture(1)
204        rx = rx[0]
205
206        self.assertFalse(rx.haslayer(IPv6))
207        self.assertEqual(rx[IP].src, p[IP].src)
208        self.assertEqual(rx[IP].dst, p[IP].dst)
209
210        #
211        # Verify encapped reordered fragments pass as well
212        #
213        p = (IP(id=1, dst=self.pg0.remote_ip4, src='192.168.1.1') /
214             UDP(sport=10000, dport=20000) /
215             Raw(b'\xa5' * 1500))
216        frags = fragment_rfc791(p, 400)
217        frags.reverse()
218
219        stream = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
220                  IPv6(dst='3000::1', src=map_translated_addr) /
221                  x for x in frags)
222
223        self.pg1.add_stream(stream)
224
225        self.pg_enable_capture(self.pg_interfaces)
226        self.pg_start()
227
228        rx = self.pg0.get_capture(len(frags))
229
230        for r in rx:
231            self.assertFalse(r.haslayer(IPv6))
232            self.assertEqual(r[IP].src, p[IP].src)
233            self.assertEqual(r[IP].dst, p[IP].dst)
234
235        # Verify that fragments pass even if ipv6 layer is fragmented
236        stream = (IPv6(dst='3000::1', src=map_translated_addr) / x
237                  for x in frags)
238
239        v6_stream = [
240            Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / x
241            for i in range(len(frags))
242            for x in fragment_rfc8200(
243                IPv6(dst='3000::1', src=map_translated_addr) / frags[i],
244                i, 200)]
245
246        self.pg1.add_stream(v6_stream)
247
248        self.pg_enable_capture(self.pg_interfaces)
249        self.pg_start()
250
251        rx = self.pg0.get_capture(len(frags))
252
253        for r in rx:
254            self.assertFalse(r.haslayer(IPv6))
255            self.assertEqual(r[IP].src, p[IP].src)
256            self.assertEqual(r[IP].dst, p[IP].dst)
257
258        #
259        # Pre-resolve. No API for this!!
260        #
261        self.vapi.ppcli("map params pre-resolve ip6-nh 4001::1")
262
263        self.send_and_assert_no_replies(self.pg0, v4,
264                                        "resolved via default route")
265
266        #
267        # Add a route to 4001::1. Expect the encapped traffic to be
268        # sent via that routes next-hop
269        #
270        pre_res_route = VppIpRoute(self, "4001::1", 128,
271                                   [VppRoutePath(self.pg1.remote_hosts[2].ip6,
272                                                 self.pg1.sw_if_index)])
273        pre_res_route.add_vpp_config()
274
275        self.send_and_assert_encapped_one(v4, "3000::1",
276                                          map_translated_addr,
277                                          dmac=self.pg1.remote_hosts[2].mac)
278
279        #
280        # change the route to the pre-solved next-hop
281        #
282        pre_res_route.modify([VppRoutePath(self.pg1.remote_hosts[3].ip6,
283                                           self.pg1.sw_if_index)])
284        pre_res_route.add_vpp_config()
285
286        self.send_and_assert_encapped_one(v4, "3000::1",
287                                          map_translated_addr,
288                                          dmac=self.pg1.remote_hosts[3].mac)
289
290        #
291        # cleanup. The test infra's object registry will ensure
292        # the route is really gone and thus that the unresolve worked.
293        #
294        pre_res_route.remove_vpp_config()
295        self.vapi.ppcli("map params pre-resolve del ip6-nh 4001::1")
296
297    def test_map_e_inner_frag(self):
298        """ MAP-E Inner fragmentation """
299
300        #
301        # Add a route to the MAP-BR
302        #
303        map_br_pfx = "2001::"
304        map_br_pfx_len = 32
305        map_route = VppIpRoute(self,
306                               map_br_pfx,
307                               map_br_pfx_len,
308                               [VppRoutePath(self.pg1.remote_ip6,
309                                             self.pg1.sw_if_index)])
310        map_route.add_vpp_config()
311
312        #
313        # Add a domain that maps from pg0 to pg1
314        #
315        map_dst = '2001::/32'
316        map_src = '3000::1/128'
317        client_pfx = '192.168.0.0/16'
318        map_translated_addr = '2001:0:101:7000:0:c0a8:101:7'
319        tag = 'MAP-E tag.'
320        self.vapi.map_add_domain(ip4_prefix=client_pfx,
321                                 ip6_prefix=map_dst,
322                                 ip6_src=map_src,
323                                 ea_bits_len=20,
324                                 psid_offset=4,
325                                 psid_length=4,
326                                 mtu=1000,
327                                 tag=tag)
328
329        # Enable MAP on interface.
330        self.vapi.map_if_enable_disable(is_enable=1,
331                                        sw_if_index=self.pg0.sw_if_index,
332                                        is_translation=0)
333
334        # Enable inner fragmentation
335        self.vapi.map_param_set_fragmentation(inner=1)
336
337        v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
338              IP(src=self.pg0.remote_ip4, dst='192.168.1.1') /
339              UDP(sport=20000, dport=10000) /
340              Raw(b'\xa5' * 1300))
341
342        self.pg_send(self.pg0, v4*1)
343        rx = self.pg1.get_capture(2)
344
345        frags = fragment_rfc791(v4[1], 1000)
346        frags[0].id = 0
347        frags[1].id = 0
348        frags[0].ttl -= 1
349        frags[1].ttl -= 1
350        frags[0].chksum = 0
351        frags[1].chksum = 0
352
353        v6_reply1 = (IPv6(src='3000::1', dst=map_translated_addr, hlim=63) /
354                     frags[0])
355        v6_reply2 = (IPv6(src='3000::1', dst=map_translated_addr, hlim=63) /
356                     frags[1])
357        rx[0][1].fl = 0
358        rx[1][1].fl = 0
359        rx[0][1][IP].id = 0
360        rx[1][1][IP].id = 0
361        rx[0][1][IP].chksum = 0
362        rx[1][1][IP].chksum = 0
363
364        self.validate(rx[0][1], v6_reply1)
365        self.validate(rx[1][1], v6_reply2)
366
367    def test_map_e_tcp_mss(self):
368        """ MAP-E TCP MSS"""
369
370        #
371        # Add a route to the MAP-BR
372        #
373        map_br_pfx = "2001::"
374        map_br_pfx_len = 32
375        map_route = VppIpRoute(self,
376                               map_br_pfx,
377                               map_br_pfx_len,
378                               [VppRoutePath(self.pg1.remote_ip6,
379                                             self.pg1.sw_if_index)])
380        map_route.add_vpp_config()
381
382        #
383        # Add a domain that maps from pg0 to pg1
384        #
385        map_dst = '2001::/32'
386        map_src = '3000::1/128'
387        client_pfx = '192.168.0.0/16'
388        map_translated_addr = '2001:0:101:5000:0:c0a8:101:5'
389        tag = 'MAP-E TCP tag.'
390        self.vapi.map_add_domain(ip4_prefix=client_pfx,
391                                 ip6_prefix=map_dst,
392                                 ip6_src=map_src,
393                                 ea_bits_len=20,
394                                 psid_offset=4,
395                                 psid_length=4,
396                                 tag=tag)
397
398        # Enable MAP on pg0 interface.
399        self.vapi.map_if_enable_disable(is_enable=1,
400                                        sw_if_index=self.pg0.sw_if_index,
401                                        is_translation=0)
402
403        # Enable MAP on pg1 interface.
404        self.vapi.map_if_enable_disable(is_enable=1,
405                                        sw_if_index=self.pg1.sw_if_index,
406                                        is_translation=0)
407
408        # TCP MSS clamping
409        mss_clamp = 1300
410        self.vapi.map_param_set_tcp(mss_clamp)
411
412        #
413        # Send a v4 packet that will be encapped.
414        #
415        p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
416        p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.1.1')
417        p_tcp = TCP(sport=20000, dport=30000, flags="S",
418                    options=[("MSS", 1455)])
419        p4 = p_ether / p_ip4 / p_tcp
420
421        self.pg1.add_stream(p4)
422        self.pg_enable_capture(self.pg_interfaces)
423        self.pg_start()
424
425        rx = self.pg1.get_capture(1)
426        rx = rx[0]
427
428        self.assertTrue(rx.haslayer(IPv6))
429        self.assertEqual(rx[IP].src, p4[IP].src)
430        self.assertEqual(rx[IP].dst, p4[IP].dst)
431        self.assertEqual(rx[IPv6].src, "3000::1")
432        self.assertEqual(rx[TCP].options,
433                         TCP(options=[('MSS', mss_clamp)]).options)
434
435    def validate(self, rx, expected):
436        self.assertEqual(rx, expected.__class__(scapy.compat.raw(expected)))
437
438    def payload(self, len):
439        return 'x' * len
440
    def test_map_t(self):
        """ MAP-T """

        # Sections below run strictly in order and rebind the names
        # payload, p_ip4, p4 and p6 as they go; each section uses the
        # bindings that are current at that point.

        #
        # Add a domain that maps from pg0 to pg1
        #
        map_dst = '2001:db8::/32'
        map_src = '1234:5678:90ab:cdef::/64'
        ip4_pfx = '192.168.0.0/24'
        tag = 'MAP-T Tag.'

        self.vapi.map_add_domain(ip6_prefix=map_dst,
                                 ip4_prefix=ip4_pfx,
                                 ip6_src=map_src,
                                 ea_bits_len=16,
                                 psid_offset=6,
                                 psid_length=4,
                                 mtu=1500,
                                 tag=tag)

        # Enable MAP-T on interfaces.
        # (is_translation=1 on both pg0 and pg1)
        self.vapi.map_if_enable_disable(is_enable=1,
                                        sw_if_index=self.pg0.sw_if_index,
                                        is_translation=1)
        self.vapi.map_if_enable_disable(is_enable=1,
                                        sw_if_index=self.pg1.sw_if_index,
                                        is_translation=1)

        # Ensure MAP doesn't steal all packets!
        # A packet between pg0's remote address and itself is expected
        # back on pg0 with only the TTL decremented.
        v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
              IP(src=self.pg0.remote_ip4, dst=self.pg0.remote_ip4) /
              UDP(sport=20000, dport=10000) /
              Raw(b'\xa5' * 100))
        rx = self.send_and_expect(self.pg0, v4*1, self.pg0)
        v4_reply = v4[1]
        v4_reply.ttl -= 1
        for p in rx:
            self.validate(p[1], v4_reply)
        # Ensure MAP doesn't steal all packets
        # Same check for plain IPv6 on pg1; only the hop limit changes.
        v6 = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
              IPv6(src=self.pg1.remote_ip6, dst=self.pg1.remote_ip6) /
              UDP(sport=20000, dport=10000) /
              Raw(b'\xa5' * 100))
        rx = self.send_and_expect(self.pg1, v6*1, self.pg1)
        v6_reply = v6[1]
        v6_reply.hlim -= 1
        for p in rx:
            self.validate(p[1], v6_reply)

        # Route for the domain's IPv6 prefix via pg1's remote address.
        map_route = VppIpRoute(self,
                               "2001:db8::",
                               32,
                               [VppRoutePath(self.pg1.remote_ip6,
                                             self.pg1.sw_if_index,
                                             proto=DpoProto.DPO_PROTO_IP6)])
        map_route.add_vpp_config()

        #
        # Send a v4 packet that will be translated
        #
        p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
        p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.0.1')
        payload = TCP(sport=0xabcd, dport=0xabcd)

        p4 = (p_ether / p_ip4 / payload)
        # The expected IPv6 addresses are hard-coded; the source assumes
        # a particular pg0 remote IPv4 address -- TODO confirm which one.
        p6_translated = (IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0",
                              dst="2001:db8:1f0::c0a8:1:f") / payload)
        p6_translated.hlim -= 1
        rx = self.send_and_expect(self.pg0, p4*1, self.pg1)
        for p in rx:
            self.validate(p[1], p6_translated)

        # Send back an IPv6 packet that will be "untranslated"
        p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
        p_ip6 = IPv6(src='2001:db8:1f0::c0a8:1:f',
                     dst='1234:5678:90ab:cdef:ac:1001:200:0')
        p6 = (p_ether6 / p_ip6 / payload)
        p4_translated = (IP(src='192.168.0.1',
                            dst=self.pg0.remote_ip4) / payload)
        # the translated packet is expected with IP id 0 and TTL - 1
        p4_translated.id = 0
        p4_translated.ttl -= 1
        rx = self.send_and_expect(self.pg1, p6*1, self.pg0)
        for p in rx:
            self.validate(p[1], p4_translated)

        # IPv4 TTL
        # A packet sent with ttl=0 must be answered on pg0 with an ICMP
        # time-exceeded message that quotes the offending packet.
        ip4_ttl_expired = IP(src=self.pg0.remote_ip4, dst='192.168.0.1', ttl=0)
        p4 = (p_ether / ip4_ttl_expired / payload)

        icmp4_reply = (IP(id=0, ttl=254, src=self.pg0.local_ip4,
                          dst=self.pg0.remote_ip4) /
                       ICMP(type='time-exceeded',
                            code='ttl-zero-during-transit') /
                       IP(src=self.pg0.remote_ip4,
                          dst='192.168.0.1', ttl=0) / payload)
        rx = self.send_and_expect(self.pg0, p4*1, self.pg0)
        for p in rx:
            self.validate(p[1], icmp4_reply)

        # The triple-quoted block below is a bare string expression: it
        # keeps a disabled ttl=1 variant of the check above for reference
        # and none of it is executed.
        '''
        This one is broken, cause it would require hairpinning...
        # IPv4 TTL TTL1
        ip4_ttl_expired = IP(src=self.pg0.remote_ip4, dst='192.168.0.1', ttl=1)
        p4 = (p_ether / ip4_ttl_expired / payload)

        icmp4_reply = IP(id=0, ttl=254, src=self.pg0.local_ip4,
        dst=self.pg0.remote_ip4) / \
        ICMP(type='time-exceeded', code='ttl-zero-during-transit' ) / \
        IP(src=self.pg0.remote_ip4, dst='192.168.0.1', ttl=0) / payload
        rx = self.send_and_expect(self.pg0, p4*1, self.pg0)
        for p in rx:
            self.validate(p[1], icmp4_reply)
        '''

        # IPv6 Hop limit
        # Mirror of the IPv4 TTL check: hlim=0 sent into pg1 must produce
        # an ICMPv6 time-exceeded reply on pg1.
        ip6_hlim_expired = IPv6(hlim=0, src='2001:db8:1ab::c0a8:1:ab',
                                dst='1234:5678:90ab:cdef:ac:1001:200:0')
        p6 = (p_ether6 / ip6_hlim_expired / payload)

        icmp6_reply = (IPv6(hlim=255, src=self.pg1.local_ip6,
                            dst="2001:db8:1ab::c0a8:1:ab") /
                       ICMPv6TimeExceeded(code=0) /
                       IPv6(src="2001:db8:1ab::c0a8:1:ab",
                            dst='1234:5678:90ab:cdef:ac:1001:200:0',
                            hlim=0) / payload)
        rx = self.send_and_expect(self.pg1, p6*1, self.pg1)
        for p in rx:
            self.validate(p[1], icmp6_reply)

        # IPv4 Well-known port
        # UDP ports 200/200: no packet may come out of any interface.
        p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.0.1')
        payload = UDP(sport=200, dport=200)
        p4 = (p_ether / p_ip4 / payload)
        self.send_and_assert_no_replies(self.pg0, p4*1)

        # IPv6 Well-known port
        # p_ip6 is still the "untranslated" header built further up.
        payload = UDP(sport=200, dport=200)
        p6 = (p_ether6 / p_ip6 / payload)
        self.send_and_assert_no_replies(self.pg1, p6*1)

        # Packet fragmentation
        # One packet with a 1453-character payload goes in on pg0; two
        # packets are expected on pg1.  Only the count is checked.
        payload = UDP(sport=40000, dport=4000) / self.payload(1453)
        p4 = (p_ether / p_ip4 / payload)
        self.pg_enable_capture()
        self.pg0.add_stream(p4)
        self.pg_start()
        rx = self.pg1.get_capture(2)
        for p in rx:
            pass
            # TODO: Manual validation
            # self.validate(p[1], icmp4_reply)

        # Packet fragmentation send fragments
        # Same packet, but pre-fragmented with scapy's fragment() before
        # sending; again only the number of captured packets is checked.
        payload = UDP(sport=40000, dport=4000) / self.payload(1453)
        p4 = (p_ether / p_ip4 / payload)
        frags = fragment(p4, fragsize=1000)
        self.pg_enable_capture()
        self.pg0.add_stream(frags)
        self.pg_start()
        rx = self.pg1.get_capture(2)
        for p in rx:
            pass
            # p.show2()

        # reass_pkt = reassemble(rx)
        # p4_reply.ttl -= 1
        # p4_reply.id = 256
        # self.validate(reass_pkt, p4_reply)

        # TCP MSS clamping
        self.vapi.map_param_set_tcp(1300)

        #
        # Send a v4 TCP SYN packet that will be translated and MSS clamped
        #
        p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
        p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.0.1')
        payload = TCP(sport=0xabcd, dport=0xabcd, flags="S",
                      options=[('MSS', 1460)])

        p4 = (p_ether / p_ip4 / payload)
        p6_translated = (IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0",
                              dst="2001:db8:1f0::c0a8:1:f") / payload)
        p6_translated.hlim -= 1
        # the MSS of 1460 that is sent is expected to come out as 1300
        p6_translated[TCP].options = [('MSS', 1300)]
        rx = self.send_and_expect(self.pg0, p4*1, self.pg1)
        for p in rx:
            self.validate(p[1], p6_translated)

        # Send back an IPv6 packet that will be "untranslated"
        # (same MSS expectation in the v6 -> v4 direction)
        p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
        p_ip6 = IPv6(src='2001:db8:1f0::c0a8:1:f',
                     dst='1234:5678:90ab:cdef:ac:1001:200:0')
        p6 = (p_ether6 / p_ip6 / payload)
        p4_translated = (IP(src='192.168.0.1',
                            dst=self.pg0.remote_ip4) / payload)
        p4_translated.id = 0
        p4_translated.ttl -= 1
        p4_translated[TCP].options = [('MSS', 1300)]
        rx = self.send_and_expect(self.pg1, p6*1, self.pg0)
        for p in rx:
            self.validate(p[1], p4_translated)
642            self.validate(p[1], p4_translated)
643
644    def test_map_t_ip6_psid(self):
645        """ MAP-T v6->v4 PSID validation"""
646
647        #
648        # Add a domain that maps from pg0 to pg1
649        #
650        map_dst = '2001:db8::/32'
651        map_src = '1234:5678:90ab:cdef::/64'
652        ip4_pfx = '192.168.0.0/24'
653        tag = 'MAP-T Test Domain'
654
655        self.vapi.map_add_domain(ip6_prefix=map_dst,
656                                 ip4_prefix=ip4_pfx,
657                                 ip6_src=map_src,
658                                 ea_bits_len=16,
659                                 psid_offset=6,
660                                 psid_length=4,
661                                 mtu=1500,
662                                 tag=tag)
663
664        # Enable MAP-T on interfaces.
665        self.vapi.map_if_enable_disable(is_enable=1,
666                                        sw_if_index=self.pg0.sw_if_index,
667                                        is_translation=1)
668        self.vapi.map_if_enable_disable(is_enable=1,
669                                        sw_if_index=self.pg1.sw_if_index,
670                                        is_translation=1)
671
672        map_route = VppIpRoute(self,
673                               "2001:db8::",
674                               32,
675                               [VppRoutePath(self.pg1.remote_ip6,
676                                             self.pg1.sw_if_index,
677                                             proto=DpoProto.DPO_PROTO_IP6)])
678        map_route.add_vpp_config()
679
680        p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
681        p_ip6 = IPv6(src='2001:db8:1f0::c0a8:1:f',
682                     dst='1234:5678:90ab:cdef:ac:1001:200:0')
683
684        # Send good IPv6 source port, ensure translated IPv4 received
685        payload = TCP(sport=0xabcd, dport=80)
686        p6 = (p_ether6 / p_ip6 / payload)
687        p4_translated = (IP(src='192.168.0.1',
688                            dst=self.pg0.remote_ip4) / payload)
689        p4_translated.id = 0
690        p4_translated.ttl -= 1
691        rx = self.send_and_expect(self.pg1, p6*1, self.pg0)
692        for p in rx:
693            self.validate(p[1], p4_translated)
694
695        # Send bad IPv6 source port, ensure translated IPv4 not received
696        payload = TCP(sport=0xdcba, dport=80)
697        p6 = (p_ether6 / p_ip6 / payload)
698        self.send_and_assert_no_replies(self.pg1, p6*1)
699
# When executed as a script, run this module's tests with VppTestRunner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)
702