1#!/usr/bin/env python3
2
3import unittest
4
5from framework import VppTestCase, VppTestRunner
6from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
7from ipaddress import *
8
9import scapy.compat
10from scapy.contrib.mpls import MPLS
11from scapy.layers.inet import IP, UDP, TCP, ICMP, icmptypes, icmpcodes
12from scapy.layers.l2 import Ether
13from scapy.packet import Raw
14from scapy.layers.dns import DNSRR, DNS, DNSQR
15
16
class TestDns(VppTestCase):
    """ Dns Test Cases """

    @classmethod
    def setUpClass(cls):
        """Delegate class-level setup to the base test case."""
        super(TestDns, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level teardown to the base test case."""
        super(TestDns, cls).tearDownClass()

    def setUp(self):
        """Create one pg interface, bring it up and configure IPv4 on it."""
        super(TestDns, self).setUp()

        self.create_pg_interfaces(range(1))

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()

    def tearDown(self):
        """Delegate per-test teardown to the base test case."""
        super(TestDns, self).tearDown()

    def create_stream(self, src_if):
        """Create input packet stream for defined interface.

        Two DNS queries (UDP, destination port 53) are built: one for
        "bozo.clown.org" and one for "no.clown.org".

        :param VppInterface src_if: Interface to create packet stream for.
        :returns: List holding the two request packets, in that order.
        """
        good_request = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
                        IP(src=src_if.remote_ip4) /
                        UDP(sport=1234, dport=53) /
                        DNS(rd=1, qd=DNSQR(qname="bozo.clown.org")))

        bad_request = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
                       IP(src=src_if.remote_ip4) /
                       UDP(sport=1234, dport=53) /
                       DNS(rd=1, qd=DNSQR(qname="no.clown.org")))
        return [good_request, bad_request]

    def verify_capture(self, dst_if, capture):
        """Verify captured input packet stream for defined interface.

        Every captured packet must carry a DNS layer whose first answer
        record resolves to 1.2.3.4.

        :param VppInterface dst_if: Interface to verify captured packet stream
            for.
        :param list capture: Captured packet stream.
        """
        # Let the logger do the formatting instead of pre-building the string.
        self.logger.info("Verifying capture on interface %s", dst_if.name)
        for packet in capture:
            dns = packet[DNS]
            self.assertEqual(dns.an[0].rdata, '1.2.3.4')

    def test_dns_unittest(self):
        """ DNS Name Resolver Basic Functional Test """

        # Set up an upstream name resolver. We won't actually go there
        self.vapi.dns_name_server_add_del(
            is_ip6=0, is_add=1, server_address=IPv4Address(u'8.8.8.8').packed)

        # Enable name resolution
        self.vapi.dns_enable_disable(enable=1)

        # Manually add a static dns cache entry
        self.logger.info(self.vapi.cli("dns cache add bozo.clown.org 1.2.3.4"))

        # Test the binary API
        rv = self.vapi.dns_resolve_name(name=b'bozo.clown.org')
        self.assertEqual(rv.ip4_address, IPv4Address(u'1.2.3.4').packed)

        # Configure 127.0.0.1/8 on the pg interface
        self.vapi.sw_interface_add_del_address(
            sw_if_index=self.pg0.sw_if_index,
            prefix="127.0.0.1/8")

        # Send a couple of DNS request packets, one for bozo.clown.org
        # and one for no.clown.org which won't resolve

        pkts = self.create_stream(self.pg0)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)

        self.pg_start()
        # Two requests go in but only one packet is expected back.
        # NOTE(review): presumably the no.clown.org query stays pending
        # because the upstream server is never reached - confirm.
        pkts = self.pg0.get_capture(1)
        self.verify_capture(self.pg0, pkts)

        # Make sure that the cache contents are correct.
        # Named cache_dump (not "str") so the builtin is not shadowed.
        cache_dump = self.vapi.cli("show dns cache verbose")
        self.assertIn('1.2.3.4', cache_dump)
        self.assertIn('[P] no.clown.org:', cache_dump)
107
if __name__ == "__main__":
    # Executed directly as a script: hand control to unittest, using the
    # VPP test runner imported at the top of the file.
    unittest.main(
        testRunner=VppTestRunner,
    )
110