stl_capture_test.py revision b04b41a9
1#!/router/bin/python
2from .stl_general_test import CStlGeneral_Test, CTRexScenario
3from trex_stl_lib.api import *
4import os, sys
5import pprint
6
7def ip2num (ip_str):
8    return struct.unpack('>L', socket.inet_pton(socket.AF_INET, ip_str))[0]
9
10def num2ip (ip_num):
11    return socket.inet_ntoa(struct.pack('>L', ip_num))
12
13def ip_add (ip_str, cnt):
14    return num2ip(ip2num(ip_str) + cnt)
15
16
17class STLCapture_Test(CStlGeneral_Test):
18    """Tests for capture packets"""
19
20    def setUp(self):
21        CStlGeneral_Test.setUp(self)
22
23        if not self.is_loopback:
24            self.skip('capture tests are skipped on a non-loopback machine')
25
26        assert 'bi' in CTRexScenario.stl_ports_map
27
28        self.c = CTRexScenario.stl_trex
29
30        self.tx_port, self.rx_port = CTRexScenario.stl_ports_map['bi'][0]
31
32        self.c.connect()
33        self.c.reset(ports = [self.tx_port, self.rx_port])
34
35        self.pkt = STLPktBuilder(pkt = Ether()/IP(src="16.0.0.1",dst="48.0.0.1")/UDP(dport=12,sport=1025)/IP()/'a_payload_example')
36
37        self.percentage = 5 if self.is_virt_nics else 50
38
39
40    @classmethod
41    def tearDownClass(cls):
42        if CTRexScenario.stl_init_error:
43            return
44        # connect back at end of tests
45        if not cls.is_connected():
46            CTRexScenario.stl_trex.connect()
47
48
49    # a simple capture test - inject packets and see the packets arrived the same
50    def test_basic_capture (self):
51        pkt_count = 100
52
53        try:
54            # move to service mode
55            self.c.set_service_mode(ports = self.rx_port)
56            # start a capture
57            rc = self.c.start_capture(rx_ports = [self.rx_port], limit = pkt_count)
58
59            # inject few packets with a VM
60            vm = STLScVmRaw( [STLVmFlowVar ( "ip_src",  min_value="16.0.0.0", max_value="16.255.255.255", size=4, step = 7, op = "inc"),
61                              STLVmWrFlowVar (fv_name="ip_src", pkt_offset= "IP.src"),
62                              STLVmFixIpv4(offset = "IP")
63                              ]
64                             );
65
66            pkt = STLPktBuilder(pkt = Ether()/IP(src="16.0.0.1",dst="48.0.0.1")/UDP(dport=12,sport=1025)/IP()/'a_payload_example',
67                                vm = vm)
68
69            stream = STLStream(name = 'burst',
70                               packet = pkt,
71                               mode = STLTXSingleBurst(total_pkts = pkt_count,
72                                                       percentage = self.percentage)
73                               )
74
75            self.c.add_streams(ports = self.tx_port, streams = [stream])
76
77            self.c.start(ports = self.tx_port, force = True)
78            self.c.wait_on_traffic(ports = self.tx_port)
79
80            pkt_list = []
81            self.c.stop_capture(rc['id'], output = pkt_list)
82
83            assert (len(pkt_list) == pkt_count)
84
85            # generate all the values that should be
86            expected_src_ips = [ip_add('16.0.0.0', i * 7) for i in range(pkt_count)]
87
88            for i, pkt in enumerate(pkt_list):
89                pkt_scapy = Ether(pkt['binary'])
90                pkt_ts    = pkt['ts']
91
92                assert('IP' in pkt_scapy)
93                assert(pkt_scapy['IP'].src in expected_src_ips)
94
95                # remove the match
96                del expected_src_ips[expected_src_ips.index(pkt_scapy['IP'].src)]
97
98
99        except STLError as e:
100            assert False , '{0}'.format(e)
101
102        finally:
103            self.c.remove_all_captures()
104            self.c.set_service_mode(ports = self.rx_port, enabled = False)
105
106
107
108
109    # in this test we apply captures under traffic multiple times
110    def test_stress_capture (self):
111        pkt_count = 100
112
113        try:
114            # move to service mode
115            self.c.set_service_mode(ports = self.rx_port)
116
117            # start heavy traffic
118            pkt = STLPktBuilder(pkt = Ether()/IP(src="16.0.0.1",dst="48.0.0.1")/UDP(dport=12,sport=1025)/IP()/'a_payload_example')
119
120            stream = STLStream(name = 'burst',
121                               packet = pkt,
122                               mode = STLTXCont(percentage = self.percentage)
123                               )
124
125            self.c.add_streams(ports = self.tx_port, streams = [stream])
126            self.c.start(ports = self.tx_port, force = True)
127
128            captures = [{'capture_id': None, 'limit': 50}, {'capture_id': None, 'limit': 80}, {'capture_id': None, 'limit': 100}]
129
130            for i in range(0, 100):
131                # start a few captures
132                for capture in captures:
133                    capture['capture_id'] = self.c.start_capture(rx_ports = [self.rx_port], limit = capture['limit'])['id']
134
135                # a little time to wait for captures to be full
136                server_captures = self.c.get_capture_status()
137
138                for capture in captures:
139                    capture_id = capture['capture_id']
140
141                    # make sure the server registers us and we are full
142                    assert(capture['capture_id'] in server_captures.keys())
143                    assert(server_captures[capture_id]['count'] == capture['limit'])
144
145                    # fetch packets
146                    pkt_list = []
147                    self.c.stop_capture(capture['capture_id'], pkt_list)
148                    assert (len(pkt_list) == capture['limit'])
149
150                    # a little sanity per packet
151                    for pkt in pkt_list:
152                        scapy_pkt = Ether(pkt['binary'])
153                        assert(scapy_pkt['IP'].src == '16.0.0.1')
154                        assert(scapy_pkt['IP'].dst == '48.0.0.1')
155
156        except STLError as e:
157            assert False , '{0}'.format(e)
158
159        finally:
160            self.c.remove_all_captures()
161            self.c.set_service_mode(ports = self.rx_port, enabled = False)
162
163
164    # in this test we capture and analyze the ARP request / response
165    def test_arp_capture (self):
166        if self.c.get_port_attr(self.tx_port)['layer_mode'] != 'IPv4':
167            return self.skip('skipping ARP capture test for non-ipv4 config on port {0}'.format(self.tx_port))
168
169        if self.c.get_port_attr(self.rx_port)['layer_mode'] != 'IPv4':
170            return self.skip('skipping ARP capture test for non-ipv4 config on port {0}'.format(self.rx_port))
171
172        try:
173            # move to service mode
174            self.c.set_service_mode(ports = [self.tx_port, self.rx_port])
175
176            # start a capture
177            capture_info = self.c.start_capture(rx_ports = [self.tx_port, self.rx_port], limit = 2)
178
179            # generate an ARP request
180            self.c.arp(ports = self.tx_port)
181
182            pkts = []
183            self.c.stop_capture(capture_info['id'], output = pkts)
184
185            assert len(pkts) == 2
186
187            # find the correct order
188            if pkts[0]['port'] == self.rx_port:
189                request  = pkts[0]
190                response = pkts[1]
191            else:
192                request  = pkts[1]
193                response = pkts[0]
194
195            assert request['port']  == self.rx_port
196            assert response['port'] == self.tx_port
197
198            arp_request, arp_response = Ether(request['binary']), Ether(response['binary'])
199            assert 'ARP' in arp_request
200            assert 'ARP' in arp_response
201
202            assert arp_request['ARP'].op  == 1
203            assert arp_response['ARP'].op == 2
204
205            assert arp_request['ARP'].pdst == arp_response['ARP'].psrc
206
207
208        except STLError as e:
209            assert False , '{0}'.format(e)
210
211        finally:
212             self.c.remove_all_captures()
213             self.c.set_service_mode(ports = [self.tx_port, self.rx_port], enabled = False)
214
215
216    # test PING
217    def test_ping_capture (self):
218        if self.c.get_port_attr(self.tx_port)['layer_mode'] != 'IPv4':
219            return self.skip('skipping ARP capture test for non-ipv4 config on port {0}'.format(self.tx_port))
220
221        if self.c.get_port_attr(self.rx_port)['layer_mode'] != 'IPv4':
222            return self.skip('skipping ARP capture test for non-ipv4 config on port {0}'.format(self.rx_port))
223
224        try:
225            # move to service mode
226            self.c.set_service_mode(ports = [self.tx_port, self.rx_port])
227
228            # start a capture
229            capture_info = self.c.start_capture(rx_ports = [self.tx_port, self.rx_port], limit = 100)
230
231            # generate an ARP request
232            tx_ipv4 = self.c.get_port_attr(port = self.tx_port)['src_ipv4']
233            rx_ipv4 = self.c.get_port_attr(port = self.rx_port)['src_ipv4']
234
235            count = 50
236
237            self.c.ping_ip(src_port = self.tx_port, dst_ip = rx_ipv4, pkt_size = 1500, count = count, interval_sec = 0.01)
238
239            pkts = []
240            self.c.stop_capture(capture_info['id'], output = pkts)
241
242            req_pkts = [Ether(pkt['binary']) for pkt in pkts if pkt['port'] == self.rx_port]
243            res_pkts = [Ether(pkt['binary']) for pkt in pkts if pkt['port'] == self.tx_port]
244            assert len(req_pkts) == count
245            assert len(res_pkts) == count
246
247            for req_pkt in req_pkts:
248                assert 'ICMP' in req_pkt
249                assert req_pkt['IP'].src == tx_ipv4
250                assert req_pkt['IP'].dst == rx_ipv4
251                assert req_pkt['ICMP'].type == 8
252                assert len(req_pkt) == 1500
253
254            for res_pkt in res_pkts:
255                assert 'ICMP' in res_pkt
256                assert res_pkt['IP'].src == rx_ipv4
257                assert res_pkt['IP'].dst == tx_ipv4
258                assert res_pkt['ICMP'].type == 0
259                assert len(res_pkt) == 1500
260
261
262        except STLError as e:
263            assert False , '{0}'.format(e)
264
265        finally:
266             self.c.remove_all_captures()
267             self.c.set_service_mode(ports = [self.tx_port, self.rx_port], enabled = False)
268
269
270