stl_client_test.py revision 8bd778d0
1#!/router/bin/python
2from .stl_general_test import CStlGeneral_Test, CTRexScenario
3from trex_stl_lib.api import *
4import os, sys
5import glob
6
7
8def get_error_in_percentage (golden, value):
9    return abs(golden - value) / float(golden)
10
11def get_stl_profiles ():
12    profiles_path = os.path.join(CTRexScenario.scripts_path, 'stl/')
13    py_profiles = glob.glob(profiles_path + "/*.py")
14    yaml_profiles = glob.glob(profiles_path + "yaml/*.yaml")
15    return py_profiles + yaml_profiles
16
17
18class STLClient_Test(CStlGeneral_Test):
19    """Tests for stateless client"""
20
21    def setUp(self):
22        CStlGeneral_Test.setUp(self)
23
24        if self.is_virt_nics:
25            self.percentage = 5
26            self.pps = 500
27        else:
28            self.percentage = 50
29            self.pps = 50000
30
31        # strict mode is only for 'wire only' connection
32        self.strict = True if (self.is_loopback and not self.is_virt_nics) else False
33
34        assert 'bi' in CTRexScenario.stl_ports_map
35
36        self.c = CTRexScenario.stl_trex
37
38        self.tx_port, self.rx_port = CTRexScenario.stl_ports_map['bi'][0]
39
40        self.c.connect()
41        self.c.reset(ports = [self.tx_port, self.rx_port])
42
43        self.pkt = STLPktBuilder(pkt = Ether()/IP(src="16.0.0.1",dst="48.0.0.1")/UDP(dport=12,sport=1025)/IP()/'a_payload_example')
44        self.profiles = get_stl_profiles()
45
46
47    @classmethod
48    def tearDownClass(cls):
49        if CTRexScenario.stl_init_error:
50            return
51        # connect back at end of tests
52        if not cls.is_connected():
53            CTRexScenario.stl_trex.connect()
54
55
56    def verify (self, expected, got):
57        if self.strict:
58            assert expected == got
59        else:
60            assert get_error_in_percentage(expected, got) < 0.05
61
62
63    def test_basic_connect_disconnect (self):
64        try:
65            self.c.connect()
66            assert self.c.is_connected(), 'client should be connected'
67            self.c.disconnect()
68            assert not self.c.is_connected(), 'client should be disconnected'
69
70        except STLError as e:
71            assert False , '{0}'.format(e)
72
73
74    def test_basic_single_burst (self):
75
76        try:
77            b1 = STLStream(name = 'burst',
78                           packet = self.pkt,
79                           mode = STLTXSingleBurst(total_pkts = 100,
80                                                   percentage = self.percentage)
81                           )
82
83            for i in range(0, 5):
84                self.c.add_streams([b1], ports = [self.tx_port, self.rx_port])
85
86                self.c.clear_stats()
87                self.c.start(ports = [self.tx_port, self.rx_port])
88
89                self.c.wait_on_traffic(ports = [self.tx_port, self.rx_port])
90                stats = self.c.get_stats()
91
92                assert self.tx_port in stats
93                assert self.rx_port in stats
94
95                self.verify(100, stats[self.tx_port]['opackets'])
96                self.verify(100, stats[self.rx_port]['ipackets'])
97
98                self.verify(100, stats[self.rx_port]['opackets'])
99                self.verify(100, stats[self.tx_port]['ipackets'])
100
101
102                self.c.remove_all_streams(ports = [self.tx_port, self.rx_port])
103
104
105
106        except STLError as e:
107            assert False , '{0}'.format(e)
108
109
110    #
111    def test_basic_multi_burst (self):
112        try:
113            b1 = STLStream(name = 'burst',
114                           packet = self.pkt,
115                           mode = STLTXMultiBurst(pkts_per_burst = 10,
116                                                  count =  20,
117                                                  percentage = self.percentage)
118                           )
119
120            for i in range(0, 5):
121                self.c.add_streams([b1], ports = [self.tx_port, self.rx_port])
122
123                self.c.clear_stats()
124                self.c.start(ports = [self.tx_port, self.rx_port])
125
126                self.c.wait_on_traffic(ports = [self.tx_port, self.rx_port])
127                stats = self.c.get_stats()
128
129                assert self.tx_port in stats
130                assert self.rx_port in stats
131
132                self.verify(200, stats[self.tx_port]['opackets'])
133                self.verify(200, stats[self.rx_port]['ipackets'])
134
135                self.verify(200, stats[self.rx_port]['opackets'])
136                self.verify(200, stats[self.tx_port]['ipackets'])
137
138                self.c.remove_all_streams(ports = [self.tx_port, self.rx_port])
139
140
141
142        except STLError as e:
143            assert False , '{0}'.format(e)
144
145
146    #
147    def test_basic_cont (self):
148        pps = self.pps
149        duration = 0.1
150        golden = pps * duration
151
152        try:
153            b1 = STLStream(name = 'burst',
154                           packet = self.pkt,
155                           mode = STLTXCont(pps = pps)
156                           )
157
158            for i in range(0, 5):
159                self.c.add_streams([b1], ports = [self.tx_port, self.rx_port])
160
161                self.c.clear_stats()
162                self.c.start(ports = [self.tx_port, self.rx_port], duration = duration)
163
164                assert self.c.ports[self.tx_port].is_transmitting(), 'port should be active'
165                assert self.c.ports[self.rx_port].is_transmitting(), 'port should be active'
166
167                self.c.wait_on_traffic(ports = [self.tx_port, self.rx_port])
168                stats = self.c.get_stats()
169
170                assert self.tx_port in stats
171                assert self.rx_port in stats
172
173                # cont. with duration should be quite percise - 5% error is relaxed enough
174
175                assert get_error_in_percentage(stats[self.tx_port]['opackets'], golden) < 0.05
176                assert get_error_in_percentage(stats[self.rx_port]['ipackets'], golden) < 0.05
177
178                assert get_error_in_percentage(stats[self.rx_port]['opackets'], golden) < 0.05
179                assert get_error_in_percentage(stats[self.tx_port]['ipackets'], golden) < 0.05
180
181
182                self.c.remove_all_streams(ports = [self.tx_port, self.rx_port])
183
184
185
186        except STLError as e:
187            assert False , '{0}'.format(e)
188
189
190    def test_stress_connect_disconnect (self):
191        try:
192            for i in range(0, 100):
193                self.c.connect()
194                assert self.c.is_connected(), 'client should be connected'
195                self.c.disconnect()
196                assert not self.c.is_connected(), 'client should be disconnected'
197
198
199        except STLError as e:
200            assert False , '{0}'.format(e)
201
202
203
204    def test_stress_tx (self):
205        try:
206            s1 = STLStream(name = 'stress',
207                           packet = self.pkt,
208                           mode = STLTXCont(percentage = self.percentage))
209
210            # add both streams to ports
211            self.c.add_streams([s1], ports = [self.tx_port, self.rx_port])
212            for i in range(0, 100):
213
214                self.c.start(ports = [self.tx_port, self.rx_port])
215
216                assert self.c.ports[self.tx_port].is_transmitting(), 'port should be active'
217                assert self.c.ports[self.rx_port].is_transmitting(), 'port should be active'
218
219                self.c.pause(ports = [self.tx_port, self.rx_port])
220
221                assert self.c.ports[self.tx_port].is_paused(), 'port should be paused'
222                assert self.c.ports[self.rx_port].is_paused(), 'port should be paused'
223
224                self.c.resume(ports = [self.tx_port, self.rx_port])
225
226                assert self.c.ports[self.tx_port].is_transmitting(), 'port should be active'
227                assert self.c.ports[self.rx_port].is_transmitting(), 'port should be active'
228
229                self.c.stop(ports = [self.tx_port, self.rx_port])
230
231                assert not self.c.ports[self.tx_port].is_active(), 'port should be idle'
232                assert not self.c.ports[self.rx_port].is_active(), 'port should be idle'
233
234        except STLError as e:
235            assert False , '{0}'.format(e)
236
237
238    def test_all_profiles (self):
239        if self.is_virt_nics or not self.is_loopback:
240            self.skip('skipping profile tests for virtual / non loopback')
241            return
242
243        default_mult  = self.get_benchmark_param('mult',default="30%")
244        skip_tests     = self.get_benchmark_param('skip',default=[])
245
246        try:
247            print("\n");
248
249
250            for profile in self.profiles:
251
252                skip=False
253                if skip_tests:
254                    for  skip_test in skip_tests:
255                        if skip_test in profile:
256                           skip=True;
257                           break;
258                if skip:
259                    print("skipping testing profile due to config file {0}...\n".format(profile))
260                    continue;
261
262
263                print("now testing profile {0}...\n".format(profile))
264
265                p1 = STLProfile.load(profile, port_id = self.tx_port)
266                p2 = STLProfile.load(profile, port_id = self.rx_port)
267
268                # if profile contains custom MAC addrs we need promiscuous mode
269                # but virtual NICs does not support promiscuous mode
270                self.c.set_port_attr(ports = [self.tx_port, self.rx_port], promiscuous = False)
271
272                if p1.has_custom_mac_addr() or p2.has_custom_mac_addr():
273                    if self.is_virt_nics:
274                        print("\n*** profile needs promiscuous mode but running on virtual NICs - skipping... ***\n")
275                        continue
276                    elif self.is_vf_nics:
277                        print("\n*** profile needs promiscuous mode but running on VF - skipping... ***\n")
278                        continue
279                    else:
280                        self.c.set_port_attr(ports = [self.tx_port, self.rx_port], promiscuous = True)
281
282                if p1.has_flow_stats() or p2.has_flow_stats():
283                    print("\n*** profile needs RX caps - skipping... ***\n")
284                    continue
285
286                self.c.add_streams(p1, ports = self.tx_port)
287                self.c.add_streams(p2, ports = self.rx_port)
288
289                self.c.clear_stats()
290
291                self.c.start(ports = [self.tx_port, self.rx_port], mult = default_mult)
292                time.sleep(0.1)
293
294                if p1.is_pauseable() and p2.is_pauseable():
295                    self.c.pause(ports = [self.tx_port, self.rx_port])
296                    time.sleep(0.1)
297
298                    self.c.resume(ports = [self.tx_port, self.rx_port])
299                    time.sleep(0.1)
300
301                self.c.stop(ports = [self.tx_port, self.rx_port])
302
303                stats = self.c.get_stats()
304
305                assert self.tx_port in stats, '{0} - no stats for TX port'.format(profile)
306                assert self.rx_port in stats, '{0} - no stats for RX port'.format(profile)
307
308                self.verify(stats[self.tx_port]['opackets'], stats[self.rx_port]['ipackets'])
309                self.verify(stats[self.rx_port]['opackets'], stats[self.tx_port]['ipackets'])
310
311                self.c.remove_all_streams(ports = [self.tx_port, self.rx_port])
312
313        except STLError as e:
314            assert False , '{0}'.format(e)
315
316
317        finally:
318            self.c.set_port_attr(ports = [self.tx_port, self.rx_port], promiscuous = False)
319
320
321    # see https://trex-tgn.cisco.com/youtrack/issue/trex-226
322    def test_latency_pause_resume (self):
323
324        try:
325
326            s1 = STLStream(name = 'latency',
327                           packet = self.pkt,
328                           mode = STLTXCont(percentage = self.percentage),
329                           flow_stats = STLFlowLatencyStats(pg_id = 1))
330
331            self.c.add_streams([s1], ports = self.tx_port)
332
333            self.c.clear_stats()
334
335            self.c.start(ports = self.tx_port)
336
337            for i in range(100):
338                self.c.pause()
339                self.c.resume()
340
341            self.c.stop()
342
343        except STLError as e:
344            assert False , '{0}'.format(e)
345
346
347    def test_pcap_remote (self):
348        try:
349            pcap_file = os.path.join(CTRexScenario.scripts_path, 'automation/regression/test_pcaps/pcap_dual_test.erf')
350
351            master = self.tx_port
352            slave = master ^ 0x1
353
354            self.c.reset(ports = [master, slave])
355            self.c.clear_stats()
356            self.c.push_remote(pcap_file,
357                               ports = [master],
358                               ipg_usec = 100,
359                               is_dual = True)
360            self.c.wait_on_traffic(ports = [master])
361
362            stats = self.c.get_stats()
363
364            self.verify(stats[master]['opackets'], 52)
365            self.verify(stats[slave]['opackets'], 48)
366
367        except STLError as e:
368            assert False , '{0}'.format(e)
369
370