test_ip6.py revision 07a0f212
1#!/usr/bin/env python
2
3import socket
4import unittest
5
6from parameterized import parameterized
7import scapy.compat
8import scapy.layers.inet6 as inet6
9from scapy.contrib.mpls import MPLS
10from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6ND_RS, \
11    ICMPv6ND_RA, ICMPv6NDOptMTU, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptPrefixInfo, \
12    ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types, \
13    ICMPv6TimeExceeded, ICMPv6EchoRequest, ICMPv6EchoReply, IPv6ExtHdrHopByHop
14from scapy.layers.l2 import Ether, Dot1Q
15from scapy.packet import Raw
16from scapy.utils import inet_pton, inet_ntop
17from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ptop, in6_islladdr, \
18    in6_mactoifaceid
19from six import moves
20
21from framework import VppTestCase, VppTestRunner
22from util import ppp, ip6_normalize, mk_ll_addr
23from vpp_ip import DpoProto, VppIpAddress
24from vpp_ip_route import VppIpRoute, VppRoutePath, find_route, VppIpMRoute, \
25    VppMRoutePath, MRouteItfFlags, MRouteEntryFlags, VppMplsIpBind, \
26    VppMplsRoute, VppMplsTable, VppIpTable, FibPathType, \
27    VppIpInterfaceAddress
28from vpp_neighbor import find_nbr, VppNeighbor
29from vpp_pg_interface import is_ipv6_misc
30from vpp_sub_interface import VppSubInterface, VppDot1QSubint
31from ipaddress import IPv6Network, IPv4Network, IPv6Address
32
# Address family passed to the inet_pton()/inet_ntop() calls in this file.
AF_INET6 = socket.AF_INET6

# Python 2/3 compatibility: the ``unicode`` builtin only exists on Python 2;
# on Python 3 the lookup raises NameError and ``str`` is used instead.
try:
    text_type = unicode
except NameError:
    text_type = str

# NOTE(review): not referenced in the visible part of this file; presumably a
# packet count used by tests further down - confirm.
NUM_PKTS = 67
41
42
class TestIPv6ND(VppTestCase):
    """Shared helpers for IPv6 neighbor-discovery tests.

    Offers validators for received RA, NA and NS packets plus
    send-and-capture wrappers that expect exactly one such packet back.
    """

    def validate_ra(self, intf, rx, dst_ip=None):
        """Check the addressing of a Router Advertisement captured on intf.

        :param intf: interface the RA was captured on.
        :param rx: the captured packet.
        :param dst_ip: expected IPv6 destination; ``intf.remote_ip6`` when
            not given.
        """
        if not dst_ip:
            dst_ip = intf.remote_ip6

        # unicasted packets must come to the unicast mac
        self.assertEqual(rx[Ether].dst, intf.remote_mac)

        # and from the router's MAC
        self.assertEqual(rx[Ether].src, intf.local_mac)

        # the rx'd RA should be addressed to the sender's source
        self.assertTrue(rx.haslayer(ICMPv6ND_RA))
        self.assertEqual(in6_ptop(rx[IPv6].dst),
                         in6_ptop(dst_ip))

        # and come from the router's link local
        self.assertTrue(in6_islladdr(rx[IPv6].src))
        self.assertEqual(in6_ptop(rx[IPv6].src),
                         in6_ptop(mk_ll_addr(intf.local_mac)))

    def validate_na(self, intf, rx, dst_ip=None, tgt_ip=None):
        """Check a Neighbor Advertisement captured on intf.

        :param intf: interface the NA was captured on.
        :param rx: the captured packet.
        :param dst_ip: expected IPv6 destination; ``intf.remote_ip6`` when
            not given.
        :param tgt_ip: expected IPv6 source of the NA; ``intf.local_ip6``
            when not given.
        """
        if not dst_ip:
            dst_ip = intf.remote_ip6
        if not tgt_ip:
            # Default the expected source address. This must not touch
            # dst_ip, otherwise the destination check below would compare
            # against the wrong address and tgt_ip would stay None.
            tgt_ip = intf.local_ip6

        # unicasted packets must come to the unicast mac
        self.assertEqual(rx[Ether].dst, intf.remote_mac)

        # and from the router's MAC
        self.assertEqual(rx[Ether].src, intf.local_mac)

        # the rx'd NA should be addressed to the sender's source
        self.assertTrue(rx.haslayer(ICMPv6ND_NA))
        self.assertEqual(in6_ptop(rx[IPv6].dst),
                         in6_ptop(dst_ip))

        # and come from the target address
        self.assertEqual(
            in6_ptop(rx[IPv6].src), in6_ptop(tgt_ip))

        # Dest link-layer options should have the router's MAC
        dll = rx[ICMPv6NDOptDstLLAddr]
        self.assertEqual(dll.lladdr, intf.local_mac)

    def validate_ns(self, intf, rx, tgt_ip):
        """Check a Neighbor Solicitation captured on intf.

        :param intf: interface the NS was captured on.
        :param rx: the captured packet.
        :param tgt_ip: IPv6 address the NS is expected to solicit.
        """
        # solicited-node multicast address derived from the target
        nsma = in6_getnsma(inet_pton(AF_INET6, tgt_ip))
        dst_ip = inet_ntop(AF_INET6, nsma)

        # NS is sent to the multicast MAC derived from that address
        self.assertEqual(rx[Ether].dst, in6_getnsmac(nsma))

        # and from the router's MAC
        self.assertEqual(rx[Ether].src, intf.local_mac)

        # the rx'd NS should be addressed to an mcast address
        # derived from the target address
        self.assertEqual(
            in6_ptop(rx[IPv6].dst), in6_ptop(dst_ip))

        # expect the tgt IP in the NS header
        ns = rx[ICMPv6ND_NS]
        self.assertEqual(in6_ptop(ns.tgt), in6_ptop(tgt_ip))

        # packet is from the router's local address; both sides are
        # normalised like every other address comparison in this class
        self.assertEqual(
            in6_ptop(rx[IPv6].src), in6_ptop(intf.local_ip6))

        # Src link-layer options should have the router's MAC
        sll = rx[ICMPv6NDOptSrcLLAddr]
        self.assertEqual(sll.lladdr, intf.local_mac)

    def send_and_expect_ra(self, intf, pkts, remark, dst_ip=None,
                           filter_out_fn=is_ipv6_misc):
        """Send pkts on intf and validate the single RA captured in reply.

        :param intf: interface to send on and capture from.
        :param pkts: packet(s) to send.
        :param remark: description of the case; not used by this method.
        :param dst_ip: expected destination, passed to ``validate_ra``.
        :param filter_out_fn: capture filter passed to ``get_capture``.
        """
        intf.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = intf.get_capture(1, filter_out_fn=filter_out_fn)

        self.assertEqual(len(rx), 1)
        rx = rx[0]
        self.validate_ra(intf, rx, dst_ip)

    def send_and_expect_na(self, intf, pkts, remark, dst_ip=None,
                           tgt_ip=None,
                           filter_out_fn=is_ipv6_misc):
        """Send pkts on intf and validate the single NA captured in reply.

        :param intf: interface to send on and capture from.
        :param pkts: packet(s) to send.
        :param remark: description of the case; not used by this method.
        :param dst_ip: expected destination, passed to ``validate_na``.
        :param tgt_ip: expected NA source, passed to ``validate_na``.
        :param filter_out_fn: capture filter passed to ``get_capture``.
        """
        intf.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = intf.get_capture(1, filter_out_fn=filter_out_fn)

        self.assertEqual(len(rx), 1)
        rx = rx[0]
        self.validate_na(intf, rx, dst_ip, tgt_ip)

    def send_and_expect_ns(self, tx_intf, rx_intf, pkts, tgt_ip,
                           filter_out_fn=is_ipv6_misc):
        """Send pkts on tx_intf and validate the single NS seen on rx_intf.

        :param tx_intf: interface to send on.
        :param rx_intf: interface the NS is expected on.
        :param pkts: packet(s) to send.
        :param tgt_ip: address the NS is expected to solicit.
        :param filter_out_fn: capture filter passed to ``get_capture``.
        """
        tx_intf.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = rx_intf.get_capture(1, filter_out_fn=filter_out_fn)

        self.assertEqual(len(rx), 1)
        rx = rx[0]
        self.validate_ns(rx_intf, rx, tgt_ip)

    def verify_ip(self, rx, smac, dmac, sip, dip):
        """Assert the Ethernet and IPv6 source/destination fields of rx.

        :param rx: the captured packet.
        :param smac: expected Ethernet source.
        :param dmac: expected Ethernet destination.
        :param sip: expected IPv6 source.
        :param dip: expected IPv6 destination.
        """
        ether = rx[Ether]
        self.assertEqual(ether.dst, dmac)
        self.assertEqual(ether.src, smac)

        ip = rx[IPv6]
        self.assertEqual(ip.src, sip)
        self.assertEqual(ip.dst, dip)
158
159
160class TestIPv6(TestIPv6ND):
161    """ IPv6 Test Case """
162
    @classmethod
    def setUpClass(cls):
        """Run the parent class-level setup; nothing extra is set up here."""
        super(TestIPv6, cls).setUpClass()
166
    @classmethod
    def tearDownClass(cls):
        """Run the parent class-level teardown; nothing extra is done here."""
        super(TestIPv6, cls).tearDownClass()
170
171    def setUp(self):
172        """
173        Perform test setup before test case.
174
175        **Config:**
176            - create 3 pg interfaces
177                - untagged pg0 interface
178                - Dot1Q subinterface on pg1
179                - Dot1AD subinterface on pg2
180            - setup interfaces:
181                - put it into UP state
182                - set IPv6 addresses
183                - resolve neighbor address using NDP
184            - configure 200 fib entries
185
186        :ivar list interfaces: pg interfaces and subinterfaces.
187        :ivar dict flows: IPv4 packet flows in test.
188
189        *TODO:* Create AD sub interface
190        """
191        super(TestIPv6, self).setUp()
192
193        # create 3 pg interfaces
194        self.create_pg_interfaces(range(3))
195
196        # create 2 subinterfaces for p1 and pg2
197        self.sub_interfaces = [
198            VppDot1QSubint(self, self.pg1, 100),
199            VppDot1QSubint(self, self.pg2, 200)
200            # TODO: VppDot1ADSubint(self, self.pg2, 200, 300, 400)
201        ]
202
203        # packet flows mapping pg0 -> pg1.sub, pg2.sub, etc.
204        self.flows = dict()
205        self.flows[self.pg0] = [self.pg1.sub_if, self.pg2.sub_if]
206        self.flows[self.pg1.sub_if] = [self.pg0, self.pg2.sub_if]
207        self.flows[self.pg2.sub_if] = [self.pg0, self.pg1.sub_if]
208
209        # packet sizes
210        self.pg_if_packet_sizes = [64, 1500, 9020]
211
212        self.interfaces = list(self.pg_interfaces)
213        self.interfaces.extend(self.sub_interfaces)
214
215        # setup all interfaces
216        for i in self.interfaces:
217            i.admin_up()
218            i.config_ip6()
219            i.resolve_ndp()
220
221    def tearDown(self):
222        """Run standard test teardown and log ``show ip6 neighbors``."""
223        for i in self.interfaces:
224            i.unconfig_ip6()
225            i.ip6_disable()
226            i.admin_down()
227        for i in self.sub_interfaces:
228            i.remove_vpp_config()
229
230        super(TestIPv6, self).tearDown()
231        if not self.vpp_dead:
232            self.logger.info(self.vapi.cli("show ip6 neighbors"))
233            # info(self.vapi.cli("show ip6 fib"))  # many entries
234
235    def modify_packet(self, src_if, packet_size, pkt):
236        """Add load, set destination IP and extend packet to required packet
237        size for defined interface.
238
239        :param VppInterface src_if: Interface to create packet for.
240        :param int packet_size: Required packet size.
241        :param Scapy pkt: Packet to be modified.
242        """
243        dst_if_idx = int(packet_size / 10 % 2)
244        dst_if = self.flows[src_if][dst_if_idx]
245        info = self.create_packet_info(src_if, dst_if)
246        payload = self.info_to_payload(info)
247        p = pkt / Raw(payload)
248        p[IPv6].dst = dst_if.remote_ip6
249        info.data = p.copy()
250        if isinstance(src_if, VppSubInterface):
251            p = src_if.add_dot1_layer(p)
252        self.extend_packet(p, packet_size)
253
254        return p
255
256    def create_stream(self, src_if):
257        """Create input packet stream for defined interface.
258
259        :param VppInterface src_if: Interface to create packet stream for.
260        """
261        hdr_ext = 4 if isinstance(src_if, VppSubInterface) else 0
262        pkt_tmpl = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
263                    IPv6(src=src_if.remote_ip6) /
264                    inet6.UDP(sport=1234, dport=1234))
265
266        pkts = [self.modify_packet(src_if, i, pkt_tmpl)
267                for i in moves.range(self.pg_if_packet_sizes[0],
268                                     self.pg_if_packet_sizes[1], 10)]
269        pkts_b = [self.modify_packet(src_if, i, pkt_tmpl)
270                  for i in moves.range(self.pg_if_packet_sizes[1] + hdr_ext,
271                                       self.pg_if_packet_sizes[2] + hdr_ext,
272                                       50)]
273        pkts.extend(pkts_b)
274
275        return pkts
276
277    def verify_capture(self, dst_if, capture):
278        """Verify captured input packet stream for defined interface.
279
280        :param VppInterface dst_if: Interface to verify captured packet stream
281                                    for.
282        :param list capture: Captured packet stream.
283        """
284        self.logger.info("Verifying capture on interface %s" % dst_if.name)
285        last_info = dict()
286        for i in self.interfaces:
287            last_info[i.sw_if_index] = None
288        is_sub_if = False
289        dst_sw_if_index = dst_if.sw_if_index
290        if hasattr(dst_if, 'parent'):
291            is_sub_if = True
292        for packet in capture:
293            if is_sub_if:
294                # Check VLAN tags and Ethernet header
295                packet = dst_if.remove_dot1_layer(packet)
296            self.assertTrue(Dot1Q not in packet)
297            try:
298                ip = packet[IPv6]
299                udp = packet[inet6.UDP]
300                payload_info = self.payload_to_info(packet[Raw])
301                packet_index = payload_info.index
302                self.assertEqual(payload_info.dst, dst_sw_if_index)
303                self.logger.debug(
304                    "Got packet on port %s: src=%u (id=%u)" %
305                    (dst_if.name, payload_info.src, packet_index))
306                next_info = self.get_next_packet_info_for_interface2(
307                    payload_info.src, dst_sw_if_index,
308                    last_info[payload_info.src])
309                last_info[payload_info.src] = next_info
310                self.assertTrue(next_info is not None)
311                self.assertEqual(packet_index, next_info.index)
312                saved_packet = next_info.data
313                # Check standard fields
314                self.assertEqual(
315                    ip.src, saved_packet[IPv6].src)
316                self.assertEqual(
317                    ip.dst, saved_packet[IPv6].dst)
318                self.assertEqual(
319                    udp.sport, saved_packet[inet6.UDP].sport)
320                self.assertEqual(
321                    udp.dport, saved_packet[inet6.UDP].dport)
322            except:
323                self.logger.error(ppp("Unexpected or invalid packet:", packet))
324                raise
325        for i in self.interfaces:
326            remaining_packet = self.get_next_packet_info_for_interface2(
327                i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
328            self.assertTrue(remaining_packet is None,
329                            "Interface %s: Packet expected from interface %s "
330                            "didn't arrive" % (dst_if.name, i.name))
331
332    def test_next_header_anomaly(self):
333        """ IPv6 next header anomaly test
334
335        Test scenario:
336            - ipv6 next header field = Fragment Header (44)
337            - next header is ICMPv6 Echo Request
338            - wait for reassembly
339        """
340        pkt = (Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac) /
341               IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6, nh=44) /
342               ICMPv6EchoRequest())
343
344        self.pg0.add_stream(pkt)
345        self.pg_start()
346
347        # wait for reassembly
348        self.sleep(10)
349
350    def test_fib(self):
351        """ IPv6 FIB test
352
353        Test scenario:
354            - Create IPv6 stream for pg0 interface
355            - Create IPv6 tagged streams for pg1's and pg2's subinterface.
356            - Send and verify received packets on each interface.
357        """
358
359        pkts = self.create_stream(self.pg0)
360        self.pg0.add_stream(pkts)
361
362        for i in self.sub_interfaces:
363            pkts = self.create_stream(i)
364            i.parent.add_stream(pkts)
365
366        self.pg_enable_capture(self.pg_interfaces)
367        self.pg_start()
368
369        pkts = self.pg0.get_capture()
370        self.verify_capture(self.pg0, pkts)
371
372        for i in self.sub_interfaces:
373            pkts = i.parent.get_capture()
374            self.verify_capture(i, pkts)
375
376    def test_ns(self):
377        """ IPv6 Neighbour Solicitation Exceptions
378
379        Test scenario:
380           - Send an NS Sourced from an address not covered by the link sub-net
381           - Send an NS to an mcast address the router has not joined
382           - Send NS for a target address the router does not onn.
383        """
384
385        #
386        # An NS from a non link source address
387        #
388        nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
389        d = inet_ntop(AF_INET6, nsma)
390
391        p = (Ether(dst=in6_getnsmac(nsma)) /
392             IPv6(dst=d, src="2002::2") /
393             ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
394             ICMPv6NDOptSrcLLAddr(
395                 lladdr=self.pg0.remote_mac))
396        pkts = [p]
397
398        self.send_and_assert_no_replies(
399            self.pg0, pkts,
400            "No response to NS source by address not on sub-net")
401
402        #
403        # An NS for sent to a solicited mcast group the router is
404        # not a member of FAILS
405        #
406        if 0:
407            nsma = in6_getnsma(inet_pton(AF_INET6, "fd::ffff"))
408            d = inet_ntop(AF_INET6, nsma)
409
410            p = (Ether(dst=in6_getnsmac(nsma)) /
411                 IPv6(dst=d, src=self.pg0.remote_ip6) /
412                 ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
413                 ICMPv6NDOptSrcLLAddr(
414                     lladdr=self.pg0.remote_mac))
415            pkts = [p]
416
417            self.send_and_assert_no_replies(
418                self.pg0, pkts,
419                "No response to NS sent to unjoined mcast address")
420
421        #
422        # An NS whose target address is one the router does not own
423        #
424        nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
425        d = inet_ntop(AF_INET6, nsma)
426
427        p = (Ether(dst=in6_getnsmac(nsma)) /
428             IPv6(dst=d, src=self.pg0.remote_ip6) /
429             ICMPv6ND_NS(tgt="fd::ffff") /
430             ICMPv6NDOptSrcLLAddr(
431                 lladdr=self.pg0.remote_mac))
432        pkts = [p]
433
434        self.send_and_assert_no_replies(self.pg0, pkts,
435                                        "No response to NS for unknown target")
436
437        #
438        # A neighbor entry that has no associated FIB-entry
439        #
440        self.pg0.generate_remote_hosts(4)
441        nd_entry = VppNeighbor(self,
442                               self.pg0.sw_if_index,
443                               self.pg0.remote_hosts[2].mac,
444                               self.pg0.remote_hosts[2].ip6,
445                               is_no_fib_entry=1)
446        nd_entry.add_vpp_config()
447
448        #
449        # check we have the neighbor, but no route
450        #
451        self.assertTrue(find_nbr(self,
452                                 self.pg0.sw_if_index,
453                                 self.pg0._remote_hosts[2].ip6))
454        self.assertFalse(find_route(self,
455                                    self.pg0._remote_hosts[2].ip6,
456                                    128))
457
458        #
459        # send an NS from a link local address to the interface's global
460        # address
461        #
462        p = (Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac) /
463             IPv6(
464                 dst=d, src=self.pg0._remote_hosts[2].ip6_ll) /
465             ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
466             ICMPv6NDOptSrcLLAddr(
467                 lladdr=self.pg0.remote_mac))
468
469        self.send_and_expect_na(self.pg0, p,
470                                "NS from link-local",
471                                dst_ip=self.pg0._remote_hosts[2].ip6_ll,
472                                tgt_ip=self.pg0.local_ip6)
473
474        #
475        # we should have learned an ND entry for the peer's link-local
476        # but not inserted a route to it in the FIB
477        #
478        self.assertTrue(find_nbr(self,
479                                 self.pg0.sw_if_index,
480                                 self.pg0._remote_hosts[2].ip6_ll))
481        self.assertFalse(find_route(self,
482                                    self.pg0._remote_hosts[2].ip6_ll,
483                                    128))
484
485        #
486        # An NS to the router's own Link-local
487        #
488        p = (Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac) /
489             IPv6(
490                 dst=d, src=self.pg0._remote_hosts[3].ip6_ll) /
491             ICMPv6ND_NS(tgt=self.pg0.local_ip6_ll) /
492             ICMPv6NDOptSrcLLAddr(
493                 lladdr=self.pg0.remote_mac))
494
495        self.send_and_expect_na(self.pg0, p,
496                                "NS to/from link-local",
497                                dst_ip=self.pg0._remote_hosts[3].ip6_ll,
498                                tgt_ip=self.pg0.local_ip6_ll)
499
500        #
501        # we should have learned an ND entry for the peer's link-local
502        # but not inserted a route to it in the FIB
503        #
504        self.assertTrue(find_nbr(self,
505                                 self.pg0.sw_if_index,
506                                 self.pg0._remote_hosts[3].ip6_ll))
507        self.assertFalse(find_route(self,
508                                    self.pg0._remote_hosts[3].ip6_ll,
509                                    128))
510
511    def test_ns_duplicates(self):
512        """ ND Duplicates"""
513
514        #
515        # Generate some hosts on the LAN
516        #
517        self.pg1.generate_remote_hosts(3)
518
519        #
520        # Add host 1 on pg1 and pg2
521        #
522        ns_pg1 = VppNeighbor(self,
523                             self.pg1.sw_if_index,
524                             self.pg1.remote_hosts[1].mac,
525                             self.pg1.remote_hosts[1].ip6)
526        ns_pg1.add_vpp_config()
527        ns_pg2 = VppNeighbor(self,
528                             self.pg2.sw_if_index,
529                             self.pg2.remote_mac,
530                             self.pg1.remote_hosts[1].ip6)
531        ns_pg2.add_vpp_config()
532
533        #
534        # IP packet destined for pg1 remote host arrives on pg1 again.
535        #
536        p = (Ether(dst=self.pg0.local_mac,
537                   src=self.pg0.remote_mac) /
538             IPv6(src=self.pg0.remote_ip6,
539                  dst=self.pg1.remote_hosts[1].ip6) /
540             inet6.UDP(sport=1234, dport=1234) /
541             Raw())
542
543        self.pg0.add_stream(p)
544        self.pg_enable_capture(self.pg_interfaces)
545        self.pg_start()
546
547        rx1 = self.pg1.get_capture(1)
548
549        self.verify_ip(rx1[0],
550                       self.pg1.local_mac,
551                       self.pg1.remote_hosts[1].mac,
552                       self.pg0.remote_ip6,
553                       self.pg1.remote_hosts[1].ip6)
554
555        #
556        # remove the duplicate on pg1
557        # packet stream should generate NSs out of pg1
558        #
559        ns_pg1.remove_vpp_config()
560
561        self.send_and_expect_ns(self.pg0, self.pg1,
562                                p, self.pg1.remote_hosts[1].ip6)
563
564        #
565        # Add it back
566        #
567        ns_pg1.add_vpp_config()
568
569        self.pg0.add_stream(p)
570        self.pg_enable_capture(self.pg_interfaces)
571        self.pg_start()
572
573        rx1 = self.pg1.get_capture(1)
574
575        self.verify_ip(rx1[0],
576                       self.pg1.local_mac,
577                       self.pg1.remote_hosts[1].mac,
578                       self.pg0.remote_ip6,
579                       self.pg1.remote_hosts[1].ip6)
580
581    def validate_ra(self, intf, rx, dst_ip=None, mtu=9000, pi_opt=None):
582        if not dst_ip:
583            dst_ip = intf.remote_ip6
584
585        # unicasted packets must come to the unicast mac
586        self.assertEqual(rx[Ether].dst, intf.remote_mac)
587
588        # and from the router's MAC
589        self.assertEqual(rx[Ether].src, intf.local_mac)
590
591        # the rx'd RA should be addressed to the sender's source
592        self.assertTrue(rx.haslayer(ICMPv6ND_RA))
593        self.assertEqual(in6_ptop(rx[IPv6].dst),
594                         in6_ptop(dst_ip))
595
596        # and come from the router's link local
597        self.assertTrue(in6_islladdr(rx[IPv6].src))
598        self.assertEqual(in6_ptop(rx[IPv6].src),
599                         in6_ptop(mk_ll_addr(intf.local_mac)))
600
601        # it should contain the links MTU
602        ra = rx[ICMPv6ND_RA]
603        self.assertEqual(ra[ICMPv6NDOptMTU].mtu, mtu)
604
605        # it should contain the source's link layer address option
606        sll = ra[ICMPv6NDOptSrcLLAddr]
607        self.assertEqual(sll.lladdr, intf.local_mac)
608
609        if not pi_opt:
610            # the RA should not contain prefix information
611            self.assertFalse(ra.haslayer(
612                ICMPv6NDOptPrefixInfo))
613        else:
614            raos = rx.getlayer(ICMPv6NDOptPrefixInfo, 1)
615
616            # the options are nested in the scapy packet in way that i cannot
617            # decipher how to decode. this 1st layer of option always returns
618            # nested classes, so a direct obj1=obj2 comparison always fails.
619            # however, the getlayer(.., 2) does give one instance.
620            # so we cheat here and construct a new opt instance for comparison
621            rd = ICMPv6NDOptPrefixInfo(
622                prefixlen=raos.prefixlen,
623                prefix=raos.prefix,
624                L=raos.L,
625                A=raos.A)
626            if type(pi_opt) is list:
627                for ii in range(len(pi_opt)):
628                    self.assertEqual(pi_opt[ii], rd)
629                    rd = rx.getlayer(
630                        ICMPv6NDOptPrefixInfo, ii + 2)
631            else:
632                self.assertEqual(pi_opt, raos, 'Expected: %s, received: %s'
633                                 % (pi_opt.show(dump=True),
634                                    raos.show(dump=True)))
635
636    def send_and_expect_ra(self, intf, pkts, remark, dst_ip=None,
637                           filter_out_fn=is_ipv6_misc,
638                           opt=None):
639        intf.add_stream(pkts)
640        self.pg_enable_capture(self.pg_interfaces)
641        self.pg_start()
642        rx = intf.get_capture(1, filter_out_fn=filter_out_fn)
643
644        self.assertEqual(len(rx), 1)
645        rx = rx[0]
646        self.validate_ra(intf, rx, dst_ip, pi_opt=opt)
647
648    def test_rs(self):
649        """ IPv6 Router Solicitation Exceptions
650
651        Test scenario:
652        """
653
654        #
655        # Before we begin change the IPv6 RA responses to use the unicast
656        # address - that way we will not confuse them with the periodic
657        # RAs which go to the mcast address
658        # Sit and wait for the first periodic RA.
659        #
660        # TODO
661        #
662        self.pg0.ip6_ra_config(send_unicast=1)
663
664        #
665        # An RS from a link source address
666        #  - expect an RA in return
667        #
668        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
669             IPv6(
670                 dst=self.pg0.local_ip6, src=self.pg0.remote_ip6) /
671             ICMPv6ND_RS())
672        pkts = [p]
673        self.send_and_expect_ra(self.pg0, pkts, "Genuine RS")
674
675        #
676        # For the next RS sent the RA should be rate limited
677        #
678        self.send_and_assert_no_replies(self.pg0, pkts, "RA rate limited")
679
680        #
681        # When we reconfigure the IPv6 RA config,
682        # we reset the RA rate limiting,
683        # so we need to do this before each test below so as not to drop
684        # packets for rate limiting reasons. Test this works here.
685        #
686        self.pg0.ip6_ra_config(send_unicast=1)
687        self.send_and_expect_ra(self.pg0, pkts, "Rate limit reset RS")
688
689        #
690        # An RS sent from a non-link local source
691        #
692        self.pg0.ip6_ra_config(send_unicast=1)
693        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
694             IPv6(dst=self.pg0.local_ip6,
695                  src="2002::ffff") /
696             ICMPv6ND_RS())
697        pkts = [p]
698        self.send_and_assert_no_replies(self.pg0, pkts,
699                                        "RS from non-link source")
700
701        #
702        # Source an RS from a link local address
703        #
704        self.pg0.ip6_ra_config(send_unicast=1)
705        ll = mk_ll_addr(self.pg0.remote_mac)
706        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
707             IPv6(dst=self.pg0.local_ip6, src=ll) /
708             ICMPv6ND_RS())
709        pkts = [p]
710        self.send_and_expect_ra(self.pg0, pkts,
711                                "RS sourced from link-local",
712                                dst_ip=ll)
713
714        #
715        # Send the RS multicast
716        #
717        self.pg0.ip6_ra_config(send_unicast=1)
718        dmac = in6_getnsmac(inet_pton(AF_INET6, "ff02::2"))
719        ll = mk_ll_addr(self.pg0.remote_mac)
720        p = (Ether(dst=dmac, src=self.pg0.remote_mac) /
721             IPv6(dst="ff02::2", src=ll) /
722             ICMPv6ND_RS())
723        pkts = [p]
724        self.send_and_expect_ra(self.pg0, pkts,
725                                "RS sourced from link-local",
726                                dst_ip=ll)
727
728        #
729        # Source from the unspecified address ::. This happens when the RS
730        # is sent before the host has a configured address/sub-net,
731        # i.e. auto-config. Since the sender has no IP address, the reply
732        # comes back mcast - so the capture needs to not filter this.
733        # If we happen to pick up the periodic RA at this point then so be it,
734        # it's not an error.
735        #
736        self.pg0.ip6_ra_config(send_unicast=1, suppress=1)
737        p = (Ether(dst=dmac, src=self.pg0.remote_mac) /
738             IPv6(dst="ff02::2", src="::") /
739             ICMPv6ND_RS())
740        pkts = [p]
741        self.send_and_expect_ra(self.pg0, pkts,
742                                "RS sourced from unspecified",
743                                dst_ip="ff02::1",
744                                filter_out_fn=None)
745
746        #
747        # Configure The RA to announce the links prefix
748        #
749        self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
750                               self.pg0.local_ip6_prefix_len))
751
752        #
753        # RAs should now contain the prefix information option
754        #
755        opt = ICMPv6NDOptPrefixInfo(
756            prefixlen=self.pg0.local_ip6_prefix_len,
757            prefix=self.pg0.local_ip6,
758            L=1,
759            A=1)
760
761        self.pg0.ip6_ra_config(send_unicast=1)
762        ll = mk_ll_addr(self.pg0.remote_mac)
763        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
764             IPv6(dst=self.pg0.local_ip6, src=ll) /
765             ICMPv6ND_RS())
766        self.send_and_expect_ra(self.pg0, p,
767                                "RA with prefix-info",
768                                dst_ip=ll,
769                                opt=opt)
770
771        #
772        # Change the prefix info to not off-link
773        #  L-flag is clear
774        #
775        self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
776                               self.pg0.local_ip6_prefix_len),
777                               off_link=1)
778
779        opt = ICMPv6NDOptPrefixInfo(
780            prefixlen=self.pg0.local_ip6_prefix_len,
781            prefix=self.pg0.local_ip6,
782            L=0,
783            A=1)
784
785        self.pg0.ip6_ra_config(send_unicast=1)
786        self.send_and_expect_ra(self.pg0, p,
787                                "RA with Prefix info with L-flag=0",
788                                dst_ip=ll,
789                                opt=opt)
790
791        #
792        # Change the prefix info to not off-link, no-autoconfig
793        #  L and A flag are clear in the advert
794        #
795        self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
796                               self.pg0.local_ip6_prefix_len),
797                               off_link=1,
798                               no_autoconfig=1)
799
800        opt = ICMPv6NDOptPrefixInfo(
801            prefixlen=self.pg0.local_ip6_prefix_len,
802            prefix=self.pg0.local_ip6,
803            L=0,
804            A=0)
805
806        self.pg0.ip6_ra_config(send_unicast=1)
807        self.send_and_expect_ra(self.pg0, p,
808                                "RA with Prefix info with A & L-flag=0",
809                                dst_ip=ll,
810                                opt=opt)
811
812        #
813        # Change the flag settings back to the defaults
814        #  L and A flag are set in the advert
815        #
816        self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
817                               self.pg0.local_ip6_prefix_len))
818
819        opt = ICMPv6NDOptPrefixInfo(
820            prefixlen=self.pg0.local_ip6_prefix_len,
821            prefix=self.pg0.local_ip6,
822            L=1,
823            A=1)
824
825        self.pg0.ip6_ra_config(send_unicast=1)
826        self.send_and_expect_ra(self.pg0, p,
827                                "RA with Prefix info",
828                                dst_ip=ll,
829                                opt=opt)
830
831        #
832        # Change the prefix info to not off-link, no-autoconfig
833        #  L and A flag are clear in the advert
834        #
835        self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
836                               self.pg0.local_ip6_prefix_len),
837                               off_link=1,
838                               no_autoconfig=1)
839
840        opt = ICMPv6NDOptPrefixInfo(
841            prefixlen=self.pg0.local_ip6_prefix_len,
842            prefix=self.pg0.local_ip6,
843            L=0,
844            A=0)
845
846        self.pg0.ip6_ra_config(send_unicast=1)
847        self.send_and_expect_ra(self.pg0, p,
848                                "RA with Prefix info with A & L-flag=0",
849                                dst_ip=ll,
850                                opt=opt)
851
852        #
853        # Use the reset to defaults option to revert to defaults
        #  L and A flag are set in the advert
855        #
856        self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
857                               self.pg0.local_ip6_prefix_len),
858                               use_default=1)
859
860        opt = ICMPv6NDOptPrefixInfo(
861            prefixlen=self.pg0.local_ip6_prefix_len,
862            prefix=self.pg0.local_ip6,
863            L=1,
864            A=1)
865
866        self.pg0.ip6_ra_config(send_unicast=1)
867        self.send_and_expect_ra(self.pg0, p,
868                                "RA with Prefix reverted to defaults",
869                                dst_ip=ll,
870                                opt=opt)
871
872        #
873        # Advertise Another prefix. With no L-flag/A-flag
874        #
875        self.pg0.ip6_ra_prefix('%s/%s' % (self.pg1.local_ip6,
876                               self.pg1.local_ip6_prefix_len),
877                               off_link=1,
878                               no_autoconfig=1)
879
880        opt = [ICMPv6NDOptPrefixInfo(
881            prefixlen=self.pg0.local_ip6_prefix_len,
882            prefix=self.pg0.local_ip6,
883            L=1,
884            A=1),
885            ICMPv6NDOptPrefixInfo(
886                prefixlen=self.pg1.local_ip6_prefix_len,
887                prefix=self.pg1.local_ip6,
888                L=0,
889                A=0)]
890
891        self.pg0.ip6_ra_config(send_unicast=1)
892        ll = mk_ll_addr(self.pg0.remote_mac)
893        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
894             IPv6(dst=self.pg0.local_ip6, src=ll) /
895             ICMPv6ND_RS())
896        self.send_and_expect_ra(self.pg0, p,
897                                "RA with multiple Prefix infos",
898                                dst_ip=ll,
899                                opt=opt)
900
901        #
902        # Remove the first prefix-info - expect the second is still in the
903        # advert
904        #
905        self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
906                               self.pg0.local_ip6_prefix_len),
907                               is_no=1)
908
909        opt = ICMPv6NDOptPrefixInfo(
910            prefixlen=self.pg1.local_ip6_prefix_len,
911            prefix=self.pg1.local_ip6,
912            L=0,
913            A=0)
914
915        self.pg0.ip6_ra_config(send_unicast=1)
916        self.send_and_expect_ra(self.pg0, p,
917                                "RA with Prefix reverted to defaults",
918                                dst_ip=ll,
919                                opt=opt)
920
921        #
922        # Remove the second prefix-info - expect no prefix-info in the adverts
923        #
924        self.pg0.ip6_ra_prefix('%s/%s' % (self.pg1.local_ip6,
925                               self.pg1.local_ip6_prefix_len),
926                               is_no=1)
927
928        self.pg0.ip6_ra_config(send_unicast=1)
929        self.send_and_expect_ra(self.pg0, p,
930                                "RA with Prefix reverted to defaults",
931                                dst_ip=ll)
932
933        #
934        # Reset the periodic advertisements back to default values
935        #
936        self.pg0.ip6_ra_config(no=1, suppress=1, send_unicast=0)
937
938
class TestIPv6IfAddrRoute(VppTestCase):
    """ IPv6 Interface Addr Route Test Case """

    # Two interface addresses sharing one /64 are added to and removed
    # from a single pg interface; after every step the test queries the
    # address object and looks up both /128 routes.

    @classmethod
    def setUpClass(cls):
        super(TestIPv6IfAddrRoute, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPv6IfAddrRoute, cls).tearDownClass()

    def setUp(self):
        """ Create pg0, bring it up, configure IPv6 and resolve ND """
        super(TestIPv6IfAddrRoute, self).setUp()

        # a single packet-generator interface is all this case needs
        self.create_pg_interfaces(range(1))

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.resolve_ndp()

    def tearDown(self):
        """ Run the parent tear-down, then unconfigure each interface """
        super(TestIPv6IfAddrRoute, self).tearDown()
        for intf in self.pg_interfaces:
            intf.unconfig_ip6()
            intf.admin_down()

    def test_ipv6_ifaddrs_same_prefix(self):
        """ IPv6 Interface Addresses Same Prefix test

        Test scenario:

            - Verify no route in FIB for prefix 2001:10::/64
            - Configure IPv6 address 2001:10::10/64 on an interface
            - Verify route in FIB for prefix 2001:10::/64
            - Configure IPv6 address 2001:10::20/64 on an interface
            - Delete 2001:10::10/64 from interface
            - Verify route in FIB for prefix 2001:10::/64
            - Delete 2001:10::20/64 from interface
            - Verify no route in FIB for prefix 2001:10::/64
        """

        prefix_len = 64
        first_ip = "2001:10::10"
        second_ip = "2001:10::20"

        first = VppIpInterfaceAddress(self, self.pg0,
                                      VppIpAddress(first_ip), prefix_len)
        second = VppIpInterfaceAddress(self, self.pg0,
                                       VppIpAddress(second_ip), prefix_len)

        # starting point: nothing configured, neither /128 is present
        self.assertFalse(first.query_vpp_config())  # 2001:10::/64
        self.assertFalse(find_route(self, first_ip, 128))
        self.assertFalse(find_route(self, second_ip, 128))

        # add the first address: only its own /128 shows up
        first.add_vpp_config()
        self.assertTrue(first.query_vpp_config())  # 2001:10::/64
        self.assertTrue(find_route(self, first_ip, 128))
        self.assertFalse(find_route(self, second_ip, 128))

        # add the second address and drop the first; the query on the
        # first object must still succeed while the /128s swap over
        second.add_vpp_config()
        first.remove_vpp_config()
        self.assertTrue(first.query_vpp_config())  # 2001:10::/64
        self.assertFalse(find_route(self, first_ip, 128))
        self.assertTrue(find_route(self, second_ip, 128))

        # drop the second address as well: everything is gone again
        second.remove_vpp_config()
        self.assertFalse(first.query_vpp_config())  # 2001:10::/64
        self.assertFalse(find_route(self, first_ip, 128))
        self.assertFalse(find_route(self, second_ip, 128))
1011
1012
class TestICMPv6Echo(VppTestCase):
    """ ICMPv6 Echo Test Case """

    # Sends one Echo Request to VPP's own address on pg0 and checks every
    # header of the Echo Reply that comes back on the same interface.

    @classmethod
    def setUpClass(cls):
        super(TestICMPv6Echo, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestICMPv6Echo, cls).tearDownClass()

    def setUp(self):
        """ Create pg0, bring it up, configure IPv6 and resolve ND """
        super(TestICMPv6Echo, self).setUp()

        # one packet-generator interface is enough
        self.create_pg_interfaces(range(1))

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.resolve_ndp()

    def tearDown(self):
        """ Parent tear-down, then remove IPv6 from each interface """
        super(TestICMPv6Echo, self).tearDown()
        for intf in self.pg_interfaces:
            intf.unconfig_ip6()
            intf.ip6_disable()
            intf.admin_down()

    def test_icmpv6_echo(self):
        """ VPP replies to ICMPv6 Echo Request

        Test scenario:

            - Receive ICMPv6 Echo Request message on pg0 interface.
            - Check outgoing ICMPv6 Echo Reply message on pg0 interface.
        """

        echo_id = 0xb
        echo_seq = 5
        echo_data = b'\x0a' * 18

        # request from the remote host on pg0 to VPP's pg0 address
        l2 = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
        l3 = IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
        echo = ICMPv6EchoRequest(id=echo_id, seq=echo_seq, data=echo_data)
        request = l2 / l3 / echo

        self.pg0.add_stream(request)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # exactly one packet is expected back on pg0
        reply = self.pg0.get_capture(1)[0]
        reply_l2 = reply[Ether]
        reply_l3 = reply[IPv6]
        reply_echo = reply[ICMPv6EchoReply]

        # addresses are mirrored at both L2 and L3
        self.assertEqual(reply_l2.src, self.pg0.local_mac)
        self.assertEqual(reply_l2.dst, self.pg0.remote_mac)

        self.assertEqual(reply_l3.src, self.pg0.local_ip6)
        self.assertEqual(reply_l3.dst, self.pg0.remote_ip6)

        # identifier, sequence number and payload are echoed unchanged
        self.assertEqual(icmp6types[reply_echo.type], "Echo Reply")
        self.assertEqual(reply_echo.id, echo_id)
        self.assertEqual(reply_echo.seq, echo_seq)
        self.assertEqual(reply_echo.data, echo_data)
1084
1085
class TestIPv6RD(TestIPv6ND):
    """ IPv6 Router Discovery Test Case """

    # Covers both directions: router solicitations that VPP is asked to
    # send, and the ip6_ra_event raised when a router advertisement
    # arrives on pg1.

    @classmethod
    def setUpClass(cls):
        super(TestIPv6RD, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPv6RD, cls).tearDownClass()

    def setUp(self):
        """ Create pg0 and pg1, bring both up and configure IPv6 """
        super(TestIPv6RD, self).setUp()

        # two packet-generator interfaces
        self.create_pg_interfaces(range(2))

        # private copy of the interface list, reused by tearDown
        self.interfaces = list(self.pg_interfaces)

        for intf in self.interfaces:
            intf.admin_up()
            intf.config_ip6()

    def tearDown(self):
        """ Unconfigure the interfaces, then run the parent tear-down """
        for intf in self.interfaces:
            intf.unconfig_ip6()
            intf.admin_down()
        super(TestIPv6RD, self).tearDown()

    def test_rd_send_router_solicitation(self):
        """ Verify router solicitation packets """

        expected_count = 2
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.vapi.ip6nd_send_router_solicitation(self.pg1.sw_if_index,
                                                 mrc=expected_count)
        captured = self.pg1.get_capture(expected_count, timeout=3)
        self.assertEqual(len(captured), expected_count)

        # every RS goes to ff02::2 from pg1's link-local address
        expected_dst = ip6_normalize("ff02::2")
        expected_src = ip6_normalize(self.pg1.local_ip6_ll)

        for rs in captured:
            self.assertEqual(rs.haslayer(IPv6), 1)
            self.assertEqual(rs[IPv6].haslayer(ICMPv6ND_RS), 1)
            self.assert_equal(ip6_normalize(rs[IPv6].dst), expected_dst)
            self.assert_equal(ip6_normalize(rs[IPv6].src), expected_src)
            # the source link-layer address option must carry pg1's MAC
            has_sllao = rs[ICMPv6ND_RS].haslayer(ICMPv6NDOptSrcLLAddr)
            self.assertTrue(bool(has_sllao))
            self.assert_equal(rs[ICMPv6NDOptSrcLLAddr].lladdr,
                              self.pg1.local_mac)

    def verify_prefix_info(self, reported_prefix, prefix_option):
        """ Compare a prefix reported in an RA event with the sent option

        :param reported_prefix: one entry of the event's prefixes
        :param prefix_option: the ICMPv6NDOptPrefixInfo placed in the RA
        """
        sent_prefix = prefix_option.getfieldval("prefix")
        sent_len = text_type(prefix_option.getfieldval("prefixlen"))
        # strict=False lets the option's prefix carry host bits
        sent_network = IPv6Network(text_type(sent_prefix + "/" + sent_len),
                                   strict=False)
        self.assert_equal(reported_prefix.prefix.network_address,
                          sent_network.network_address)

        # the reported flags are compared with L in bit 7 and A in bit 6
        l_flag = prefix_option.getfieldval("L")
        a_flag = prefix_option.getfieldval("A")
        self.assert_equal(reported_prefix.flags,
                          (l_flag << 7) | (a_flag << 6))

        self.assert_equal(reported_prefix.valid_time,
                          prefix_option.getfieldval("validlifetime"))
        self.assert_equal(reported_prefix.preferred_time,
                          prefix_option.getfieldval("preferredlifetime"))

    def test_rd_receive_router_advertisement(self):
        """ Verify events triggered by received RA packets """

        self.vapi.want_ip6_ra_events()

        sent_prefixes = (
            ICMPv6NDOptPrefixInfo(
                prefix="1::2",
                prefixlen=50,
                validlifetime=200,
                preferredlifetime=500,
                L=1,
                A=1,
            ),
            ICMPv6NDOptPrefixInfo(
                prefix="7::4",
                prefixlen=20,
                validlifetime=70,
                preferredlifetime=1000,
                L=1,
                A=0,
            ),
        )

        # RA from the remote link-local address to pg1's link-local one
        l2 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
        l3 = IPv6(dst=self.pg1.local_ip6_ll,
                  src=mk_ll_addr(self.pg1.remote_mac))
        ra = (l2 / l3 / ICMPv6ND_RA() /
              sent_prefixes[0] / sent_prefixes[1])
        self.pg1.add_stream([ra])
        self.pg_start()

        event = self.vapi.wait_for_event(10, "ip6_ra_event")

        # header fields of the event for an RA built with scapy defaults
        self.assert_equal(event.current_hop_limit, 0)
        self.assert_equal(event.flags, 8)
        self.assert_equal(event.router_lifetime_in_sec, 1800)
        self.assert_equal(event.neighbor_reachable_time_in_msec, 0)
        self.assert_equal(
            event.time_in_msec_between_retransmitted_neighbor_solicitations,
            0)

        self.assert_equal(event.n_prefixes, 2)

        # prefixes must be reported in the order they were sent
        for position, sent in enumerate(sent_prefixes):
            self.verify_prefix_info(event.prefixes[position], sent)
1205
1206
class TestIPv6RDControlPlane(TestIPv6ND):
    """ IPv6 Router Discovery Control Plane Test Case """

    # Feeds router advertisements into pg0 and watches the FIB dump for
    # the resulting /128 interface address and default route.

    @classmethod
    def setUpClass(cls):
        super(TestIPv6RDControlPlane, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPv6RDControlPlane, cls).tearDownClass()

    def setUp(self):
        """ Create pg0, bring it up and configure IPv6 """
        super(TestIPv6RDControlPlane, self).setUp()

        # create 1 pg interface
        self.create_pg_interfaces(range(1))

        self.interfaces = list(self.pg_interfaces)

        # setup all interfaces
        for i in self.interfaces:
            i.admin_up()
            i.config_ip6()

    def tearDown(self):
        super(TestIPv6RDControlPlane, self).tearDown()

    @staticmethod
    def create_ra_packet(pg, routerlifetime=None):
        """ Build a router advertisement addressed to VPP on interface pg

        :param pg: pg interface supplying the MAC and IPv6 addresses
        :param routerlifetime: value for the RA's routerlifetime field;
            None leaves scapy's default in place
        :returns: Ether / IPv6 / ICMPv6ND_RA packet, sourced from the
            remote link-local address and sent to pg's local address
        """
        src_ip = pg.remote_ip6_ll
        dst_ip = pg.local_ip6
        if routerlifetime is not None:
            ra = ICMPv6ND_RA(routerlifetime=routerlifetime)
        else:
            ra = ICMPv6ND_RA()
        p = (Ether(dst=pg.local_mac, src=pg.remote_mac) /
             IPv6(dst=dst_ip, src=src_ip) / ra)
        return p

    @staticmethod
    def get_default_routes(fib):
        """ Extract the default routes from a FIB dump

        :param fib: iterable of route entries from ip_route_dump
        :returns: list of dicts with keys 'sw_if_index' and 'next_hop',
            one per path of every zero-length prefix; paths whose
            sw_if_index is 0xFFFFFFFF are left out
        """
        default_routes = []
        for entry in fib:
            if entry.route.prefix.prefixlen != 0:
                continue
            for path in entry.route.paths:
                if path.sw_if_index != 0xFFFFFFFF:
                    default_routes.append({
                        'sw_if_index': path.sw_if_index,
                        'next_hop': path.nh.address.ip6,
                    })
        return default_routes

    @staticmethod
    def get_interface_addresses(fib, pg):
        """ Extract the /128 addresses whose first path is via pg

        :param fib: iterable of route entries from ip_route_dump
        :param pg: interface whose sw_if_index is matched
        :returns: list of address strings
        """
        return [str(entry.route.prefix.network_address)
                for entry in fib
                if entry.route.prefix.prefixlen == 128 and
                entry.route.paths[0].sw_if_index == pg.sw_if_index]

    def _inject_ra(self, packet):
        """ Send one RA on pg0 and give VPP 0.1s to act on it """
        self.pg0.add_stream([packet])
        self.pg_start()
        self.sleep(0.1)

    def _learned_addresses(self, fib, initial_addresses):
        """ Return pg0's /128 addresses that are not in initial_addresses """
        current = set(self.get_interface_addresses(fib, self.pg0))
        return current.difference(initial_addresses)

    def _check_slaac_address(self, fib, initial_addresses):
        """ Assert exactly one new address exists and it lies in 1::/20 """
        new_addresses = self._learned_addresses(fib, initial_addresses)
        self.assertEqual(len(new_addresses), 1)
        # only the leading 20 bits of the new address are compared
        prefix = IPv6Network(text_type("%s/%d" % (list(new_addresses)[0], 20)),
                             strict=False)
        self.assertEqual(prefix, IPv6Network(text_type('1::/20')))

    def _check_default_route(self, fib, router_address):
        """ Assert a single default route via router_address on pg0 """
        default_routes = self.get_default_routes(fib)
        self.assertEqual(len(default_routes), 1)
        dr = default_routes[0]
        self.assertEqual(dr['sw_if_index'], self.pg0.sw_if_index)
        self.assertEqual(dr['next_hop'], router_address)

    def _check_no_default_route(self, fib):
        """ Assert the FIB dump holds no default route """
        self.assertEqual(len(self.get_default_routes(fib)), 0)

    def test_all(self):
        """ Test handling of SLAAC addresses and default routes """

        # baseline: remember pg0's addresses, expect no default route
        fib = self.vapi.ip_route_dump(0, True)
        default_routes = self.get_default_routes(fib)
        initial_addresses = set(self.get_interface_addresses(fib, self.pg0))
        self.assertEqual(default_routes, [])
        router_address = IPv6Address(text_type(self.pg0.remote_ip6_ll))

        # NOTE(review): the two trailing flags presumably mean "enable"
        # and "install default routes" - confirm against the API
        self.vapi.ip6_nd_address_autoconfig(self.pg0.sw_if_index, 1, 1)

        self.sleep(0.1)

        # send RA carrying two prefix options; only the first has A=1
        packet = (self.create_ra_packet(
            self.pg0) / ICMPv6NDOptPrefixInfo(
            prefix="1::",
            prefixlen=64,
            validlifetime=2,
            preferredlifetime=2,
            L=1,
            A=1,
        ) / ICMPv6NDOptPrefixInfo(
            prefix="7::",
            prefixlen=20,
            validlifetime=1500,
            preferredlifetime=1000,
            L=1,
            A=0,
        ))
        self._inject_ra(packet)

        # check FIB for new address and new default route
        fib = self.vapi.ip_route_dump(0, True)
        self._check_slaac_address(fib, initial_addresses)
        self._check_default_route(fib, router_address)

        # send RA to delete default route
        self._inject_ra(self.create_ra_packet(self.pg0, routerlifetime=0))

        # check that default route is deleted
        self._check_no_default_route(self.vapi.ip_route_dump(0, True))

        self.sleep(0.1)

        # send RA
        self._inject_ra(self.create_ra_packet(self.pg0))

        # check FIB for new default route
        self._check_default_route(self.vapi.ip_route_dump(0, True),
                                  router_address)

        # send RA, updating router lifetime to 1s
        self._inject_ra(self.create_ra_packet(self.pg0, 1))

        # check that default route still exists
        self._check_default_route(self.vapi.ip_route_dump(0, True),
                                  router_address)

        self.sleep(1)

        # check that default route is deleted
        fib = self.vapi.ip_route_dump(0, True)
        self._check_no_default_route(fib)

        # check FIB still contains the SLAAC address
        self._check_slaac_address(fib, initial_addresses)

        self.sleep(1)

        # check that SLAAC address is deleted
        fib = self.vapi.ip_route_dump(0, True)
        new_addresses = self._learned_addresses(fib, initial_addresses)
        self.assertEqual(len(new_addresses), 0)
1388
1389
class IPv6NDProxyTest(TestIPv6ND):
    """ IPv6 ND ProxyTest Case """

    # pg0 owns the subnet; two hosts from that subnet sit behind pg1 and
    # pg2 and are reached through ND proxy entries.

    @classmethod
    def setUpClass(cls):
        super(IPv6NDProxyTest, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(IPv6NDProxyTest, cls).tearDownClass()

    def setUp(self):
        """ Configure the subnet on pg0; only enable IPv6 on pg1/pg2 """
        super(IPv6NDProxyTest, self).setUp()

        # create 3 pg interfaces
        self.create_pg_interfaces(range(3))

        # pg0 is the master interface, with the configured subnet
        self.pg0.admin_up()
        self.pg0.config_ip6()
        self.pg0.resolve_ndp()

        self.pg1.ip6_enable()
        self.pg2.ip6_enable()

    def tearDown(self):
        super(IPv6NDProxyTest, self).tearDown()

    def _check_forwarded_to_pg1(self, tx_intf, pkt, dst_mac):
        """ Send pkt on tx_intf and verify the one packet captured on pg1

        :param tx_intf: pg interface the packet is injected on
        :param pkt: packet to send; its IPv6 addresses must be unchanged
        :param dst_mac: MAC expected as Ethernet destination on pg1
        """
        tx_intf.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = self.pg1.get_capture(1)[0]

        self.assertEqual(rx[Ether].dst, dst_mac)
        self.assertEqual(rx[Ether].src, self.pg1.local_mac)

        self.assertEqual(rx[IPv6].src, pkt[IPv6].src)
        self.assertEqual(rx[IPv6].dst, pkt[IPv6].dst)

    def test_nd_proxy(self):
        """ IPv6 Proxy ND """

        #
        # Generate some hosts in the subnet that we are proxying
        #
        self.pg0.generate_remote_hosts(8)
        host_pg1 = self.pg0.remote_hosts[2]  # proxied behind pg1
        host_pg2 = self.pg0.remote_hosts[3]  # proxied behind pg2

        nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
        d = inet_ntop(AF_INET6, nsma)

        #
        # Send an NS for one of those remote hosts on one of the proxy links
        # expect no response since it's from an address that is not
        # on the link that has the prefix configured
        #
        ns_pg1 = (Ether(dst=in6_getnsmac(nsma), src=self.pg1.remote_mac) /
                  IPv6(dst=d, src=host_pg1.ip6) /
                  ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
                  ICMPv6NDOptSrcLLAddr(lladdr=host_pg1.mac))

        self.send_and_assert_no_replies(self.pg1, ns_pg1, "Off link NS")

        #
        # Add proxy support for the host
        #
        self.vapi.ip6nd_proxy_add_del(
            ip=inet_pton(AF_INET6, host_pg1.ip6),
            sw_if_index=self.pg1.sw_if_index)

        #
        # try that NS again. this time we expect an NA back
        #
        self.send_and_expect_na(self.pg1, ns_pg1,
                                "NS to proxy entry",
                                dst_ip=host_pg1.ip6,
                                tgt_ip=self.pg0.local_ip6)

        #
        # ... and that we have an entry in the ND cache
        #
        self.assertTrue(find_nbr(self,
                                 self.pg1.sw_if_index,
                                 host_pg1.ip6))

        #
        # ... and we can route traffic to it
        #
        # payload is a bytes literal so it is the same on python 2 and 3
        t = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IPv6(dst=host_pg1.ip6, src=self.pg0.remote_ip6) /
             inet6.UDP(sport=10000, dport=20000) /
             Raw(b'\xa5' * 100))

        self._check_forwarded_to_pg1(self.pg0, t, host_pg1.mac)

        #
        # Test we proxy for the host on the main interface
        #
        ns_pg0 = (Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac) /
                  IPv6(dst=d, src=self.pg0.remote_ip6) /
                  ICMPv6ND_NS(tgt=host_pg1.ip6) /
                  ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac))

        self.send_and_expect_na(self.pg0, ns_pg0,
                                "NS to proxy entry on main",
                                tgt_ip=host_pg1.ip6,
                                dst_ip=self.pg0.remote_ip6)

        #
        # Setup and resolve proxy for another host on another interface
        #
        # NOTE(review): the link-layer option carries the MAC of the host
        # behind pg1 although the NS is sourced from the pg2 host's
        # address; kept as in the original - confirm whether intended
        ns_pg2 = (Ether(dst=in6_getnsmac(nsma), src=self.pg2.remote_mac) /
                  IPv6(dst=d, src=host_pg2.ip6) /
                  ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
                  ICMPv6NDOptSrcLLAddr(lladdr=host_pg1.mac))

        self.vapi.ip6nd_proxy_add_del(
            ip=inet_pton(AF_INET6, host_pg2.ip6),
            sw_if_index=self.pg2.sw_if_index)

        self.send_and_expect_na(self.pg2, ns_pg2,
                                "NS to proxy entry other interface",
                                dst_ip=host_pg2.ip6,
                                tgt_ip=self.pg0.local_ip6)

        self.assertTrue(find_nbr(self,
                                 self.pg2.sw_if_index,
                                 host_pg2.ip6))

        #
        # hosts can communicate. pg2->pg1
        #
        t2 = (Ether(dst=self.pg2.local_mac, src=host_pg2.mac) /
              IPv6(dst=host_pg1.ip6, src=host_pg2.ip6) /
              inet6.UDP(sport=10000, dport=20000) /
              Raw(b'\xa5' * 100))

        self._check_forwarded_to_pg1(self.pg2, t2, host_pg1.mac)

        #
        # remove the proxy configs
        #
        self.vapi.ip6nd_proxy_add_del(
            ip=inet_pton(AF_INET6, host_pg1.ip6),
            sw_if_index=self.pg1.sw_if_index, is_del=1)
        self.vapi.ip6nd_proxy_add_del(
            ip=inet_pton(AF_INET6, host_pg2.ip6),
            sw_if_index=self.pg2.sw_if_index, is_del=1)

        self.assertFalse(find_nbr(self,
                                  self.pg2.sw_if_index,
                                  host_pg2.ip6))
        self.assertFalse(find_nbr(self,
                                  self.pg1.sw_if_index,
                                  host_pg1.ip6))

        #
        # no longer proxy-ing...
        #
        self.send_and_assert_no_replies(self.pg0, ns_pg0, "Proxy unconfigured")
        self.send_and_assert_no_replies(self.pg1, ns_pg1, "Proxy unconfigured")
        self.send_and_assert_no_replies(self.pg2, ns_pg2, "Proxy unconfigured")

        #
        # no longer forwarding. traffic generates NS out of the glean/main
        # interface
        #
        self.pg2.add_stream(t2)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        rx = self.pg0.get_capture(1)

        self.assertTrue(rx[0].haslayer(ICMPv6ND_NS))
1585
1586
class TestIPNull(VppTestCase):
    """ IPv6 routes via NULL """

    # Installs routes whose path type asks for an ICMP error and checks
    # the code of the Destination Unreachable message sent back on pg0.

    @classmethod
    def setUpClass(cls):
        super(TestIPNull, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPNull, cls).tearDownClass()

    def setUp(self):
        """ Create pg0, bring it up, configure IPv6 and resolve ND """
        super(TestIPNull, self).setUp()

        # create 1 pg interface
        self.create_pg_interfaces(range(1))

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip6()
            i.resolve_ndp()

    def tearDown(self):
        """ Parent tear-down, then unconfigure each interface """
        super(TestIPNull, self).tearDown()
        for i in self.pg_interfaces:
            i.unconfig_ip6()
            i.admin_down()

    def _send_and_get_unreach(self, pkt):
        """ Send pkt on pg0 and return the ICMPv6DestUnreach layer of the
        single packet captured back on pg0
        """
        self.pg0.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        rx = self.pg0.get_capture(1)
        return rx[0][ICMPv6DestUnreach]

    def test_ip_null(self):
        """ IP NULL route """

        # payload is a bytes literal so it is the same on python 2 and 3
        p = (Ether(src=self.pg0.remote_mac,
                   dst=self.pg0.local_mac) /
             IPv6(src=self.pg0.remote_ip6, dst="2001::1") /
             inet6.UDP(sport=1234, dport=1234) /
             Raw(b'\xa5' * 100))

        #
        # A route via IP NULL that will reply with ICMP unreachables
        #
        ip_unreach = VppIpRoute(
            self, "2001::", 64,
            [VppRoutePath("::", 0xffffffff,
                          type=FibPathType.FIB_PATH_TYPE_ICMP_UNREACH)])
        ip_unreach.add_vpp_config()

        icmp = self._send_and_get_unreach(p)

        # 0 = "No route to destination"
        self.assertEqual(icmp.code, 0)

        # ICMP is rate limited. pause a bit
        self.sleep(1)

        #
        # A route via IP NULL that will reply with ICMP prohibited.
        # The /128 covers the same destination as the /64 above.
        #
        ip_prohibit = VppIpRoute(
            self, "2001::1", 128,
            [VppRoutePath("::", 0xffffffff,
                          type=FibPathType.FIB_PATH_TYPE_ICMP_PROHIBIT)])
        ip_prohibit.add_vpp_config()

        icmp = self._send_and_get_unreach(p)

        # 1 = "Communication with destination administratively prohibited"
        self.assertEqual(icmp.code, 1)
1666
1667
class TestIPDisabled(VppTestCase):
    """ IPv6 disabled """

    # Checks that an interface without an IPv6 address drops unicast and
    # multicast IPv6 traffic, passes it once an address is configured, and
    # drops it again after the address is removed.

    @classmethod
    def setUpClass(cls):
        super(TestIPDisabled, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPDisabled, cls).tearDownClass()

    def setUp(self):
        """Create pg0 (IPv6 configured, ND resolved) and pg1 (admin-up only)."""
        super(TestIPDisabled, self).setUp()

        # create 2 pg interfaces
        self.create_pg_interfaces(range(2))

        # PG0 is IP enabled
        self.pg0.admin_up()
        self.pg0.config_ip6()
        self.pg0.resolve_ndp()

        # PG 1 is not IP enabled
        self.pg1.admin_up()

    def tearDown(self):
        """Remove the IPv6 configuration and bring the interfaces down."""
        super(TestIPDisabled, self).tearDown()
        for i in self.pg_interfaces:
            # This test case only ever configures IPv6 addresses (pg0 in
            # setUp, pg1 inside the test), so it is the IPv6 configuration
            # that has to be removed here, not the IPv4 one.
            i.unconfig_ip6()
            i.admin_down()

    def test_ip_disabled(self):
        """ IP Disabled """

        #
        # A multicast route for group ffef::1 with an unspecified ("::")
        # source: one accepting interface, pg1, and one forwarding
        # interface, pg0.
        #
        route_ff_01 = VppIpMRoute(
            self,
            "::",
            "ffef::1", 128,
            MRouteEntryFlags.MFIB_ENTRY_FLAG_NONE,
            [VppMRoutePath(self.pg1.sw_if_index,
                           MRouteItfFlags.MFIB_ITF_FLAG_ACCEPT),
             VppMRoutePath(self.pg0.sw_if_index,
                           MRouteItfFlags.MFIB_ITF_FLAG_FORWARD)])
        route_ff_01.add_vpp_config()

        # pu: unicast packet injected on pg1, destined to pg0's peer
        pu = (Ether(src=self.pg1.remote_mac,
                    dst=self.pg1.local_mac) /
              IPv6(src="2001::1", dst=self.pg0.remote_ip6) /
              inet6.UDP(sport=1234, dport=1234) /
              Raw('\xa5' * 100))
        # pm: multicast packet injected on pg1, destined to the group above
        pm = (Ether(src=self.pg1.remote_mac,
                    dst=self.pg1.local_mac) /
              IPv6(src="2001::1", dst="ffef::1") /
              inet6.UDP(sport=1234, dport=1234) /
              Raw('\xa5' * 100))

        #
        # PG1 does not forward IP traffic
        #
        self.send_and_assert_no_replies(self.pg1, pu, "IPv6 disabled")
        self.send_and_assert_no_replies(self.pg1, pm, "IPv6 disabled")

        #
        # IP enable PG1
        #
        self.pg1.config_ip6()

        #
        # Now we get packets through: exactly one packet is expected on pg0
        # for the unicast stream and one for the multicast stream. Only the
        # arrival is checked; the captured packets are not inspected.
        #
        self.pg1.add_stream(pu)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.pg0.get_capture(1)

        self.pg1.add_stream(pm)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.pg0.get_capture(1)

        #
        # Disable PG1
        #
        self.pg1.unconfig_ip6()

        #
        # PG1 does not forward IP traffic
        #
        self.send_and_assert_no_replies(self.pg1, pu, "IPv6 disabled")
        self.send_and_assert_no_replies(self.pg1, pm, "IPv6 disabled")
1762
1763
class TestIP6LoadBalance(VppTestCase):
    """ IPv6 Load-Balancing """

    # Exercises load-balancing of IPv6 and MPLS-encapsulated IPv6 traffic
    # over multi-path routes, including recursive routes. The expectations
    # only check *where* traffic shows up, never how it is split.

    @classmethod
    def setUpClass(cls):
        super(TestIP6LoadBalance, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIP6LoadBalance, cls).tearDownClass()

    def setUp(self):
        """Create five pg interfaces, add an MPLS table and enable
        IPv6 (with resolved ND) and MPLS on every interface."""
        super(TestIP6LoadBalance, self).setUp()

        self.create_pg_interfaces(range(5))

        mpls_tbl = VppMplsTable(self, 0)
        mpls_tbl.add_vpp_config()

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip6()
            i.resolve_ndp()
            i.enable_mpls()

    def tearDown(self):
        """Undo the per-interface setup, then run the base-class tearDown."""
        for i in self.pg_interfaces:
            i.unconfig_ip6()
            i.admin_down()
            i.disable_mpls()
        super(TestIP6LoadBalance, self).tearDown()

    def pg_send(self, input, pkts):
        """Clear the packet trace, queue `pkts` on the `input` interface,
        enable capture on all pg interfaces and start the packet
        generator. Nothing is captured or checked here."""
        self.vapi.cli("clear trace")
        input.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

    def send_and_expect_load_balancing(self, input, pkts, outputs):
        """Send `pkts` on `input` and assert that every interface in
        `outputs` captured at least one packet. The split between the
        outputs is deliberately not checked."""
        self.pg_send(input, pkts)
        for oo in outputs:
            # NOTE(review): the argument to _get_capture is presumably a
            # timeout rather than a packet count - confirm against the
            # pg interface implementation.
            rx = oo._get_capture(1)
            self.assertNotEqual(0, len(rx))

    def send_and_expect_one_itf(self, input, pkts, itf):
        """Send `pkts` on `input` and request a capture of len(pkts)
        packets from the single interface `itf`. The captured packets
        themselves are not inspected."""
        self.pg_send(input, pkts)
        rx = itf.get_capture(len(pkts))

    def test_ip6_load_balance(self):
        """ IPv6 Load-Balancing """

        #
        # An array of packets that differ only in the destination port
        #  - IP only
        #  - MPLS EOS
        #  - MPLS non-EOS
        #  - MPLS non-EOS with an entropy label
        #
        port_ip_pkts = []
        port_mpls_pkts = []
        port_mpls_neos_pkts = []
        port_ent_pkts = []

        #
        # An array of packets that differ only in the source address
        #
        src_ip_pkts = []
        src_mpls_pkts = []

        # Build NUM_PKTS variants of each stream; variant ii differs from
        # the others only in its UDP destination port (port_* streams) or
        # in its IPv6 source address (src_* streams).
        for ii in range(NUM_PKTS):
            port_ip_hdr = (
                IPv6(dst="3000::1", src="3000:1::1") /
                inet6.UDP(sport=1234, dport=1234 + ii) /
                Raw('\xa5' * 100))
            port_ip_pkts.append((Ether(src=self.pg0.remote_mac,
                                       dst=self.pg0.local_mac) /
                                 port_ip_hdr))
            port_mpls_pkts.append((Ether(src=self.pg0.remote_mac,
                                         dst=self.pg0.local_mac) /
                                   MPLS(label=66, ttl=2) /
                                   port_ip_hdr))
            port_mpls_neos_pkts.append((Ether(src=self.pg0.remote_mac,
                                              dst=self.pg0.local_mac) /
                                        MPLS(label=67, ttl=2) /
                                        MPLS(label=77, ttl=2) /
                                        port_ip_hdr))
            port_ent_pkts.append((Ether(src=self.pg0.remote_mac,
                                        dst=self.pg0.local_mac) /
                                  MPLS(label=67, ttl=2) /
                                  MPLS(label=14, ttl=2) /
                                  MPLS(label=999, ttl=2) /
                                  port_ip_hdr))
            src_ip_hdr = (
                IPv6(dst="3000::1", src="3000:1::%d" % ii) /
                inet6.UDP(sport=1234, dport=1234) /
                Raw('\xa5' * 100))
            src_ip_pkts.append((Ether(src=self.pg0.remote_mac,
                                      dst=self.pg0.local_mac) /
                                src_ip_hdr))
            src_mpls_pkts.append((Ether(src=self.pg0.remote_mac,
                                        dst=self.pg0.local_mac) /
                                  MPLS(label=66, ttl=2) /
                                  src_ip_hdr))

        #
        # A route for the IP packets: two paths, via pg1 and via pg2
        #
        route_3000_1 = VppIpRoute(self, "3000::1", 128,
                                  [VppRoutePath(self.pg1.remote_ip6,
                                                self.pg1.sw_if_index),
                                   VppRoutePath(self.pg2.remote_ip6,
                                                self.pg2.sw_if_index)])
        route_3000_1.add_vpp_config()

        #
        # a local-label for the EOS packets
        #
        binding = VppMplsIpBind(self, 66, "3000::1", 128, is_ip6=1)
        binding.add_vpp_config()

        #
        # An MPLS route for the non-EOS packets
        #
        route_67 = VppMplsRoute(self, 67, 0,
                                [VppRoutePath(self.pg1.remote_ip6,
                                              self.pg1.sw_if_index,
                                              labels=[67]),
                                 VppRoutePath(self.pg2.remote_ip6,
                                              self.pg2.sw_if_index,
                                              labels=[67])])
        route_67.add_vpp_config()

        #
        # inject the packets on pg0 - expect load-balancing across the 2
        # paths
        #  - since the default hash config is to use IP src,dst and port
        #    src,dst
        # We are not going to ensure equal amounts of packets across each link,
        # since the hash algorithm is statistical and therefore this can never
        # be guaranteed. But with NUM_PKTS different packets we do expect some
        # balancing. So instead just ensure there is traffic on each link.
        #
        self.send_and_expect_load_balancing(self.pg0, port_ip_pkts,
                                            [self.pg1, self.pg2])
        self.send_and_expect_load_balancing(self.pg0, src_ip_pkts,
                                            [self.pg1, self.pg2])
        self.send_and_expect_load_balancing(self.pg0, port_mpls_pkts,
                                            [self.pg1, self.pg2])
        self.send_and_expect_load_balancing(self.pg0, src_mpls_pkts,
                                            [self.pg1, self.pg2])
        self.send_and_expect_load_balancing(self.pg0, port_mpls_neos_pkts,
                                            [self.pg1, self.pg2])

        #
        # The packets with Entropy label in should not load-balance,
        # since the Entropy value is fixed.
        #
        self.send_and_expect_one_itf(self.pg0, port_ent_pkts, self.pg1)

        #
        # change the flow hash config so it's only IP src,dst
        #  - now only the stream with differing source address will
        #    load-balance
        #
        self.vapi.set_ip_flow_hash(vrf_id=0, src=1, dst=1, sport=0, dport=0,
                                   is_ipv6=1)

        self.send_and_expect_load_balancing(self.pg0, src_ip_pkts,
                                            [self.pg1, self.pg2])
        self.send_and_expect_load_balancing(self.pg0, src_mpls_pkts,
                                            [self.pg1, self.pg2])
        # the port-only stream no longer differs in any hashed field, so
        # the whole stream is expected on a single interface (pg2)
        self.send_and_expect_one_itf(self.pg0, port_ip_pkts, self.pg2)

        #
        # change the flow hash config back to defaults
        #
        self.vapi.set_ip_flow_hash(vrf_id=0, src=1, dst=1, sport=1, dport=1,
                                   is_ipv6=1)

        #
        # Recursive prefixes
        #  - testing that 2 stages of load-balancing occurs and there is no
        #    polarisation (i.e. only 2 of 4 paths are used)
        #
        port_pkts = []
        src_pkts = []

        for ii in range(257):
            port_pkts.append((Ether(src=self.pg0.remote_mac,
                                    dst=self.pg0.local_mac) /
                              IPv6(dst="4000::1",
                                   src="4000:1::1") /
                              inet6.UDP(sport=1234,
                                        dport=1234 + ii) /
                              Raw('\xa5' * 100)))
            src_pkts.append((Ether(src=self.pg0.remote_mac,
                                   dst=self.pg0.local_mac) /
                             IPv6(dst="4000::1",
                                  src="4000:1::%d" % ii) /
                             inet6.UDP(sport=1234, dport=1234) /
                             Raw('\xa5' * 100)))

        # second-stage route: two paths, via pg3 and via pg4
        route_3000_2 = VppIpRoute(self, "3000::2", 128,
                                  [VppRoutePath(self.pg3.remote_ip6,
                                                self.pg3.sw_if_index),
                                   VppRoutePath(self.pg4.remote_ip6,
                                                self.pg4.sw_if_index)])
        route_3000_2.add_vpp_config()

        # first-stage route: recurses via 3000::1 (pg1/pg2) and
        # 3000::2 (pg3/pg4)
        route_4000_1 = VppIpRoute(self, "4000::1", 128,
                                  [VppRoutePath("3000::1",
                                                0xffffffff),
                                   VppRoutePath("3000::2",
                                                0xffffffff)])
        route_4000_1.add_vpp_config()

        #
        # inject the packets on pg0 - expect load-balancing across all 4
        # paths (pg_send clears the trace as well before each stream)
        #
        self.vapi.cli("clear trace")
        self.send_and_expect_load_balancing(self.pg0, port_pkts,
                                            [self.pg1, self.pg2,
                                             self.pg3, self.pg4])
        self.send_and_expect_load_balancing(self.pg0, src_pkts,
                                            [self.pg1, self.pg2,
                                             self.pg3, self.pg4])

        #
        # Recursive prefixes
        #  - testing 2 stages of recursion where neither stage offers a
        #    load-balancing choice (one path each)
        #
        port_pkts = []

        for ii in range(257):
            port_pkts.append((Ether(src=self.pg0.remote_mac,
                                    dst=self.pg0.local_mac) /
                              IPv6(dst="6000::1",
                                   src="6000:1::1") /
                              inet6.UDP(sport=1234,
                                        dport=1234 + ii) /
                              Raw('\xa5' * 100)))

        # second-stage route: single path via pg3
        route_5000_2 = VppIpRoute(self, "5000::2", 128,
                                  [VppRoutePath(self.pg3.remote_ip6,
                                                self.pg3.sw_if_index)])
        route_5000_2.add_vpp_config()

        # first-stage route: single recursive path via 5000::2
        route_6000_1 = VppIpRoute(self, "6000::1", 128,
                                  [VppRoutePath("5000::2",
                                                0xffffffff)])
        route_6000_1.add_vpp_config()

        #
        # inject the packets on pg0 - with a single path at each stage the
        # whole stream is expected on pg3
        #
        self.vapi.cli("clear trace")
        self.send_and_expect_one_itf(self.pg0, port_pkts, self.pg3)
2020
2021
class TestIP6Punt(VppTestCase):
    """ IPv6 Punt Police/Redirect """

    @classmethod
    def setUpClass(cls):
        super(TestIP6Punt, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIP6Punt, cls).tearDownClass()

    def setUp(self):
        """Bring up four pg interfaces with IPv6 configured and ND resolved."""
        super(TestIP6Punt, self).setUp()

        self.create_pg_interfaces(range(4))

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.resolve_ndp()

    def tearDown(self):
        """Run the base-class tearDown, then strip IPv6 and admin-down."""
        super(TestIP6Punt, self).tearDown()
        for intf in self.pg_interfaces:
            intf.unconfig_ip6()
            intf.admin_down()

    def test_ip_punt(self):
        """ IP6 punt police and redirect """

        # TCP traffic from pg0's peer addressed to VPP's own pg0 address
        eth = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
        ip6 = IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
        tcp = inet6.TCP(sport=1234, dport=1234)
        punt_pkt = eth / ip6 / tcp / Raw('\xa5' * 100)

        stream = punt_pkt * 1025

        rx_if = self.pg0.sw_if_index
        tx_if = self.pg1.sw_if_index
        any_rx_if = 0xffffffff
        next_hop = self.pg1.remote_ip6

        # -- punt redirect pg0 -> pg1: every packet shows up on pg1 --
        self.vapi.ip_punt_redirect(rx_if, tx_if, next_hop)
        self.send_and_expect(self.pg0, stream, self.pg1)

        # -- attach a policer to the IPv6 punt path --
        policer_args = (b"ip6-punt", 400, 0, 10, 0)
        policer = self.vapi.policer_add_del(*policer_args, rate_type=1)
        self.vapi.ip_punt_police(policer.policer_index, is_ip6=1)

        self.vapi.cli("clear trace")
        self.pg0.add_stream(stream)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # some, but not all, of the stream must make it past the policer
        policed = self.pg1._get_capture(1)
        n_received = len(policed)
        self.assertGreater(n_received, 0)
        self.assertLess(n_received, len(stream))

        # -- detach and delete the policer: full rate again --
        self.vapi.ip_punt_police(policer.policer_index, is_add=0, is_ip6=1)
        self.vapi.policer_add_del(*policer_args, rate_type=1, is_add=0)
        self.send_and_expect(self.pg0, stream, self.pg1)

        # -- remove the redirect: everything is dropped --
        self.vapi.ip_punt_redirect(rx_if, tx_if, next_hop, is_add=0)
        self.send_and_assert_no_replies(self.pg0, stream,
                                        "IP no punt config")

        # -- a redirect that is not selective on the input port --
        self.vapi.ip_punt_redirect(any_rx_if, tx_if, next_hop)
        self.send_and_expect(self.pg0, stream, self.pg1)

        self.vapi.ip_punt_redirect(any_rx_if, tx_if, next_hop, is_add=0)

    def test_ip_punt_dump(self):
        """ IP6 punt redirect dump"""

        tx_if = self.pg3.sw_if_index
        next_hop = self.pg3.remote_ip6

        # three redirects, all transmitting on pg3; the last one uses an
        # all-zeros next-hop
        redirects = [
            (self.pg0, next_hop),
            (self.pg1, next_hop),
            (self.pg2, '0::0'),
        ]
        for rx_intf, nh in redirects:
            self.vapi.ip_punt_redirect(rx_intf.sw_if_index, tx_if, nh)

        # dump filtered on pg0: every entry returned must be pg0's
        pg0_punts = self.vapi.ip_punt_redirect_dump(self.pg0.sw_if_index,
                                                    is_ipv6=1)
        for entry in pg0_punts:
            self.assertEqual(entry.punt.rx_sw_if_index,
                             self.pg0.sw_if_index)

        # unfiltered dump: all three entries, all transmitting on pg3
        all_punts = self.vapi.ip_punt_redirect_dump(0xffffffff, is_ipv6=1)
        self.assertEqual(len(all_punts), 3)
        for entry in all_punts:
            self.assertEqual(entry.punt.tx_sw_if_index,
                             self.pg3.sw_if_index)
        # NOTE(review): nh is wrapped in str() on the following line, so it
        # is presumably not a plain string and this inequality may hold
        # trivially - confirm the intent of this check.
        self.assertNotEqual(all_punts[1].punt.nh, self.pg3.remote_ip6)
        self.assertEqual(str(all_punts[2].punt.nh), '::')
2156
2157
class TestIPDeag(VppTestCase):
    """ IPv6 Deaggregate Routes """

    @classmethod
    def setUpClass(cls):
        super(TestIPDeag, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPDeag, cls).tearDownClass()

    def setUp(self):
        """Bring up three pg interfaces with IPv6 configured and ND resolved."""
        super(TestIPDeag, self).setUp()

        self.create_pg_interfaces(range(3))

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.resolve_ndp()

    def tearDown(self):
        """Run the base-class tearDown, then strip IPv6 and admin-down."""
        super(TestIPDeag, self).tearDown()
        for intf in self.pg_interfaces:
            intf.unconfig_ip6()
            intf.admin_down()

    def _pg0_tcp_pkt(self, src, dst):
        """Build an Ether/IPv6/TCP frame, injected on pg0, from `src` to
        `dst`, carrying 100 bytes of payload."""
        return (Ether(src=self.pg0.remote_mac,
                      dst=self.pg0.local_mac) /
                IPv6(src=src, dst=dst) /
                inet6.TCP(sport=1234, dport=1234) /
                Raw('\xa5' * 100))

    def test_ip_deag(self):
        """ IP Deag Routes """

        no_interface = 0xffffffff
        burst = 257

        # Two extra IPv6 tables:
        #  table 1 - used for a second destination address lookup
        #  table 2 - used for a source address lookup
        lookup_tables = [VppIpTable(self, 1, is_ip6=1),
                         VppIpTable(self, 2, is_ip6=1)]
        for tbl in lookup_tables:
            tbl.add_vpp_config()

        # Default-table routes that hand the packet over to a second
        # lookup (deag) in one of the tables above.
        deag_routes = [
            VppIpRoute(self, "1::1", 128,
                       [VppRoutePath("::", no_interface, nh_table_id=1)]),
            VppIpRoute(
                self, "1::2", 128,
                [VppRoutePath(
                    "::", no_interface, nh_table_id=2,
                    type=FibPathType.FIB_PATH_TYPE_SOURCE_LOOKUP)]),
        ]
        for route in deag_routes:
            route.add_vpp_config()

        # With nothing more specific in the second tables, both streams
        # hit those tables' default routes and are dropped.
        pkts_dst = self._pg0_tcp_pkt("5::5", "1::1") * burst
        pkts_src = self._pg0_tcp_pkt("2::2", "1::2") * burst

        self.send_and_assert_no_replies(self.pg0, pkts_dst,
                                        "IP in dst table")
        self.send_and_assert_no_replies(self.pg0, pkts_src,
                                        "IP in src table")

        # A route in the dst table (keyed on the destination address)
        # forwards the first stream via pg1.
        via_pg1 = VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)
        route_in_dst = VppIpRoute(self, "1::1", 128, [via_pg1], table_id=1)
        route_in_dst.add_vpp_config()

        self.send_and_expect(self.pg0, pkts_dst, self.pg1)

        # A route in the src table (keyed on the packets' source address)
        # forwards the second stream via pg2.
        via_pg2 = VppRoutePath(self.pg2.remote_ip6, self.pg2.sw_if_index)
        route_in_src = VppIpRoute(self, "2::2", 128, [via_pg2], table_id=2)
        route_in_src.add_vpp_config()
        self.send_and_expect(self.pg0, pkts_src, self.pg2)

        # A loop in the lookup data-path: the route's only path is "::"
        # with no interface; packets hitting it must be dropped.
        route_loop = VppIpRoute(self, "3::3", 128,
                                [VppRoutePath("::", no_interface)])
        route_loop.add_vpp_config()

        loop_pkt = self._pg0_tcp_pkt("3::4", "3::3")

        self.send_and_assert_no_replies(self.pg0, loop_pkt * burst,
                                        "IP lookup loop")
2275
2276
class TestIP6Input(VppTestCase):
    """ IPv6 Input Exception Test Cases """

    # Values shared by the parameterized cases below.
    icmpv6_data = '\x0a' * 18
    all_0s = "::"
    all_1s = "FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF"

    @classmethod
    def setUpClass(cls):
        super(TestIP6Input, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIP6Input, cls).tearDownClass()

    def setUp(self):
        """Bring up two pg interfaces with IPv6 configured and ND resolved."""
        super(TestIP6Input, self).setUp()

        self.create_pg_interfaces(range(2))

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.resolve_ndp()

    def tearDown(self):
        """Run the base-class tearDown, then strip IPv6 and admin-down."""
        super(TestIP6Input, self).tearDown()
        for intf in self.pg_interfaces:
            intf.unconfig_ip6()
            intf.admin_down()

    def test_ip_input_icmp_reply(self):
        """ IP6 Input Exception - Return ICMP (3,0) """

        # A packet that needs forwarding (pg0 -> pg1) but arrives with a
        # hop limit of 1: an ICMP reply is expected back on pg0.
        eth = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
        ip6 = IPv6(src=self.pg0.remote_ip6,
                   dst=self.pg1.remote_ip6,
                   hlim=1)
        udp = inet6.UDP(sport=1234, dport=1234)
        hlim_pkt = eth / ip6 / udp / Raw('\xa5' * 100)

        replies = self.send_and_expect(self.pg0, hlim_pkt * NUM_PKTS,
                                       self.pg0)
        icmp = replies[0][ICMPv6TimeExceeded]

        # type 3 / code 0: "hop limit exceeded in transit"
        self.assertEqual((icmp.type, icmp.code), (3, 0))

    # Each case is (name, src, dst, l4proto, msg, timeout); a src/dst of
    # None means "use the interface address" (see the method body).
    @parameterized.expand([
        ("src='iface',   dst='iface'",
         None, None,
         inet6.UDP(sport=1234, dport=1234),
         "funky version", None),
        ("src='All 0's', dst='iface'",
         all_0s, None,
         ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data),
         None, 0.1),
        ("src='iface',   dst='All 0's'",
         None, all_0s,
         ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data),
         None, 0.1),
        ("src='All 1's', dst='iface'",
         all_1s, None,
         ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data),
         None, 0.1),
        ("src='iface',   dst='All 1's'",
         None, all_1s,
         ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data),
         None, 0.1),
        ("src='All 1's', dst='All 1's'",
         all_1s, all_1s,
         ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data),
         None, 0.1),
    ])
    def test_ip_input_no_replies(self, name, src, dst, l4, msg, timeout):

        # the description is set per case, so no static docstring here
        self._testMethodDoc = 'IPv6 Input Exception - %s' % name

        # fall back to the interface addresses when a case leaves src/dst
        # unset
        src_addr = src or self.pg0.remote_ip6
        dst_addr = dst or self.pg1.remote_ip6

        # every case sends an IPv6 header whose version field is 3
        bad_pkt = (Ether(src=self.pg0.remote_mac,
                         dst=self.pg0.local_mac) /
                   IPv6(src=src_addr, dst=dst_addr, version=3) /
                   l4 /
                   Raw('\xa5' * 100))

        self.send_and_assert_no_replies(self.pg0, bad_pkt * NUM_PKTS,
                                        remark=msg or "",
                                        timeout=timeout)

    def test_hop_by_hop(self):
        """ Hop-by-hop header test """

        # UDP to VPP's own pg0 address behind a hop-by-hop extension
        # header. The packet is only injected; nothing is captured or
        # asserted afterwards.
        eth = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
        ip6 = IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
        udp = inet6.UDP(sport=1234, dport=1234)
        hbh_pkt = eth / ip6 / IPv6ExtHdrHopByHop() / udp / Raw('\xa5' * 100)

        self.pg0.add_stream(hbh_pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
2373
# When executed as a script, run this module's tests with the VPP runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)
2376