trex_stl_client.py revision 00bfc58e
1#!/router/bin/python
2
3# for API usage the path name must be full
4from .trex_stl_exceptions import *
5from .trex_stl_streams import *
6
7from .trex_stl_jsonrpc_client import JsonRpcClient, BatchMessage
8from . import trex_stl_stats
9
10from .trex_stl_port import Port
11from .trex_stl_types import *
12from .trex_stl_async_client import CTRexAsyncClient
13
14from .utils import parsing_opts, text_tables, common
15from .utils.common import list_intersect, list_difference, is_sub_list, PassiveTimer
16from .utils.text_opts import *
17from functools import wraps
18
19from collections import namedtuple
20from yaml import YAMLError
21import time
22import datetime
23import re
24import random
25import json
26import traceback
27
28############################     logger     #############################
29############################                #############################
30############################                #############################
31
32# logger API for the client
33class LoggerApi(object):
34    # verbose levels
35    VERBOSE_QUIET   = 0
36    VERBOSE_REGULAR = 1
37    VERBOSE_HIGH    = 2
38
39    def __init__(self):
40        self.level = LoggerApi.VERBOSE_REGULAR
41
42    # implemented by specific logger
43    def write(self, msg, newline = True):
44        raise Exception("Implement this")
45
46    # implemented by specific logger
47    def flush(self):
48        raise Exception("Implement this")
49
50    def set_verbose (self, level):
51        if not level in range(self.VERBOSE_QUIET, self.VERBOSE_HIGH + 1):
52            raise ValueError("Bad value provided for logger")
53
54        self.level = level
55
56    def get_verbose (self):
57        return self.level
58
59
60    def check_verbose (self, level):
61        return (self.level >= level)
62
63
64    # simple log message with verbose
65    def log (self, msg, level = VERBOSE_REGULAR, newline = True):
66        if not self.check_verbose(level):
67            return
68
69        self.write(msg, newline)
70
71    # logging that comes from async event
72    def async_log (self, msg, level = VERBOSE_REGULAR, newline = True):
73        self.log(msg, level, newline)
74
75
76    def pre_cmd (self, desc):
77        self.log(format_text('\n{:<60}'.format(desc), 'bold'), newline = False)
78        self.flush()
79
80    def post_cmd (self, rc):
81        if rc:
82            self.log(format_text("[SUCCESS]\n", 'green', 'bold'))
83        else:
84            self.log(format_text("[FAILED]\n", 'red', 'bold'))
85
86
87    def log_cmd (self, desc):
88        self.pre_cmd(desc)
89        self.post_cmd(True)
90
91
92    # supress object getter
93    def supress (self):
94        class Supress(object):
95            def __init__ (self, logger):
96                self.logger = logger
97
98            def __enter__ (self):
99                self.saved_level = self.logger.get_verbose()
100                self.logger.set_verbose(LoggerApi.VERBOSE_QUIET)
101
102            def __exit__ (self, type, value, traceback):
103                self.logger.set_verbose(self.saved_level)
104
105        return Supress(self)
106
107
108
109# default logger - to stdout
110class DefaultLogger(LoggerApi):
111
112    def __init__ (self):
113        super(DefaultLogger, self).__init__()
114
115    def write (self, msg, newline = True):
116        if newline:
117            print(msg)
118        else:
119            print (msg),
120
121    def flush (self):
122        sys.stdout.flush()
123
124
125############################     async event hander     #############################
126############################                            #############################
127############################                            #############################
128
129# an event
130class Event(object):
131
132    def __init__ (self, origin, ev_type, msg):
133        self.origin = origin
134        self.ev_type = ev_type
135        self.msg = msg
136
137        self.ts = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
138
139    def __str__ (self):
140
141        prefix = "[{:^}][{:^}]".format(self.origin, self.ev_type)
142
143        return "{:<10} - {:18} - {:}".format(self.ts, prefix, format_text(self.msg, 'bold'))
144
145
146# handles different async events given to the client
147class EventsHandler(object):
148
149
150    def __init__ (self, client):
151        self.client = client
152        self.logger = self.client.logger
153
154        self.events = []
155
156    # public functions
157
158    def get_events (self, ev_type_filter = None):
159        if ev_type_filter:
160            return [ev for ev in self.events if ev.ev_type in listify(ev_type_filter)]
161        else:
162            return [ev for ev in self.events]
163
164
165    def clear_events (self):
166        self.events = []
167
168
169    def log_warning (self, msg, show = True):
170        self.__add_event_log('local', 'warning', msg, show)
171
172
173    # events called internally
174
175    def on_async_dead (self):
176        if self.client.connected:
177            msg = 'Lost connection to server'
178            self.__add_event_log('local', 'info', msg, True)
179            self.client.connected = False
180
181
182    def on_async_alive (self):
183        pass
184
185
186
187    def on_async_rx_stats_event (self, data, baseline):
188        self.client.flow_stats.update(data, baseline)
189
190    def on_async_latency_stats_event (self, data, baseline):
191        self.client.latency_stats.update(data, baseline)
192
193    # handles an async stats update from the subscriber
194    def on_async_stats_update(self, dump_data, baseline):
195        global_stats = {}
196        port_stats = {}
197
198        # filter the values per port and general
199        for key, value in dump_data.items():
200            # match a pattern of ports
201            m = re.search('(.*)\-(\d+)', key)
202            if m:
203                port_id = int(m.group(2))
204                field_name = m.group(1)
205                if port_id in self.client.ports:
206                    if not port_id in port_stats:
207                        port_stats[port_id] = {}
208                    port_stats[port_id][field_name] = value
209                else:
210                    continue
211            else:
212                # no port match - general stats
213                global_stats[key] = value
214
215        # update the general object with the snapshot
216        self.client.global_stats.update(global_stats, baseline)
217
218        # update all ports
219        for port_id, data in port_stats.items():
220            self.client.ports[port_id].port_stats.update(data, baseline)
221
222
223
224    # dispatcher for server async events (port started, port stopped and etc.)
225    def on_async_event (self, type, data):
226        # DP stopped
227        show_event = False
228
229        # port started
230        if (type == 0):
231            port_id = int(data['port_id'])
232            ev = "Port {0} has started".format(port_id)
233            self.__async_event_port_started(port_id)
234
235        # port stopped
236        elif (type == 1):
237            port_id = int(data['port_id'])
238            ev = "Port {0} has stopped".format(port_id)
239
240            # call the handler
241            self.__async_event_port_stopped(port_id)
242
243
244        # port paused
245        elif (type == 2):
246            port_id = int(data['port_id'])
247            ev = "Port {0} has paused".format(port_id)
248
249            # call the handler
250            self.__async_event_port_paused(port_id)
251
252        # port resumed
253        elif (type == 3):
254            port_id = int(data['port_id'])
255            ev = "Port {0} has resumed".format(port_id)
256
257            # call the handler
258            self.__async_event_port_resumed(port_id)
259
260        # port finished traffic
261        elif (type == 4):
262            port_id = int(data['port_id'])
263            ev = "Port {0} job done".format(port_id)
264
265            # call the handler
266            self.__async_event_port_job_done(port_id)
267            show_event = True
268
269        # port was acquired - maybe stolen...
270        elif (type == 5):
271            session_id = data['session_id']
272
273            port_id = int(data['port_id'])
274            who     = data['who']
275            force   = data['force']
276
277            # if we hold the port and it was not taken by this session - show it
278            if port_id in self.client.get_acquired_ports() and session_id != self.client.session_id:
279                show_event = True
280
281            # format the thief/us...
282            if session_id == self.client.session_id:
283                user = 'you'
284            elif who == self.client.username:
285                user = 'another session of you'
286            else:
287                user = "'{0}'".format(who)
288
289            if force:
290                ev = "Port {0} was forcely taken by {1}".format(port_id, user)
291            else:
292                ev = "Port {0} was taken by {1}".format(port_id, user)
293
294            # call the handler in case its not this session
295            if session_id != self.client.session_id:
296                self.__async_event_port_acquired(port_id, who)
297
298
299        # port was released
300        elif (type == 6):
301            port_id     = int(data['port_id'])
302            who         = data['who']
303            session_id  = data['session_id']
304
305            if session_id == self.client.session_id:
306                user = 'you'
307            elif who == self.client.username:
308                user = 'another session of you'
309            else:
310                user = "'{0}'".format(who)
311
312            ev = "Port {0} was released by {1}".format(port_id, user)
313
314            # call the handler in case its not this session
315            if session_id != self.client.session_id:
316                self.__async_event_port_released(port_id)
317
318        elif (type == 7):
319            port_id = int(data['port_id'])
320            ev = "port {0} job failed".format(port_id)
321            show_event = True
322
323        # server stopped
324        elif (type == 100):
325            ev = "Server has stopped"
326            self.__async_event_server_stopped()
327            show_event = True
328
329
330        else:
331            # unknown event - ignore
332            return
333
334
335        self.__add_event_log('server', 'info', ev, show_event)
336
337
338    # private functions
339
340    # on rare cases events may come on a non existent prot
341    # (server was re-run with different config)
342    def __async_event_port_job_done (self, port_id):
343        if port_id in self.client.ports:
344            self.client.ports[port_id].async_event_port_job_done()
345
346    def __async_event_port_stopped (self, port_id):
347        if port_id in self.client.ports:
348            self.client.ports[port_id].async_event_port_stopped()
349
350
351    def __async_event_port_started (self, port_id):
352        if port_id in self.client.ports:
353            self.client.ports[port_id].async_event_port_started()
354
355    def __async_event_port_paused (self, port_id):
356        if port_id in self.client.ports:
357            self.client.ports[port_id].async_event_port_paused()
358
359
360    def __async_event_port_resumed (self, port_id):
361        if port_id in self.client.ports:
362            self.client.ports[port_id].async_event_port_resumed()
363
364    def __async_event_port_acquired (self, port_id, who):
365        if port_id in self.client.ports:
366            self.client.ports[port_id].async_event_acquired(who)
367
368    def __async_event_port_released (self, port_id):
369        if port_id in self.client.ports:
370            self.client.ports[port_id].async_event_released()
371
372    def __async_event_server_stopped (self):
373        self.client.connected = False
374
375
376    # add event to log
377    def __add_event_log (self, origin, ev_type, msg, show = False):
378
379        event = Event(origin, ev_type, msg)
380        self.events.append(event)
381        if show:
382            self.logger.async_log("\n\n{0}".format(str(event)))
383
384
385
386
387
388############################     RPC layer     #############################
389############################                   #############################
390############################                   #############################
391
392class CCommLink(object):
393    """Describes the connectivity of the stateless client method"""
394    def __init__(self, server="localhost", port=5050, virtual=False, client = None):
395        self.virtual = virtual
396        self.server = server
397        self.port = port
398        self.rpc_link = JsonRpcClient(self.server, self.port, client)
399
400    @property
401    def is_connected(self):
402        if not self.virtual:
403            return self.rpc_link.connected
404        else:
405            return True
406
407    def get_server (self):
408        return self.server
409
410    def get_port (self):
411        return self.port
412
413    def connect(self):
414        if not self.virtual:
415            return self.rpc_link.connect()
416
417    def disconnect(self):
418        if not self.virtual:
419            return self.rpc_link.disconnect()
420
421    def transmit(self, method_name, params = None, api_class = 'core'):
422        if self.virtual:
423            self._prompt_virtual_tx_msg()
424            _, msg = self.rpc_link.create_jsonrpc_v2(method_name, params, api_class)
425            print(msg)
426            return
427        else:
428            return self.rpc_link.invoke_rpc_method(method_name, params, api_class)
429
430    def transmit_batch(self, batch_list):
431        if self.virtual:
432            self._prompt_virtual_tx_msg()
433            print([msg
434                   for _, msg in [self.rpc_link.create_jsonrpc_v2(command.method, command.params, command.api_class)
435                                  for command in batch_list]])
436        else:
437            batch = self.rpc_link.create_batch()
438            for command in batch_list:
439                batch.add(command.method, command.params, command.api_class)
440            # invoke the batch
441            return batch.invoke()
442
443    def _prompt_virtual_tx_msg(self):
444        print("Transmitting virtually over tcp://{server}:{port}".format(server=self.server,
445                                                                         port=self.port))
446
447
448
449############################     client     #############################
450############################                #############################
451############################                #############################
452
453class STLClient(object):
454    """TRex Stateless client object - gives operations per TRex/user"""
455
456    # different modes for attaching traffic to ports
457    CORE_MASK_SPLIT = 1
458    CORE_MASK_PIN   = 2
459
460    def __init__(self,
461                 username = common.get_current_user(),
462                 server = "localhost",
463                 sync_port = 4501,
464                 async_port = 4500,
465                 verbose_level = LoggerApi.VERBOSE_QUIET,
466                 logger = None,
467                 virtual = False):
468        """
469        Configure the connection settings
470
471        :parameters:
472             username : string
473                the user name, for example imarom
474
475              server  : string
476                the server name or ip
477
478              sync_port : int
479                the RPC port
480
481              async_port : int
482                the ASYNC port
483
484        .. code-block:: python
485
486            # Example
487
488            # connect to local TRex server
489            c = STLClient()
490
491            # connect to remote server trex-remote-server
492            c = STLClient(server = "trex-remote-server" )
493
494            c = STLClient(server = "10.0.0.10" )
495
496            # verbose mode
497            c = STLClient(server = "10.0.0.10", verbose_level = LoggerApi.VERBOSE_HIGH )
498
499            # change user name
500            c = STLClient(username = "root",server = "10.0.0.10", verbose_level = LoggerApi.VERBOSE_HIGH )
501
502            c.connect()
503
504            c.disconnect()
505
506        """
507
508        self.username   = username
509
510        # init objects
511        self.ports = {}
512        self.server_version = {}
513        self.system_info = {}
514        self.session_id = random.getrandbits(32)
515        self.connected = False
516
517        # API classes
518        self.api_vers = [ {'type': 'core', 'major': 2, 'minor': 3 } ]
519        self.api_h = {'core': None}
520
521        # logger
522        self.logger = DefaultLogger() if not logger else logger
523
524        # initial verbose
525        self.logger.set_verbose(verbose_level)
526
527        # low level RPC layer
528        self.comm_link = CCommLink(server,
529                                   sync_port,
530                                   virtual,
531                                   self)
532
533        # async event handler manager
534        self.event_handler = EventsHandler(self)
535
536        # async subscriber level
537        self.async_client = CTRexAsyncClient(server,
538                                             async_port,
539                                             self)
540
541
542
543
544        # stats
545        self.connection_info = {"username":   username,
546                                "server":     server,
547                                "sync_port":  sync_port,
548                                "async_port": async_port,
549                                "virtual":    virtual}
550
551
552        self.global_stats = trex_stl_stats.CGlobalStats(self.connection_info,
553                                                        self.server_version,
554                                                        self.ports,
555                                                        self.event_handler)
556
557        self.flow_stats = trex_stl_stats.CRxStats(self.ports)
558
559        self.latency_stats = trex_stl_stats.CLatencyStats(self.ports)
560
561        self.util_stats = trex_stl_stats.CUtilStats(self)
562
563        self.xstats = trex_stl_stats.CXStats(self)
564
565        self.stats_generator = trex_stl_stats.CTRexInfoGenerator(self.global_stats,
566                                                                 self.ports,
567                                                                 self.flow_stats,
568                                                                 self.latency_stats,
569                                                                 self.util_stats,
570                                                                 self.xstats,
571                                                                 self.async_client.monitor)
572
573
574
575
576    ############# private functions - used by the class itself ###########
577
578    # some preprocessing for port argument
579    def __ports (self, port_id_list):
580
581        # none means all
582        if port_id_list == None:
583            return range(0, self.get_port_count())
584
585        # always list
586        if isinstance(port_id_list, int):
587            port_id_list = [port_id_list]
588
589        if not isinstance(port_id_list, list):
590             raise ValueError("Bad port id list: {0}".format(port_id_list))
591
592        for port_id in port_id_list:
593            if not isinstance(port_id, int) or (port_id < 0) or (port_id > self.get_port_count()):
594                raise ValueError("Bad port id {0}".format(port_id))
595
596        return port_id_list
597
598
599    # sync ports
600    def __sync_ports (self, port_id_list = None, force = False):
601        port_id_list = self.__ports(port_id_list)
602
603        rc = RC()
604
605        for port_id in port_id_list:
606            rc.add(self.ports[port_id].sync())
607
608        return rc
609
610    # acquire ports, if port_list is none - get all
611    def __acquire (self, port_id_list = None, force = False, sync_streams = True):
612        port_id_list = self.__ports(port_id_list)
613
614        rc = RC()
615
616        for port_id in port_id_list:
617            rc.add(self.ports[port_id].acquire(force, sync_streams))
618
619        return rc
620
621    # release ports
622    def __release (self, port_id_list = None):
623        port_id_list = self.__ports(port_id_list)
624
625        rc = RC()
626
627        for port_id in port_id_list:
628            rc.add(self.ports[port_id].release())
629
630        return rc
631
632
633    def __add_streams(self, stream_list, port_id_list = None):
634
635        port_id_list = self.__ports(port_id_list)
636
637        rc = RC()
638
639        for port_id in port_id_list:
640            rc.add(self.ports[port_id].add_streams(stream_list))
641
642        return rc
643
644
645
646    def __remove_streams(self, stream_id_list, port_id_list = None):
647
648        port_id_list = self.__ports(port_id_list)
649
650        rc = RC()
651
652        for port_id in port_id_list:
653            rc.add(self.ports[port_id].remove_streams(stream_id_list))
654
655        return rc
656
657
658
659    def __remove_all_streams(self, port_id_list = None):
660        port_id_list = self.__ports(port_id_list)
661
662        rc = RC()
663
664        for port_id in port_id_list:
665            rc.add(self.ports[port_id].remove_all_streams())
666
667        return rc
668
669
670    def __get_stream(self, stream_id, port_id, get_pkt = False):
671
672        return self.ports[port_id].get_stream(stream_id)
673
674
675    def __get_all_streams(self, port_id, get_pkt = False):
676
677        return self.ports[port_id].get_all_streams()
678
679
680    def __get_stream_id_list(self, port_id):
681
682        return self.ports[port_id].get_stream_id_list()
683
684
685    def __start (self,
686                 multiplier,
687                 duration,
688                 port_id_list,
689                 force,
690                 core_mask):
691
692        port_id_list = self.__ports(port_id_list)
693
694        rc = RC()
695
696
697        for port_id in port_id_list:
698            rc.add(self.ports[port_id].start(multiplier,
699                                             duration,
700                                             force,
701                                             core_mask[port_id]))
702
703        return rc
704
705
706    def __resume (self, port_id_list = None, force = False):
707
708        port_id_list = self.__ports(port_id_list)
709        rc = RC()
710
711        for port_id in port_id_list:
712            rc.add(self.ports[port_id].resume())
713
714        return rc
715
716    def __pause (self, port_id_list = None, force = False):
717
718        port_id_list = self.__ports(port_id_list)
719        rc = RC()
720
721        for port_id in port_id_list:
722            rc.add(self.ports[port_id].pause())
723
724        return rc
725
726
727    def __stop (self, port_id_list = None, force = False):
728
729        port_id_list = self.__ports(port_id_list)
730        rc = RC()
731
732        for port_id in port_id_list:
733            rc.add(self.ports[port_id].stop(force))
734
735        return rc
736
737
738    def __update (self, mult, port_id_list = None, force = False):
739
740        port_id_list = self.__ports(port_id_list)
741        rc = RC()
742
743        for port_id in port_id_list:
744            rc.add(self.ports[port_id].update(mult, force))
745
746        return rc
747
748
749    def __push_remote (self, pcap_filename, port_id_list, ipg_usec, speedup, count, duration, is_dual):
750
751        port_id_list = self.__ports(port_id_list)
752        rc = RC()
753
754        for port_id in port_id_list:
755
756            # for dual, provide the slave handler as well
757            slave_handler = self.ports[port_id ^ 0x1].handler if is_dual else ""
758
759            rc.add(self.ports[port_id].push_remote(pcap_filename,
760                                                   ipg_usec,
761                                                   speedup,
762                                                   count,
763                                                   duration,
764                                                   is_dual,
765                                                   slave_handler))
766
767        return rc
768
769
770    def __validate (self, port_id_list = None):
771        port_id_list = self.__ports(port_id_list)
772
773        rc = RC()
774
775        for port_id in port_id_list:
776            rc.add(self.ports[port_id].validate())
777
778        return rc
779
780
781    def __set_port_attr (self, port_id_list = None, attr_dict = None):
782
783        port_id_list = self.__ports(port_id_list)
784        rc = RC()
785
786        for port_id in port_id_list:
787            rc.add(self.ports[port_id].set_attr(attr_dict))
788
789        return rc
790
791
792
793    # connect to server
794    def __connect(self):
795
796        # first disconnect if already connected
797        if self.is_connected():
798            self.__disconnect()
799
800        # clear this flag
801        self.connected = False
802
803        # connect sync channel
804        self.logger.pre_cmd("Connecting to RPC server on {0}:{1}".format(self.connection_info['server'], self.connection_info['sync_port']))
805        rc = self.comm_link.connect()
806        self.logger.post_cmd(rc)
807
808        if not rc:
809            return rc
810
811
812        # API sync
813        rc = self._transmit("api_sync", params = {'api_vers': self.api_vers}, api_class = None)
814        if not rc:
815            return rc
816
817        # decode
818        for api in rc.data()['api_vers']:
819            self.api_h[ api['type'] ] = api['api_h']
820
821
822        # version
823        rc = self._transmit("get_version")
824        if not rc:
825            return rc
826
827        self.server_version = rc.data()
828        self.global_stats.server_version = rc.data()
829
830        # cache system info
831        rc = self._transmit("get_system_info")
832        if not rc:
833            return rc
834
835        self.system_info = rc.data()
836        self.global_stats.system_info = rc.data()
837
838        # cache supported commands
839        rc = self._transmit("get_supported_cmds")
840        if not rc:
841            return rc
842
843        self.supported_cmds = sorted(rc.data())
844
845        # create ports
846        for port_id in range(self.system_info["port_count"]):
847            info = self.system_info['ports'][port_id]
848
849            self.ports[port_id] = Port(port_id,
850                                       self.username,
851                                       self.comm_link,
852                                       self.session_id,
853                                       info)
854
855
856        # sync the ports
857        rc = self.__sync_ports()
858        if not rc:
859            return rc
860
861
862        # connect async channel
863        self.logger.pre_cmd("Connecting to publisher server on {0}:{1}".format(self.connection_info['server'], self.connection_info['async_port']))
864        rc = self.async_client.connect()
865        self.logger.post_cmd(rc)
866
867        if not rc:
868            return rc
869
870        self.connected = True
871
872        return RC_OK()
873
874
875    # disconenct from server
876    def __disconnect(self, release_ports = True):
877        # release any previous acquired ports
878        if self.is_connected() and release_ports:
879            self.__release(self.get_acquired_ports())
880
881        self.comm_link.disconnect()
882        self.async_client.disconnect()
883
884        self.connected = False
885
886        return RC_OK()
887
888
889    # clear stats
890    def __clear_stats(self, port_id_list, clear_global, clear_flow_stats, clear_latency_stats):
891
892        # we must be sync with the server
893        self.async_client.barrier()
894
895        for port_id in port_id_list:
896            self.ports[port_id].clear_stats()
897
898        if clear_global:
899            self.global_stats.clear_stats()
900
901        if clear_flow_stats:
902            self.flow_stats.clear_stats()
903
904        if clear_latency_stats:
905            self.latency_stats.clear_stats()
906
907        self.logger.log_cmd("Clearing stats on port(s) {0}:".format(port_id_list))
908
909        return RC
910
911
912    # get stats
913    def __get_stats (self, port_id_list):
914        stats = {}
915
916        stats['global'] = self.global_stats.get_stats()
917
918        total = {}
919        for port_id in port_id_list:
920            port_stats = self.ports[port_id].get_stats()
921            stats[port_id] = port_stats
922
923            for k, v in port_stats.items():
924                if not k in total:
925                    total[k] = v
926                else:
927                    total[k] += v
928
929        stats['total'] = total
930
931        stats['flow_stats'] = self.flow_stats.get_stats()
932        stats['latency'] = self.latency_stats.get_stats()
933
934        return stats
935
936
937    def __decode_core_mask (self, ports, core_mask):
938
939        # predefined modes
940        if isinstance(core_mask, int):
941            if core_mask not in [self.CORE_MASK_PIN, self.CORE_MASK_SPLIT]:
942                raise STLError("'core_mask' can be either CORE_MASK_PIN, CORE_MASK_SPLIT or a list of masks")
943
944            decoded_mask = {}
945            for port in ports:
946                # a pin mode was requested and we have
947                # the second port from the group in the start list
948                if (core_mask == self.CORE_MASK_PIN) and ( (port ^ 0x1) in ports ):
949                    decoded_mask[port] = 0x55555555 if( port % 2) == 0 else 0xAAAAAAAA
950                else:
951                    decoded_mask[port] = None
952
953            return decoded_mask
954
955        # list of masks
956        elif isinstance(core_mask, list):
957            if len(ports) != len(core_mask):
958                raise STLError("'core_mask' list must be the same length as 'ports' list")
959
960            decoded_mask = {}
961            for i, port in enumerate(ports):
962                decoded_mask[port] = core_mask[i]
963
964            return decoded_mask
965
966
967
968    ############ functions used by other classes but not users ##############
969
970    def _validate_port_list (self, port_id_list):
971        # listfiy single int
972        if isinstance(port_id_list, int):
973            port_id_list = [port_id_list]
974
975        # should be a list
976        if not isinstance(port_id_list, list):
977            raise STLTypeError('port_id_list', type(port_id_list), list)
978
979        if not port_id_list:
980            raise STLError('No ports provided')
981
982        valid_ports = self.get_all_ports()
983        for port_id in port_id_list:
984            if not port_id in valid_ports:
985                raise STLError("Port ID '{0}' is not a valid port ID - valid values: {1}".format(port_id, valid_ports))
986
987        return port_id_list
988
989
990    # transmit request on the RPC link
991    def _transmit(self, method_name, params = None, api_class = 'core'):
992        return self.comm_link.transmit(method_name, params, api_class)
993
994    # transmit batch request on the RPC link
995    def _transmit_batch(self, batch_list):
996        return self.comm_link.transmit_batch(batch_list)
997
998    # stats
999    def _get_formatted_stats(self, port_id_list, stats_mask = trex_stl_stats.COMPACT):
1000
1001        stats_opts = common.list_intersect(trex_stl_stats.ALL_STATS_OPTS, stats_mask)
1002
1003        stats_obj = OrderedDict()
1004        for stats_type in stats_opts:
1005            stats_obj.update(self.stats_generator.generate_single_statistic(port_id_list, stats_type))
1006
1007        return stats_obj
1008
1009    def _get_streams(self, port_id_list, streams_mask=set()):
1010
1011        streams_obj = self.stats_generator.generate_streams_info(port_id_list, streams_mask)
1012
1013        return streams_obj
1014
1015
1016    def _invalidate_stats (self, port_id_list):
1017        for port_id in port_id_list:
1018            self.ports[port_id].invalidate_stats()
1019
1020        self.global_stats.invalidate()
1021        self.flow_stats.invalidate()
1022
1023        return RC_OK()
1024
1025
1026    # remove all RX filters in a safe manner
1027    def _remove_rx_filters (self, ports, rx_delay_ms):
1028
1029        # get the enabled RX ports
1030        rx_ports = [port_id for port_id in ports if self.ports[port_id].has_rx_enabled()]
1031
1032        if not rx_ports:
1033            return RC_OK()
1034
1035        # block while any RX configured port has not yet have it's delay expired
1036        while any([not self.ports[port_id].has_rx_delay_expired(rx_delay_ms) for port_id in rx_ports]):
1037            time.sleep(0.01)
1038
1039        # remove RX filters
1040        rc = RC()
1041        for port_id in rx_ports:
1042            rc.add(self.ports[port_id].remove_rx_filters())
1043
1044        return rc
1045
1046
1047    #################################
1048    # ------ private methods ------ #
1049    @staticmethod
1050    def __get_mask_keys(ok_values={True}, **kwargs):
1051        masked_keys = set()
1052        for key, val in kwargs.items():
1053            if val in ok_values:
1054                masked_keys.add(key)
1055        return masked_keys
1056
1057    @staticmethod
1058    def __filter_namespace_args(namespace, ok_values):
1059        return {k: v for k, v in namespace.__dict__.items() if k in ok_values}
1060
1061
1062    # API decorator - double wrap because of argument
1063    def __api_check(connected = True):
1064
1065        def wrap (f):
1066            @wraps(f)
1067            def wrap2(*args, **kwargs):
1068                client = args[0]
1069
1070                func_name = f.__name__
1071
1072                # check connection
1073                if connected and not client.is_connected():
1074                    raise STLStateError(func_name, 'disconnected')
1075
1076                try:
1077                    ret = f(*args, **kwargs)
1078                except KeyboardInterrupt as e:
1079                    raise STLError("Interrupted by a keyboard signal (probably ctrl + c)")
1080
1081                return ret
1082            return wrap2
1083
1084        return wrap
1085
1086
1087
1088    ############################     API     #############################
1089    ############################             #############################
1090    ############################             #############################
1091    def __enter__ (self):
1092        self.connect()
1093        self.acquire(force = True)
1094        self.reset()
1095        return self
1096
1097    def __exit__ (self, type, value, traceback):
1098        if self.get_active_ports():
1099            self.stop(self.get_active_ports())
1100        self.disconnect()
1101
1102    ############################   Getters   #############################
1103    ############################             #############################
1104    ############################             #############################
1105
1106
1107    # return verbose level of the logger
1108    def get_verbose (self):
1109        """
1110        Get the verbose mode
1111
1112        :parameters:
1113          none
1114
1115        :return:
1116            Get the verbose mode as Bool
1117
1118        :raises:
1119          None
1120
1121        """
1122        return self.logger.get_verbose()
1123
1124    # is the client on read only mode ?
1125    def is_all_ports_acquired (self):
1126        """
1127         is_all_ports_acquired
1128
1129        :parameters:
1130          None
1131
1132        :return:
1133            Returns True if all ports are acquired
1134
1135        :raises:
1136          None
1137
1138        """
1139
1140        return (self.get_all_ports() == self.get_acquired_ports())
1141
1142
1143    # is the client connected ?
1144    def is_connected (self):
1145        """
1146
1147        :parameters:
1148          None
1149
1150        :return:
1151            is_connected
1152
1153        :raises:
1154          None
1155
1156        """
1157
1158        return self.connected and self.comm_link.is_connected
1159
1160
1161    # get connection info
1162    def get_connection_info (self):
1163        """
1164
1165        :parameters:
1166          None
1167
1168        :return:
1169            Connection dict
1170
1171        :raises:
1172          None
1173
1174        """
1175
1176        return self.connection_info
1177
1178
1179    # get supported commands by the server
1180    def get_server_supported_cmds(self):
1181        """
1182
1183        :parameters:
1184          None
1185
1186        :return:
1187            Connection dict
1188
1189        :raises:
1190          None
1191
1192        """
1193
1194        return self.supported_cmds
1195
1196    # get server version
1197    def get_server_version(self):
1198        """
1199
1200        :parameters:
1201          None
1202
1203        :return:
1204            Connection dict
1205
1206        :raises:
1207          None
1208
1209        """
1210
1211        return self.server_version
1212
1213    # get server system info
1214    def get_server_system_info(self):
1215        """
1216
1217        :parameters:
1218          None
1219
1220        :return:
1221            Connection dict
1222
1223        :raises:
1224          None
1225
1226        """
1227
1228        return self.system_info
1229
1230    # get port count
1231    def get_port_count(self):
1232        """
1233
1234        :parameters:
1235          None
1236
1237        :return:
1238            Connection dict
1239
1240        :raises:
1241          None
1242
1243        """
1244
1245        return len(self.ports)
1246
1247
1248    # returns the port object
1249    def get_port (self, port_id):
1250        port = self.ports.get(port_id, None)
1251        if (port != None):
1252            return port
1253        else:
1254            raise STLArgumentError('port id', port_id, valid_values = self.get_all_ports())
1255
1256
1257    # get all ports as IDs
1258    def get_all_ports (self):
1259        """
1260
1261        :parameters:
1262          None
1263
1264        :return:
1265            Connection dict
1266
1267        :raises:
1268          None
1269
1270        """
1271
1272        return list(self.ports)
1273
1274    # get all acquired ports
1275    def get_acquired_ports(self):
1276        return [port_id
1277                for port_id, port_obj in self.ports.items()
1278                if port_obj.is_acquired()]
1279
1280    # get all active ports (TX or pause)
1281    def get_active_ports(self, owned = True):
1282        if owned:
1283            return [port_id
1284                    for port_id, port_obj in self.ports.items()
1285                    if port_obj.is_active() and port_obj.is_acquired()]
1286        else:
1287            return [port_id
1288                    for port_id, port_obj in self.ports.items()
1289                    if port_obj.is_active()]
1290
1291
1292    # get paused ports
1293    def get_paused_ports (self, owned = True):
1294        if owned:
1295            return [port_id
1296                    for port_id, port_obj in self.ports.items()
1297                    if port_obj.is_paused() and port_obj.is_acquired()]
1298        else:
1299            return [port_id
1300                    for port_id, port_obj in self.ports.items()
1301                    if port_obj.is_paused()]
1302
1303
1304    # get all TX ports
1305    def get_transmitting_ports (self, owned = True):
1306        if owned:
1307            return [port_id
1308                    for port_id, port_obj in self.ports.items()
1309                    if port_obj.is_transmitting() and port_obj.is_acquired()]
1310        else:
1311            return [port_id
1312                    for port_id, port_obj in self.ports.items()
1313                    if port_obj.is_transmitting()]
1314
1315
1316    # get stats
1317    def get_stats (self, ports = None, sync_now = True):
1318        """
1319        Return dictionary containing statistics information gathered from the server.
1320
1321        :parameters:
1322
1323          ports - List of ports to retreive stats on.
1324                  If None, assume the request is for all acquired ports.
1325
1326          sync_now - Boolean - If true, create a call to the server to get latest stats, and wait for result to arrive. Otherwise, return last stats saved in client cache.
1327                            Downside of putting True is a slight delay (few 10th msecs) in getting the result. For practical uses, value should be True.
1328        :return:
1329            Statistics dictionary of dictionaries with the following format:
1330
1331            ===============================  ===============
1332            key                               Meaning
1333            ===============================  ===============
1334            :ref:`numbers (0,1,..<total>`    Statistcs per port number
1335            :ref:`total <total>`             Sum of port statistics
1336            :ref:`flow_stats <flow_stats>`   Per flow statistics
1337            :ref:`global <global>`           Global statistics
1338            :ref:`latency <latency>`         Per flow statistics regarding flow latency
1339            ===============================  ===============
1340
1341            Below is description of each of the inner dictionaries.
1342
1343            .. _total:
1344
1345            **total** and per port statistics contain dictionary with following format.
1346
1347            Most of the bytes counters (unless specified otherwise) are in L2 layer, including the Ethernet FCS. e.g. minimum packet size is 64 bytes
1348
1349            ===============================  ===============
1350            key                               Meaning
1351            ===============================  ===============
1352            ibytes                           Number of input bytes
1353            ierrors                          Number of input errors
1354            ipackets                         Number of input packets
1355            obytes                           Number of output bytes
1356            oerrors                          Number of output errors
1357            opackets                         Number of output packets
1358            rx_bps                           Receive bytes per second rate (L2 layer)
1359            rx_pps                           Receive packet per second rate
1360            tx_bps                           Transmit bytes per second rate (L2 layer)
1361            tx_pps                           Transmit packet per second rate
1362            ===============================  ===============
1363
1364            .. _flow_stats:
1365
1366            **flow_stats** contains :ref:`global dictionary <flow_stats_global>`, and dictionaries per packet group id (pg id). See structures below.
1367
1368            **per pg_id flow stat** dictionaries have following structure:
1369
1370            =================   ===============
1371            key                 Meaning
1372            =================   ===============
1373            rx_bps              Received bytes per second rate
1374            rx_bps_l1           Received bytes per second rate, including layer one
1375            rx_bytes            Total number of received bytes
1376            rx_pkts             Total number of received packets
1377            rx_pps              Received packets per second
1378            tx_bps              Transmit bytes per second rate
1379            tx_bps_l1           Transmit bytes per second rate, including layer one
1380            tx_bytes            Total number of sent bytes
1381            tx_pkts             Total number of sent packets
1382            tx_pps              Transmit packets per second rate
1383            =================   ===============
1384
1385            .. _flow_stats_global:
1386
1387            **global flow stats** dictionary has the following structure:
1388
1389            =================   ===============
1390            key                 Meaning
1391            =================   ===============
1392            rx_err              Number of flow statistics packets received that we could not associate to any pg_id. This can happen if latency on the used setup is large. See :ref:`wait_on_traffic <wait_on_traffic>` rx_delay_ms parameter for details.
1393            tx_err              Number of flow statistics packets transmitted that we could not associate to any pg_id. This is never expected. If you see this different than 0, please report.
1394            =================   ===============
1395
1396            .. _global:
1397
1398            **global**
1399
1400            =================   ===============
1401            key                 Meaning
1402            =================   ===============
1403            bw_per_core         Estimated byte rate Trex can support per core. This is calculated by extrapolation of current rate and load on transmitting cores.
1404            cpu_util            Estimate of the average utilization percentage of the transimitting cores
1405            queue_full          Total number of packets transmitted while the NIC TX queue was full. The packets will be transmitted, eventually, but will create high CPU%due to polling the queue.  This usually indicates that the rate we trying to transmit is too high for this port.
1406            rx_cpu_util         Estimate of the utilization percentage of the core handling RX traffic. Too high value of this CPU utilization could cause drop of latency streams.
1407            rx_drop_bps         Received bytes per second drop rate
1408            rx_bps              Received bytes per second rate
1409            rx_pps              Received packets per second rate
1410            tx_bps              Transmit bytes per second rate
1411            tx_pps              Transmit packets per second rate
1412            =================   ===============
1413
1414            .. _latency:
1415
1416            **latency** contains :ref:`global dictionary <lat_stats_global>`, and dictionaries per packet group id (pg id). Each one with the following structure.
1417
1418            **per pg_id latency stat** dictionaries have following structure:
1419
1420            ===========================          ===============
1421            key                                  Meaning
1422            ===========================          ===============
1423            :ref:`err_cntrs<err-cntrs>`          Counters describing errors that occured with this pg id
1424            :ref:`latency<lat_inner>`            Information regarding packet latency
1425            ===========================          ===============
1426
1427            Following are the inner dictionaries of latency
1428
1429            .. _err-cntrs:
1430
1431            **err-cntrs**
1432
1433            =================   ===============
1434            key                 Meaning (see better explanation below the table)
1435            =================   ===============
1436            dropped             How many packets were dropped (estimation)
1437            dup                 How many packets were duplicated.
1438            out_of_order        How many packets we received out of order.
1439            seq_too_high        How many events of packet with sequence number too high we saw.
1440            seq_too_low         How many events of packet with sequence number too low we saw.
1441            =================   ===============
1442
1443            For calculating packet error events, we add sequence number to each packet's payload. We decide what went wrong only according to sequence number
1444            of last packet received and that of the previous packet. 'seq_too_low' and 'seq_too_high' count events we see. 'dup', 'out_of_order' and 'dropped'
1445            are heuristics we apply to try and understand what happened. They will be accurate in common error scenarios.
1446            We describe few scenarios below to help understand this.
1447
1448            Scenario 1: Received packet with seq num 10, and another one with seq num 10. We increment 'dup' and 'seq_too_low' by 1.
1449
1450            Scenario 2: Received pacekt with seq num 10 and then packet with seq num 15. We assume 4 packets were dropped, and increment 'dropped' by 4, and 'seq_too_high' by 1.
1451            We expect next packet to arrive with sequence number 16.
1452
1453            Scenario 2 continue: Received packet with seq num 11. We increment 'seq_too_low' by 1. We increment 'out_of_order' by 1. We *decrement* 'dropped' by 1.
1454            (We assume here that one of the packets we considered as dropped before, actually arrived out of order).
1455
1456
1457            .. _lat_inner:
1458
1459            **latency**
1460
1461            =================   ===============
1462            key                 Meaning
1463            =================   ===============
1464            average             Average latency over the stream lifetime (usec).Low pass filter is applied to the last window average.It is computed each sampling period by following formula: <average> = <prev average>/2 + <last sampling period average>/2
1465            histogram           Dictionary describing logarithmic distribution histogram of packet latencies. Keys in the dictionary represent range of latencies (in usec). Values are the total number of packets received in this latency range. For example, an entry {100:13} would mean that we saw 13 packets with latency in the range between 100 and 200 usec.
1466            jitter              Jitter of latency samples, computed as described in :rfc:`3550#appendix-A.8`
1467            last_max            Maximum latency measured between last two data reads from server (0.5 sec window).
1468            total_max           Maximum latency measured over the stream lifetime (in usec).
1469            total_min           Minimum latency measured over the stream lifetime (in usec).
1470            =================   ===============
1471
1472            .. _lat_stats_global:
1473
1474            **global latency stats** dictionary has the following structure:
1475
1476            =================   ===============
1477            key                 Meaning
1478            =================   ===============
1479            old_flow            Number of latency statistics packets received that we could not associate to any pg_id. This can happen if latency on the used setup is large. See :ref:`wait_on_traffic <wait_on_traffic>` rx_delay_ms parameter for details.
1480            bad_hdr             Number of latency packets received with bad latency data. This can happen becuase of garbage packets in the network, or if the DUT causes packet corruption.
1481            =================   ===============
1482
1483        :raises:
1484          None
1485
1486        """
1487        # by default use all acquired ports
1488        ports = ports if ports is not None else self.get_acquired_ports()
1489        ports = self._validate_port_list(ports)
1490
1491        # check async barrier
1492        if not type(sync_now) is bool:
1493            raise STLArgumentError('sync_now', sync_now)
1494
1495
1496        # if the user requested a barrier - use it
1497        if sync_now:
1498            rc = self.async_client.barrier()
1499            if not rc:
1500                raise STLError(rc)
1501
1502        return self.__get_stats(ports)
1503
1504
1505    def get_events (self, ev_type_filter = None):
1506        """
1507        returns all the logged events
1508
1509        :parameters:
1510          ev_type_filter - 'info', 'warning' or a list of those
1511                           default: no filter
1512
1513        :return:
1514            logged events
1515
1516        :raises:
1517          None
1518
1519        """
1520        return self.event_handler.get_events(ev_type_filter)
1521
1522
1523    def get_warnings (self):
1524        """
1525        returns all the warnings logged events
1526
1527        :parameters:
1528          None
1529
1530        :return:
1531            warning logged events
1532
1533        :raises:
1534          None
1535
1536        """
1537        return self.get_events(ev_type_filter = 'warning')
1538
1539
1540    def get_info (self):
1541        """
1542        returns all the info logged events
1543
1544        :parameters:
1545          None
1546
1547        :return:
1548            warning logged events
1549
1550        :raises:
1551          None
1552
1553        """
1554        return self.get_events(ev_type_filter = 'info')
1555
1556
1557    # get port(s) info as a list of dicts
1558    @__api_check(True)
1559    def get_port_info (self, ports = None):
1560
1561        ports = ports if ports is not None else self.get_all_ports()
1562        ports = self._validate_port_list(ports)
1563
1564        return [self.ports[port_id].get_info() for port_id in ports]
1565
1566
1567    ############################   Commands   #############################
1568    ############################              #############################
1569    ############################              #############################
1570
1571
1572    def set_verbose (self, level):
1573        """
1574            Sets verbose level
1575
1576            :parameters:
1577                level : str
1578                    "high"
1579                    "low"
1580                    "normal"
1581
1582            :raises:
1583                None
1584
1585        """
1586        modes = {'low' : LoggerApi.VERBOSE_QUIET, 'normal': LoggerApi.VERBOSE_REGULAR, 'high': LoggerApi.VERBOSE_HIGH}
1587
1588        if not level in modes.keys():
1589            raise STLArgumentError('level', level)
1590
1591        self.logger.set_verbose(modes[level])
1592
1593
1594    @__api_check(False)
1595    def connect (self):
1596        """
1597
1598            Connects to the TRex server
1599
1600            :parameters:
1601                None
1602
1603            :raises:
1604                + :exc:`STLError`
1605
1606        """
1607
1608        rc = self.__connect()
1609        if not rc:
1610            raise STLError(rc)
1611
1612
1613    @__api_check(False)
1614    def disconnect (self, stop_traffic = True, release_ports = True):
1615        """
1616            Disconnects from the server
1617
1618            :parameters:
1619                stop_traffic : bool
1620                    Attempts to stop traffic before disconnecting.
1621                release_ports : bool
1622                    Attempts to release all the acquired ports.
1623
1624        """
1625
1626        # try to stop ports but do nothing if not possible
1627        if stop_traffic:
1628            try:
1629                self.stop()
1630            except STLError:
1631                pass
1632
1633
1634        self.logger.pre_cmd("Disconnecting from server at '{0}':'{1}'".format(self.connection_info['server'],
1635                                                                              self.connection_info['sync_port']))
1636        rc = self.__disconnect(release_ports)
1637        self.logger.post_cmd(rc)
1638
1639
1640
1641    @__api_check(True)
1642    def acquire (self, ports = None, force = False, sync_streams = True):
1643        """
1644            Acquires ports for executing commands
1645
1646            :parameters:
1647                ports : list
1648                    Ports on which to execute the command
1649
1650                force : bool
1651                    Force acquire the ports.
1652
1653                sync_streams: bool
1654                    sync with the server about the configured streams
1655
1656            :raises:
1657                + :exc:`STLError`
1658
1659        """
1660
1661        # by default use all ports
1662        ports = ports if ports is not None else self.get_all_ports()
1663        ports = self._validate_port_list(ports)
1664
1665        if force:
1666            self.logger.pre_cmd("Force acquiring ports {0}:".format(ports))
1667        else:
1668            self.logger.pre_cmd("Acquiring ports {0}:".format(ports))
1669
1670        rc = self.__acquire(ports, force, sync_streams)
1671
1672        self.logger.post_cmd(rc)
1673
1674        if not rc:
1675            # cleanup
1676            self.__release(ports)
1677            raise STLError(rc)
1678
1679
1680    @__api_check(True)
1681    def release (self, ports = None):
1682        """
1683            Release ports
1684
1685            :parameters:
1686                ports : list
1687                    Ports on which to execute the command
1688
1689            :raises:
1690                + :exc:`STLError`
1691
1692        """
1693
1694        ports = ports if ports is not None else self.get_acquired_ports()
1695        ports = self._validate_port_list(ports)
1696
1697        self.logger.pre_cmd("Releasing ports {0}:".format(ports))
1698        rc = self.__release(ports)
1699        self.logger.post_cmd(rc)
1700
1701        if not rc:
1702            raise STLError(rc)
1703
1704    @__api_check(True)
1705    def ping(self):
1706        """
1707            Pings the server
1708
1709            :parameters:
1710                None
1711
1712
1713            :raises:
1714                + :exc:`STLError`
1715
1716        """
1717
1718        self.logger.pre_cmd("Pinging the server on '{0}' port '{1}': ".format(self.connection_info['server'],
1719                                                                              self.connection_info['sync_port']))
1720        rc = self._transmit("ping", api_class = None)
1721
1722        self.logger.post_cmd(rc)
1723
1724        if not rc:
1725            raise STLError(rc)
1726
1727    @__api_check(True)
1728    def server_shutdown (self, force = False):
1729        """
1730            Sends the server a request for total shutdown
1731
1732            :parameters:
1733                force - shutdown server even if some ports are owned by another
1734                        user
1735
1736            :raises:
1737                + :exc:`STLError`
1738
1739        """
1740
1741        self.logger.pre_cmd("Sending shutdown request for the server")
1742
1743        rc = self._transmit("shutdown", params = {'force': force, 'user': self.username})
1744
1745        self.logger.post_cmd(rc)
1746
1747        if not rc:
1748            raise STLError(rc)
1749
1750
1751    @__api_check(True)
1752    def get_active_pgids(self):
1753        """
1754            Get active group IDs
1755
1756            :parameters:
1757                None
1758
1759
1760            :raises:
1761                + :exc:`STLError`
1762
1763        """
1764
1765        self.logger.pre_cmd( "Getting active packet group ids")
1766
1767        rc = self._transmit("get_active_pgids")
1768
1769        self.logger.post_cmd(rc)
1770
1771        if not rc:
1772            raise STLError(rc)
1773
1774    @__api_check(True)
1775    def get_util_stats(self):
1776        """
1777            Get utilization stats:
1778            History of TRex CPU utilization per thread (list of lists)
1779            MBUFs memory consumption per CPU socket.
1780
1781            :parameters:
1782                None
1783
1784            :raises:
1785                + :exc:`STLError`
1786
1787        """
1788        self.logger.pre_cmd('Getting Utilization stats')
1789        return self.util_stats.get_stats()
1790
1791    @__api_check(True)
1792    def get_xstats(self, port_id):
1793        print(port_id)
1794        """
1795            Get extended stats of port: all the counters as dict.
1796
1797            :parameters:
1798                port_id: int
1799
1800            :returns:
1801                Dict with names of counters as keys and values of uint64. Actual keys may vary per NIC.
1802
1803            :raises:
1804                + :exc:`STLError`
1805
1806        """
1807        self.logger.pre_cmd('Getting xstats')
1808        return self.xstats.get_stats(port_id)
1809
1810
1811    @__api_check(True)
1812    def reset(self, ports = None):
1813        """
1814            Force acquire ports, stop the traffic, remove all streams and clear stats
1815
1816            :parameters:
1817                ports : list
1818                   Ports on which to execute the command
1819
1820
1821            :raises:
1822                + :exc:`STLError`
1823
1824        """
1825
1826
1827        ports = ports if ports is not None else self.get_all_ports()
1828        ports = self._validate_port_list(ports)
1829
1830        # force take the port and ignore any streams on it
1831        self.acquire(ports, force = True, sync_streams = False)
1832        self.stop(ports, rx_delay_ms = 0)
1833        self.remove_all_streams(ports)
1834        self.clear_stats(ports)
1835
1836
1837    @__api_check(True)
1838    def remove_all_streams (self, ports = None):
1839        """
1840            remove all streams from port(s)
1841
1842            :parameters:
1843                ports : list
1844                    Ports on which to execute the command
1845
1846
1847            :raises:
1848                + :exc:`STLError`
1849
1850        """
1851
1852
1853        ports = ports if ports is not None else self.get_acquired_ports()
1854        ports = self._validate_port_list(ports)
1855
1856        self.logger.pre_cmd("Removing all streams from port(s) {0}:".format(ports))
1857        rc = self.__remove_all_streams(ports)
1858        self.logger.post_cmd(rc)
1859
1860        if not rc:
1861            raise STLError(rc)
1862
1863
1864    @__api_check(True)
1865    def add_streams (self, streams, ports = None):
1866        """
1867            Add a list of streams to port(s)
1868
1869            :parameters:
1870                ports : list
1871                    Ports on which to execute the command
1872                streams: list
1873                    Streams to attach (or profile)
1874
1875            :returns:
1876                List of stream IDs in order of the stream list
1877
1878            :raises:
1879                + :exc:`STLError`
1880
1881        """
1882
1883
1884        ports = ports if ports is not None else self.get_acquired_ports()
1885        ports = self._validate_port_list(ports)
1886
1887        if isinstance(streams, STLProfile):
1888            streams = streams.get_streams()
1889
1890        # transform single stream
1891        if not isinstance(streams, list):
1892            streams = [streams]
1893
1894        # check streams
1895        if not all([isinstance(stream, STLStream) for stream in streams]):
1896            raise STLArgumentError('streams', streams)
1897
1898        self.logger.pre_cmd("Attaching {0} streams to port(s) {1}:".format(len(streams), ports))
1899        rc = self.__add_streams(streams, ports)
1900        self.logger.post_cmd(rc)
1901
1902        if not rc:
1903            raise STLError(rc)
1904
1905        # return the stream IDs
1906        return rc.data()
1907
1908    @__api_check(True)
1909    def add_profile(self, filename, ports = None, **kwargs):
1910        """ |  Add streams from profile by its type. Supported types are:
1911            |  .py
1912            |  .yaml
1913            |  .pcap file that converted to profile automatically
1914
1915            :parameters:
1916                filename : string
1917                    filename (with path) of the profile
1918                ports : list
1919                    list of ports to add the profile (default: all acquired)
1920                kwargs : dict
1921                    forward those key-value pairs to the profile (tunables)
1922
1923            :returns:
1924                List of stream IDs in order of the stream list
1925
1926            :raises:
1927                + :exc:`STLError`
1928
1929        """
1930
1931        validate_type('filename', filename, basestring)
1932        profile = STLProfile.load(filename, **kwargs)
1933        return self.add_streams(profile.get_streams(), ports)
1934
1935
1936    @__api_check(True)
1937    def remove_streams (self, stream_id_list, ports = None):
1938        """
1939            Remove a list of streams from ports
1940
1941            :parameters:
1942                ports : list
1943                    Ports on which to execute the command
1944                stream_id_list: list
1945                    Stream id list to remove
1946
1947
1948            :raises:
1949                + :exc:`STLError`
1950
1951        """
1952
1953
1954        ports = ports if ports is not None else self.get_acquired_ports()
1955        ports = self._validate_port_list(ports)
1956
1957        # transform single stream
1958        if not isinstance(stream_id_list, list):
1959            stream_id_list = [stream_id_list]
1960
1961        # check streams
1962        for stream_id in stream_id_list:
1963            validate_type('stream_id', stream_id, int)
1964
1965        # remove streams
1966        self.logger.pre_cmd("Removing {0} streams from port(s) {1}:".format(len(stream_id_list), ports))
1967        rc = self.__remove_streams(stream_id_list, ports)
1968        self.logger.post_cmd(rc)
1969
1970        if not rc:
1971            raise STLError(rc)
1972
1973
1974
1975    @__api_check(True)
1976    def start (self,
1977               ports = None,
1978               mult = "1",
1979               force = False,
1980               duration = -1,
1981               total = False,
1982               core_mask = CORE_MASK_SPLIT):
1983        """
1984            Start traffic on port(s)
1985
1986            :parameters:
1987                ports : list
1988                    Ports on which to execute the command
1989
1990                mult : str
1991                    Multiplier in a form of pps, bps, or line util in %
1992                    Examples: "5kpps", "10gbps", "85%", "32mbps"
1993
1994                force : bool
1995                    If the ports are not in stopped mode or do not have sufficient bandwidth for the traffic, determines whether to stop the current traffic and force start.
1996                    True: Force start
1997                    False: Do not force start
1998
1999                duration : int
2000                    Limit the run time (seconds)
2001                    -1 = unlimited
2002
2003                total : bool
2004                    Determines whether to divide the configured bandwidth among the ports, or to duplicate the bandwidth for each port.
2005                    True: Divide bandwidth among the ports
2006                    False: Duplicate
2007
2008                core_mask: CORE_MASK_SPLIT, CORE_MASK_PIN or a list of masks (one per port)
2009                    Determine the allocation of cores per port
2010                    In CORE_MASK_SPLIT all the traffic will be divided equally between all the cores
2011                    associated with each port
2012                    In CORE_MASK_PIN, for each dual ports (a group that shares the same cores)
2013                    the cores will be divided half pinned for each port
2014
2015            :raises:
2016                + :exc:`STLError`
2017
2018        """
2019
2020        ports = ports if ports is not None else self.get_acquired_ports()
2021        ports = self._validate_port_list(ports)
2022
2023        validate_type('mult', mult, basestring)
2024        validate_type('force', force, bool)
2025        validate_type('duration', duration, (int, float))
2026        validate_type('total', total, bool)
2027        validate_type('core_mask', core_mask, (int, list))
2028
2029        #########################
2030        # decode core mask argument
2031        decoded_mask = self.__decode_core_mask(ports, core_mask)
2032        #######################
2033
2034        # verify multiplier
2035        mult_obj = parsing_opts.decode_multiplier(mult,
2036                                                  allow_update = False,
2037                                                  divide_count = len(ports) if total else 1)
2038        if not mult_obj:
2039            raise STLArgumentError('mult', mult)
2040
2041
2042        # verify ports are stopped or force stop them
2043        active_ports = list(set(self.get_active_ports()).intersection(ports))
2044        if active_ports:
2045            if not force:
2046                raise STLError("Port(s) {0} are active - please stop them or specify 'force'".format(active_ports))
2047            else:
2048                rc = self.stop(active_ports)
2049                if not rc:
2050                    raise STLError(rc)
2051
2052
2053        # start traffic
2054        self.logger.pre_cmd("Starting traffic on port(s) {0}:".format(ports))
2055        rc = self.__start(mult_obj, duration, ports, force, decoded_mask)
2056        self.logger.post_cmd(rc)
2057
2058        if not rc:
2059            raise STLError(rc)
2060
2061
2062    @__api_check(True)
2063    def stop (self, ports = None, rx_delay_ms = 10):
2064        """
2065            Stop port(s)
2066
2067            :parameters:
2068                ports : list
2069                    Ports on which to execute the command
2070
2071                rx_delay_ms : int
2072                    time to wait until RX filters are removed
2073                    this value should reflect the time it takes
2074                    packets which were transmitted to arrive
2075                    to the destination.
2076                    after this time the RX filters will be removed
2077
2078            :raises:
2079                + :exc:`STLError`
2080
2081        """
2082
2083        if ports is None:
2084            ports = self.get_active_ports()
2085            if not ports:
2086                return
2087
2088        ports = self._validate_port_list(ports)
2089
2090        self.logger.pre_cmd("Stopping traffic on port(s) {0}:".format(ports))
2091        rc = self.__stop(ports)
2092        self.logger.post_cmd(rc)
2093
2094        if not rc:
2095            raise STLError(rc)
2096
2097        # remove any RX filters
2098        rc = self._remove_rx_filters(ports, rx_delay_ms = rx_delay_ms)
2099        if not rc:
2100            raise STLError(rc)
2101
2102
2103    @__api_check(True)
2104    def update (self, ports = None, mult = "1", total = False, force = False):
2105        """
2106            Update traffic on port(s)
2107
2108            :parameters:
2109                ports : list
2110                    Ports on which to execute the command
2111
2112                mult : str
2113                    Multiplier in a form of pps, bps, or line util in %
2114                    Can also specify +/-
2115                    Examples: "5kpps+", "10gbps-", "85%", "32mbps", "20%+"
2116
2117                force : bool
2118                    If the ports are not in stopped mode or do not have sufficient bandwidth for the traffic, determines whether to stop the current traffic and force start.
2119                    True: Force start
2120                    False: Do not force start
2121
2122                total : bool
2123                    Determines whether to divide the configured bandwidth among the ports, or to duplicate the bandwidth for each port.
2124                    True: Divide bandwidth among the ports
2125                    False: Duplicate
2126
2127
2128            :raises:
2129                + :exc:`STLError`
2130
2131        """
2132
2133
2134        ports = ports if ports is not None else self.get_active_ports()
2135        ports = self._validate_port_list(ports)
2136
2137        validate_type('mult', mult, basestring)
2138        validate_type('force', force, bool)
2139        validate_type('total', total, bool)
2140
2141        # verify multiplier
2142        mult_obj = parsing_opts.decode_multiplier(mult,
2143                                                  allow_update = True,
2144                                                  divide_count = len(ports) if total else 1)
2145        if not mult_obj:
2146            raise STLArgumentError('mult', mult)
2147
2148
2149        # call low level functions
2150        self.logger.pre_cmd("Updating traffic on port(s) {0}:".format(ports))
2151        rc = self.__update(mult_obj, ports, force)
2152        self.logger.post_cmd(rc)
2153
2154        if not rc:
2155            raise STLError(rc)
2156
2157
2158
2159    @__api_check(True)
2160    def pause (self, ports = None):
2161        """
2162            Pause traffic on port(s). Works only for ports that are active, and only if all streams are in Continuous mode.
2163
2164            :parameters:
2165                ports : list
2166                    Ports on which to execute the command
2167
2168            :raises:
2169                + :exc:`STLError`
2170
2171        """
2172
2173
2174        ports = ports if ports is not None else self.get_transmitting_ports()
2175        ports = self._validate_port_list(ports)
2176
2177        self.logger.pre_cmd("Pausing traffic on port(s) {0}:".format(ports))
2178        rc = self.__pause(ports)
2179        self.logger.post_cmd(rc)
2180
2181        if not rc:
2182            raise STLError(rc)
2183
2184    @__api_check(True)
2185    def resume (self, ports = None):
2186        """
2187            Resume traffic on port(s)
2188
2189            :parameters:
2190                ports : list
2191                    Ports on which to execute the command
2192
2193            :raises:
2194                + :exc:`STLError`
2195
2196        """
2197
2198
2199        ports = ports if ports is not None else self.get_paused_ports()
2200        ports = self._validate_port_list(ports)
2201
2202
2203        self.logger.pre_cmd("Resume traffic on port(s) {0}:".format(ports))
2204        rc = self.__resume(ports)
2205        self.logger.post_cmd(rc)
2206
2207        if not rc:
2208            raise STLError(rc)
2209
2210
2211    @__api_check(True)
2212    def push_remote (self,
2213                     pcap_filename,
2214                     ports = None,
2215                     ipg_usec = None,
2216                     speedup = 1.0,
2217                     count = 1,
2218                     duration = -1,
2219                     is_dual = False):
2220        """
2221            Push a remote server-reachable PCAP file
2222            the path must be fullpath accessible to the server
2223
2224            :parameters:
2225                pcap_filename : str
2226                    PCAP file name in full path and accessible to the server
2227
2228                ports : list
2229                    Ports on which to execute the command
2230
2231                ipg_usec : float
2232                    Inter-packet gap in microseconds
2233
2234                speedup : float
2235                    A factor to adjust IPG. effectively IPG = IPG / speedup
2236
2237                count: int
2238                    How many times to transmit the cap
2239
2240                duration: float
2241                    Limit runtime by duration in seconds
2242
2243                is_dual: bool
2244                    Inject from both directions.
2245                    requires ERF file with meta data for direction.
2246                    also requires that all the ports will be in master mode
2247                    with their adjacent ports as slaves
2248
2249            :raises:
2250                + :exc:`STLError`
2251
2252        """
2253        ports = ports if ports is not None else self.get_acquired_ports()
2254        ports = self._validate_port_list(ports)
2255
2256        validate_type('pcap_filename', pcap_filename, basestring)
2257        validate_type('ipg_usec', ipg_usec, (float, int, type(None)))
2258        validate_type('speedup',  speedup, (float, int))
2259        validate_type('count',  count, int)
2260        validate_type('duration', duration, (float, int))
2261        validate_type('is_dual', is_dual, bool)
2262
2263        # for dual mode check that all are masters
2264        if is_dual:
2265            if not pcap_filename.endswith('erf'):
2266                raise STLError("dual mode: only ERF format is supported for dual mode")
2267
2268            for port in ports:
2269                master = port
2270                slave = port ^ 0x1
2271
2272                if slave in ports:
2273                    raise STLError("dual mode: cannot provide adjacent ports ({0}, {1}) in a batch".format(master, slave))
2274
2275                if not slave in self.get_acquired_ports():
2276                    raise STLError("dual mode: adjacent port {0} must be owned during dual mode".format(slave))
2277
2278
2279        self.logger.pre_cmd("Pushing remote PCAP on port(s) {0}:".format(ports))
2280        rc = self.__push_remote(pcap_filename, ports, ipg_usec, speedup, count, duration, is_dual)
2281        self.logger.post_cmd(rc)
2282
2283        if not rc:
2284            raise STLError(rc)
2285
2286
2287    @__api_check(True)
2288    def push_pcap (self,
2289                   pcap_filename,
2290                   ports = None,
2291                   ipg_usec = None,
2292                   speedup = 1.0,
2293                   count = 1,
2294                   duration = -1,
2295                   force = False,
2296                   vm = None,
2297                   packet_hook = None,
2298                   is_dual = False):
2299        """
2300            Push a local PCAP to the server
2301            This is equivalent to loading a PCAP file to a profile
2302            and attaching the profile to port(s)
2303
2304            file size is limited to 1MB
2305
2306            :parameters:
2307                pcap_filename : str
2308                    PCAP filename (accessible locally)
2309
2310                ports : list
2311                    Ports on which to execute the command
2312
2313                ipg_usec : float
2314                    Inter-packet gap in microseconds
2315
2316                speedup : float
2317                    A factor to adjust IPG. effectively IPG = IPG / speedup
2318
2319                count: int
2320                    How many times to transmit the cap
2321
2322                duration: float
2323                    Limit runtime by duration in seconds
2324
2325                force: bool
2326                    Ignore file size limit - push any file size to the server
2327
2328                vm: list of VM instructions
2329                    VM instructions to apply for every packet
2330
2331                packet_hook : Callable or function
2332                    Will be applied to every packet
2333
2334                is_dual: bool
2335                    Inject from both directions.
2336                    requires ERF file with meta data for direction.
2337                    also requires that all the ports will be in master mode
2338                    with their adjacent ports as slaves
2339
2340            :raises:
2341                + :exc:`STLError`
2342
2343        """
2344        ports = ports if ports is not None else self.get_acquired_ports()
2345        ports = self._validate_port_list(ports)
2346
2347        validate_type('pcap_filename', pcap_filename, basestring)
2348        validate_type('ipg_usec', ipg_usec, (float, int, type(None)))
2349        validate_type('speedup',  speedup, (float, int))
2350        validate_type('count',  count, int)
2351        validate_type('duration', duration, (float, int))
2352        validate_type('vm', vm, (list, type(None)))
2353        validate_type('is_dual', is_dual, bool)
2354
2355
2356        # no support for > 1MB PCAP - use push remote
2357        if not force and os.path.getsize(pcap_filename) > (1024 * 1024):
2358            raise STLError("PCAP size of {:} is too big for local push - consider using remote push or provide 'force'".format(format_num(os.path.getsize(pcap_filename), suffix = 'B')))
2359
2360        if is_dual:
2361            for port in ports:
2362                master = port
2363                slave = port ^ 0x1
2364
2365                if slave in ports:
2366                    raise STLError("dual mode: cannot provide adjacent ports ({0}, {1}) in a batch".format(master, slave))
2367
2368                if not slave in self.get_acquired_ports():
2369                    raise STLError("dual mode: adjacent port {0} must be owned during dual mode".format(slave))
2370
2371        # regular push
2372        if not is_dual:
2373
2374            # create the profile from the PCAP
2375            try:
2376                self.logger.pre_cmd("Converting '{0}' to streams:".format(pcap_filename))
2377                profile = STLProfile.load_pcap(pcap_filename,
2378                                               ipg_usec,
2379                                               speedup,
2380                                               count,
2381                                               vm = vm,
2382                                               packet_hook = packet_hook)
2383                self.logger.post_cmd(RC_OK)
2384            except STLError as e:
2385                self.logger.post_cmd(RC_ERR(e))
2386                raise
2387
2388
2389            self.remove_all_streams(ports = ports)
2390            id_list = self.add_streams(profile.get_streams(), ports)
2391
2392            return self.start(ports = ports, duration = duration)
2393
2394        else:
2395
2396            # create a dual profile
2397            split_mode = 'MAC'
2398
2399            try:
2400                self.logger.pre_cmd("Analyzing '{0}' for dual ports based on {1}:".format(pcap_filename, split_mode))
2401                profile_a, profile_b = STLProfile.load_pcap(pcap_filename,
2402                                                            ipg_usec,
2403                                                            speedup,
2404                                                            count,
2405                                                            vm = vm,
2406                                                            packet_hook = packet_hook,
2407                                                            split_mode = split_mode)
2408
2409                self.logger.post_cmd(RC_OK())
2410
2411            except STLError as e:
2412                self.logger.post_cmd(RC_ERR(e))
2413                raise
2414
2415            all_ports = ports + [p ^ 0x1 for p in ports]
2416
2417            self.remove_all_streams(ports = all_ports)
2418
2419            for port in ports:
2420                master = port
2421                slave = port ^ 0x1
2422
2423                self.add_streams(profile_a.get_streams(), master)
2424                self.add_streams(profile_b.get_streams(), slave)
2425
2426            return self.start(ports = all_ports, duration = duration)
2427
2428
2429
2430
2431
2432    @__api_check(True)
2433    def validate (self, ports = None, mult = "1", duration = -1, total = False):
2434        """
2435            Validate port(s) configuration
2436
2437            :parameters:
2438                ports : list
2439                    Ports on which to execute the command
2440
2441             mult : str
2442                    Multiplier in a form of pps, bps, or line util in %
2443                    Examples: "5kpps", "10gbps", "85%", "32mbps"
2444
2445            duration : int
2446                    Limit the run time (seconds)
2447                    -1 = unlimited
2448
2449            total : bool
2450                    Determines whether to divide the configured bandwidth among the ports, or to duplicate the bandwidth for each port.
2451                    True: Divide bandwidth among the ports
2452                    False: Duplicate
2453
2454            :raises:
2455                + :exc:`STLError`
2456
2457        """
2458
2459
2460        ports = ports if ports is not None else self.get_acquired_ports()
2461        ports = self._validate_port_list(ports)
2462
2463        validate_type('mult', mult, basestring)
2464        validate_type('duration', duration, (int, float))
2465        validate_type('total', total, bool)
2466
2467
2468        # verify multiplier
2469        mult_obj = parsing_opts.decode_multiplier(mult,
2470                                                  allow_update = True,
2471                                                  divide_count = len(ports) if total else 1)
2472        if not mult_obj:
2473            raise STLArgumentError('mult', mult)
2474
2475        self.logger.pre_cmd("Validating streams on port(s) {0}:".format(ports))
2476        rc = self.__validate(ports)
2477        self.logger.post_cmd(rc)
2478
2479        if not rc:
2480            raise STLError(rc)
2481
2482        for port in ports:
2483            self.ports[port].print_profile(mult_obj, duration)
2484
2485
2486    @__api_check(False)
2487    def clear_stats (self, ports = None, clear_global = True, clear_flow_stats = True, clear_latency_stats = True, clear_xstats = True):
2488        """
2489            Clear stats on port(s)
2490
2491            :parameters:
2492                ports : list
2493                    Ports on which to execute the command
2494
2495                clear_global : bool
2496                    Clear the global stats
2497
2498                clear_flow_stats : bool
2499                    Clear the flow stats
2500
2501                clear_latency_stats : bool
2502                    Clear the latency stats
2503
2504            :raises:
2505                + :exc:`STLError`
2506
2507        """
2508
2509        ports = ports if ports is not None else self.get_all_ports()
2510        ports = self._validate_port_list(ports)
2511
2512        # verify clear global
2513        if not type(clear_global) is bool:
2514            raise STLArgumentError('clear_global', clear_global)
2515
2516        rc = self.__clear_stats(ports, clear_global, clear_flow_stats, clear_latency_stats)
2517        if not rc:
2518            raise STLError(rc)
2519
2520
2521
2522    @__api_check(True)
2523    def is_traffic_active (self, ports = None):
2524        """
2525            Return if specified port(s) have traffic
2526
2527            :parameters:
2528                ports : list
2529                    Ports on which to execute the command
2530
2531
2532            :raises:
2533                + :exc:`STLTimeoutError` - in case timeout has expired
2534                + :exe:'STLError'
2535
2536        """
2537
2538        ports = ports if ports is not None else self.get_acquired_ports()
2539        ports = self._validate_port_list(ports)
2540
2541        return set(self.get_active_ports()).intersection(ports)
2542
2543
2544
2545    @__api_check(True)
2546    def wait_on_traffic (self, ports = None, timeout = None, rx_delay_ms = 10):
2547        """
2548            .. _wait_on_traffic:
2549
2550            Block until traffic on specified port(s) has ended
2551
2552            :parameters:
2553                ports : list
2554                    Ports on which to execute the command
2555
2556                timeout : int
2557                    timeout in seconds
2558                    default will be blocking
2559
2560                rx_delay_ms : int
2561                    Time to wait (in milliseconds) after last packet was sent, until RX filters used for
2562                    measuring flow statistics and latency are removed.
2563                    This value should reflect the time it takes packets which were transmitted to arrive
2564                    to the destination.
2565                    After this time, RX filters will be removed, and packets arriving for per flow statistics feature and latency flows will be counted as errors.
2566
2567            :raises:
2568                + :exc:`STLTimeoutError` - in case timeout has expired
2569                + :exe:'STLError'
2570
2571        """
2572
2573        ports = ports if ports is not None else self.get_acquired_ports()
2574        ports = self._validate_port_list(ports)
2575
2576
2577        timer = PassiveTimer(timeout)
2578
2579        # wait while any of the required ports are active
2580        while set(self.get_active_ports()).intersection(ports):
2581
2582            # make sure ASYNC thread is still alive - otherwise we will be stuck forever
2583            if not self.async_client.is_thread_alive():
2584                raise STLError("subscriber thread is dead")
2585
2586            time.sleep(0.01)
2587            if timer.has_expired():
2588                raise STLTimeoutError(timeout)
2589
2590        # remove any RX filters
2591        rc = self._remove_rx_filters(ports, rx_delay_ms = rx_delay_ms)
2592        if not rc:
2593            raise STLError(rc)
2594
2595
2596    @__api_check(True)
2597    def set_port_attr (self, ports = None, promiscuous = None, link_up = None, led_on = None, flow_ctrl = None):
2598        """
2599            Set port attributes
2600
2601            :parameters:
2602                promiscuous - True or False
2603                link_up     - True or False
2604                led_on      - True or False
2605                flow_ctrl   - 0: disable all, 1: enable tx side, 2: enable rx side, 3: full enable
2606
2607            :raises:
2608                None
2609
2610        """
2611
2612        ports = ports if ports is not None else self.get_acquired_ports()
2613        ports = self._validate_port_list(ports)
2614
2615        # check arguments
2616        validate_type('promiscuous', promiscuous, (bool, type(None)))
2617        validate_type('link_up', link_up, (bool, type(None)))
2618        validate_type('led_on', led_on, (bool, type(None)))
2619        validate_type('flow_ctrl', flow_ctrl, (int, type(None)))
2620
2621        # build attributes
2622        attr_dict = {}
2623        if promiscuous is not None:
2624            attr_dict['promiscuous'] = {'enabled': promiscuous}
2625        if link_up is not None:
2626            attr_dict['link_status'] = {'up': link_up}
2627        if led_on is not None:
2628            attr_dict['led_status'] = {'on': led_on}
2629        if flow_ctrl is not None:
2630            attr_dict['flow_ctrl_mode'] = {'mode': flow_ctrl}
2631
2632        # no attributes to set
2633        if not attr_dict:
2634            return
2635
2636        self.logger.pre_cmd("Applying attributes on port(s) {0}:".format(ports))
2637        rc = self.__set_port_attr(ports, attr_dict)
2638        self.logger.post_cmd(rc)
2639
2640        if not rc:
2641            raise STLError(rc)
2642
2643    def clear_events (self):
2644        """
2645            Clear all events
2646
2647            :parameters:
2648                None
2649
2650            :raises:
2651                None
2652
2653        """
2654        self.event_handler.clear_events()
2655
2656
2657    ############################   Line       #############################
2658    ############################   Commands   #############################
2659    ############################              #############################
2660
2661    # console decorator
2662    def __console(f):
2663        @wraps(f)
2664        def wrap(*args):
2665            client = args[0]
2666
2667            time1 = time.time()
2668
2669            try:
2670                rc = f(*args)
2671            except STLError as e:
2672                client.logger.log("Log:\n" + format_text(e.brief() + "\n", 'bold'))
2673                return RC_ERR(e.brief())
2674
2675            # if got true - print time
2676            if rc:
2677                delta = time.time() - time1
2678                client.logger.log(format_time(delta) + "\n")
2679
2680            return rc
2681
2682        return wrap
2683
2684    @__console
2685    def ping_line (self, line):
2686        '''pings the server'''
2687        self.ping()
2688        return RC_OK()
2689
2690    @__console
2691    def shutdown_line (self, line):
2692        '''shutdown the server'''
2693        parser = parsing_opts.gen_parser(self,
2694                                         "shutdown",
2695                                         self.shutdown_line.__doc__,
2696                                         parsing_opts.FORCE)
2697
2698        opts = parser.parse_args(line.split())
2699        if not opts:
2700            return opts
2701
2702        self.server_shutdown(force = opts.force)
2703        return RC_OK()
2704
2705    @__console
2706    def connect_line (self, line):
2707        '''Connects to the TRex server and acquire ports'''
2708        parser = parsing_opts.gen_parser(self,
2709                                         "connect",
2710                                         self.connect_line.__doc__,
2711                                         parsing_opts.PORT_LIST_WITH_ALL,
2712                                         parsing_opts.FORCE)
2713
2714        opts = parser.parse_args(line.split(), default_ports = self.get_all_ports())
2715        if not opts:
2716            return opts
2717
2718        self.connect()
2719        self.acquire(ports = opts.ports, force = opts.force)
2720
2721        return RC_OK()
2722
2723
2724    @__console
2725    def acquire_line (self, line):
2726        '''Acquire ports\n'''
2727
2728        # define a parser
2729        parser = parsing_opts.gen_parser(self,
2730                                         "acquire",
2731                                         self.acquire_line.__doc__,
2732                                         parsing_opts.PORT_LIST_WITH_ALL,
2733                                         parsing_opts.FORCE)
2734
2735        opts = parser.parse_args(line.split(), default_ports = self.get_all_ports())
2736        if not opts:
2737            return opts
2738
2739        # filter out all the already owned ports
2740        ports = list_difference(opts.ports, self.get_acquired_ports())
2741        if not ports:
2742            msg = "acquire - all of port(s) {0} are already acquired".format(opts.ports)
2743            self.logger.log(format_text(msg, 'bold'))
2744            return RC_ERR(msg)
2745
2746        self.acquire(ports = ports, force = opts.force)
2747
2748        return RC_OK()
2749
2750
2751    #
2752    @__console
2753    def release_line (self, line):
2754        '''Release ports\n'''
2755
2756        parser = parsing_opts.gen_parser(self,
2757                                         "release",
2758                                         self.release_line.__doc__,
2759                                         parsing_opts.PORT_LIST_WITH_ALL)
2760
2761        opts = parser.parse_args(line.split(), default_ports = self.get_acquired_ports())
2762        if not opts:
2763            return opts
2764
2765        ports = list_intersect(opts.ports, self.get_acquired_ports())
2766        if not ports:
2767            if not opts.ports:
2768                msg = "release - no acquired ports"
2769                self.logger.log(format_text(msg, 'bold'))
2770                return RC_ERR(msg)
2771            else:
2772                msg = "release - none of port(s) {0} are acquired".format(opts.ports)
2773                self.logger.log(format_text(msg, 'bold'))
2774                return RC_ERR(msg)
2775
2776
2777        self.release(ports = ports)
2778
2779        return RC_OK()
2780
2781
2782    @__console
2783    def reacquire_line (self, line):
2784        '''reacquire all the ports under your username which are not acquired by your session'''
2785
2786        parser = parsing_opts.gen_parser(self,
2787                                         "reacquire",
2788                                         self.reacquire_line.__doc__)
2789
2790        opts = parser.parse_args(line.split())
2791        if not opts:
2792            return opts
2793
2794        # find all the on-owned ports under your name
2795        my_unowned_ports = list_difference([k for k, v in self.ports.items() if v.get_owner() == self.username], self.get_acquired_ports())
2796        if not my_unowned_ports:
2797            msg = "reacquire - no unowned ports under '{0}'".format(self.username)
2798            self.logger.log(msg)
2799            return RC_ERR(msg)
2800
2801        self.acquire(ports = my_unowned_ports, force = True)
2802        return RC_OK()
2803
2804
2805    @__console
2806    def disconnect_line (self, line):
2807        self.disconnect()
2808
2809
2810    @__console
2811    def reset_line (self, line):
2812        '''Reset ports - if no ports are provided all acquired ports will be reset'''
2813
2814        parser = parsing_opts.gen_parser(self,
2815                                         "reset",
2816                                         self.reset_line.__doc__,
2817                                         parsing_opts.PORT_LIST_WITH_ALL)
2818
2819        opts = parser.parse_args(line.split(), default_ports = self.get_acquired_ports(), verify_acquired = True)
2820        if not opts:
2821            return opts
2822
2823        self.reset(ports = opts.ports)
2824
2825        return RC_OK()
2826
2827
2828
2829    @__console
2830    def start_line (self, line):
2831        '''Start selected traffic on specified ports on TRex\n'''
2832        # define a parser
2833        parser = parsing_opts.gen_parser(self,
2834                                         "start",
2835                                         self.start_line.__doc__,
2836                                         parsing_opts.PORT_LIST_WITH_ALL,
2837                                         parsing_opts.TOTAL,
2838                                         parsing_opts.FORCE,
2839                                         parsing_opts.FILE_PATH,
2840                                         parsing_opts.DURATION,
2841                                         parsing_opts.TUNABLES,
2842                                         parsing_opts.MULTIPLIER_STRICT,
2843                                         parsing_opts.DRY_RUN,
2844                                         parsing_opts.CORE_MASK_GROUP)
2845
2846        opts = parser.parse_args(line.split(), default_ports = self.get_acquired_ports(), verify_acquired = True)
2847        if not opts:
2848            return opts
2849
2850        # core mask
2851        if opts.core_mask is not None:
2852            core_mask =  opts.core_mask
2853        else:
2854            core_mask = self.CORE_MASK_PIN if opts.pin_cores else self.CORE_MASK_SPLIT
2855
2856        # just for sanity - will be checked on the API as well
2857        self.__decode_core_mask(opts.ports, core_mask)
2858
2859        active_ports = list_intersect(self.get_active_ports(), opts.ports)
2860        if active_ports:
2861            if not opts.force:
2862                msg = "Port(s) {0} are active - please stop them or add '--force'\n".format(active_ports)
2863                self.logger.log(format_text(msg, 'bold'))
2864                return RC_ERR(msg)
2865            else:
2866                self.stop(active_ports)
2867
2868
2869        # process tunables
2870        if type(opts.tunables) is dict:
2871            tunables = opts.tunables
2872        else:
2873            tunables = {}
2874
2875
2876        # remove all streams
2877        self.remove_all_streams(opts.ports)
2878
2879        # pack the profile
2880        try:
2881            for port in opts.ports:
2882
2883                profile = STLProfile.load(opts.file[0],
2884                                          direction = tunables.get('direction', port % 2),
2885                                          port_id = port,
2886                                          **tunables)
2887
2888                self.add_streams(profile.get_streams(), ports = port)
2889
2890        except STLError as e:
2891            error = 'Unknown error.'
2892            for line in e.brief().split('\n'):
2893                if line:
2894                    error = line
2895            msg = format_text("\nError loading profile '{0}'".format(opts.file[0]), 'bold')
2896            self.logger.log(msg + '\n')
2897            self.logger.log(e.brief() + "\n")
2898            return RC_ERR("%s: %s" % (msg, error))
2899
2900
2901        if opts.dry:
2902            self.validate(opts.ports, opts.mult, opts.duration, opts.total)
2903        else:
2904
2905            self.start(opts.ports,
2906                       opts.mult,
2907                       opts.force,
2908                       opts.duration,
2909                       opts.total,
2910                       core_mask)
2911
2912        return RC_OK()
2913
2914
2915
2916    @__console
2917    def stop_line (self, line):
2918        '''Stop active traffic on specified ports on TRex\n'''
2919        parser = parsing_opts.gen_parser(self,
2920                                         "stop",
2921                                         self.stop_line.__doc__,
2922                                         parsing_opts.PORT_LIST_WITH_ALL)
2923
2924        opts = parser.parse_args(line.split(), default_ports = self.get_active_ports(), verify_acquired = True)
2925        if not opts:
2926            return opts
2927
2928
2929        # find the relevant ports
2930        ports = list_intersect(opts.ports, self.get_active_ports())
2931        if not ports:
2932            if not opts.ports:
2933                msg = 'stop - no active ports'
2934            else:
2935                msg = 'stop - no active traffic on ports {0}'.format(opts.ports)
2936
2937            self.logger.log(msg)
2938            return RC_ERR(msg)
2939
2940        # call API
2941        self.stop(ports)
2942
2943        return RC_OK()
2944
2945
2946    @__console
2947    def update_line (self, line):
2948        '''Update port(s) speed currently active\n'''
2949        parser = parsing_opts.gen_parser(self,
2950                                         "update",
2951                                         self.update_line.__doc__,
2952                                         parsing_opts.PORT_LIST_WITH_ALL,
2953                                         parsing_opts.MULTIPLIER,
2954                                         parsing_opts.TOTAL,
2955                                         parsing_opts.FORCE)
2956
2957        opts = parser.parse_args(line.split(), default_ports = self.get_active_ports(), verify_acquired = True)
2958        if not opts:
2959            return opts
2960
2961
2962        # find the relevant ports
2963        ports = list_intersect(opts.ports, self.get_active_ports())
2964        if not ports:
2965            if not opts.ports:
2966                msg = 'update - no active ports'
2967            else:
2968                msg = 'update - no active traffic on ports {0}'.format(opts.ports)
2969
2970            self.logger.log(msg)
2971            return RC_ERR(msg)
2972
2973        self.update(ports, opts.mult, opts.total, opts.force)
2974
2975        return RC_OK()
2976
2977
2978    @__console
2979    def pause_line (self, line):
2980        '''Pause active traffic on specified ports on TRex\n'''
2981        parser = parsing_opts.gen_parser(self,
2982                                         "pause",
2983                                         self.pause_line.__doc__,
2984                                         parsing_opts.PORT_LIST_WITH_ALL)
2985
2986        opts = parser.parse_args(line.split(), default_ports = self.get_transmitting_ports(), verify_acquired = True)
2987        if not opts:
2988            return opts
2989
2990        # check for already paused case
2991        if opts.ports and is_sub_list(opts.ports, self.get_paused_ports()):
2992            msg = 'pause - all of port(s) {0} are already paused'.format(opts.ports)
2993            self.logger.log(msg)
2994            return RC_ERR(msg)
2995
2996        # find the relevant ports
2997        ports = list_intersect(opts.ports, self.get_transmitting_ports())
2998        if not ports:
2999            if not opts.ports:
3000                msg = 'pause - no transmitting ports'
3001            else:
3002                msg = 'pause - none of ports {0} are transmitting'.format(opts.ports)
3003
3004            self.logger.log(msg)
3005            return RC_ERR(msg)
3006
3007        self.pause(ports)
3008
3009        return RC_OK()
3010
3011
3012    @__console
3013    def resume_line (self, line):
3014        '''Resume active traffic on specified ports on TRex\n'''
3015        parser = parsing_opts.gen_parser(self,
3016                                         "resume",
3017                                         self.resume_line.__doc__,
3018                                         parsing_opts.PORT_LIST_WITH_ALL)
3019
3020        opts = parser.parse_args(line.split(), default_ports = self.get_paused_ports(), verify_acquired = True)
3021        if not opts:
3022            return opts
3023
3024        # find the relevant ports
3025        ports = list_intersect(opts.ports, self.get_paused_ports())
3026        if not ports:
3027            if not opts.ports:
3028                msg = 'resume - no paused ports'
3029            else:
3030                msg = 'resume - none of ports {0} are paused'.format(opts.ports)
3031
3032            self.logger.log(msg)
3033            return RC_ERR(msg)
3034
3035
3036        self.resume(ports)
3037
3038        # true means print time
3039        return RC_OK()
3040
3041
3042    @__console
3043    def clear_stats_line (self, line):
3044        '''Clear cached local statistics\n'''
3045        # define a parser
3046        parser = parsing_opts.gen_parser(self,
3047                                         "clear",
3048                                         self.clear_stats_line.__doc__,
3049                                         parsing_opts.PORT_LIST_WITH_ALL)
3050
3051        opts = parser.parse_args(line.split())
3052
3053        if not opts:
3054            return opts
3055
3056        self.clear_stats(opts.ports)
3057
3058        return RC_OK()
3059
3060
3061    @__console
3062    def show_stats_line (self, line):
3063        '''Get statistics from TRex server by port\n'''
3064        # define a parser
3065        parser = parsing_opts.gen_parser(self,
3066                                         "stats",
3067                                         self.show_stats_line.__doc__,
3068                                         parsing_opts.PORT_LIST_WITH_ALL,
3069                                         parsing_opts.STATS_MASK)
3070
3071        opts = parser.parse_args(line.split())
3072
3073        if not opts:
3074            return opts
3075
3076        # determine stats mask
3077        mask = self.__get_mask_keys(**self.__filter_namespace_args(opts, trex_stl_stats.ALL_STATS_OPTS))
3078        if not mask:
3079            # set to show all stats if no filter was given
3080            mask = trex_stl_stats.COMPACT
3081
3082        stats_opts = common.list_intersect(trex_stl_stats.ALL_STATS_OPTS, mask)
3083
3084        stats = self._get_formatted_stats(opts.ports, mask)
3085
3086
3087        # print stats to screen
3088        for stat_type, stat_data in stats.items():
3089            text_tables.print_table_with_header(stat_data.text_table, stat_type)
3090
3091
3092    @__console
3093    def show_streams_line(self, line):
3094        '''Get stream statistics from TRex server by port\n'''
3095        # define a parser
3096        parser = parsing_opts.gen_parser(self,
3097                                         "streams",
3098                                         self.show_streams_line.__doc__,
3099                                         parsing_opts.PORT_LIST_WITH_ALL,
3100                                         parsing_opts.STREAMS_MASK)
3101
3102        opts = parser.parse_args(line.split())
3103
3104        if not opts:
3105            return opts
3106
3107        streams = self._get_streams(opts.ports, set(opts.streams))
3108        if not streams:
3109            self.logger.log(format_text("No streams found with desired filter.\n", "bold", "magenta"))
3110
3111        else:
3112            # print stats to screen
3113            for stream_hdr, port_streams_data in streams.items():
3114                text_tables.print_table_with_header(port_streams_data.text_table,
3115                                                    header= stream_hdr.split(":")[0] + ":",
3116                                                    untouched_header= stream_hdr.split(":")[1])
3117
3118
3119
3120
3121    @__console
3122    def validate_line (self, line):
3123        '''Validates port(s) stream configuration\n'''
3124
3125        parser = parsing_opts.gen_parser(self,
3126                                         "validate",
3127                                         self.validate_line.__doc__,
3128                                         parsing_opts.PORT_LIST_WITH_ALL)
3129
3130        opts = parser.parse_args(line.split())
3131        if not opts:
3132            return opts
3133
3134        self.validate(opts.ports)
3135
3136
3137
3138
3139    @__console
3140    def push_line (self, line):
3141        '''Push a pcap file '''
3142
3143        parser = parsing_opts.gen_parser(self,
3144                                         "push",
3145                                         self.push_line.__doc__,
3146                                         parsing_opts.FILE_PATH,
3147                                         parsing_opts.REMOTE_FILE,
3148                                         parsing_opts.PORT_LIST_WITH_ALL,
3149                                         parsing_opts.COUNT,
3150                                         parsing_opts.DURATION,
3151                                         parsing_opts.IPG,
3152                                         parsing_opts.SPEEDUP,
3153                                         parsing_opts.FORCE,
3154                                         parsing_opts.DUAL)
3155
3156        opts = parser.parse_args(line.split(), verify_acquired = True)
3157        if not opts:
3158            return opts
3159
3160        active_ports = list(set(self.get_active_ports()).intersection(opts.ports))
3161
3162        if active_ports:
3163            if not opts.force:
3164                msg = "Port(s) {0} are active - please stop them or add '--force'\n".format(active_ports)
3165                self.logger.log(format_text(msg, 'bold'))
3166                return RC_ERR(msg)
3167            else:
3168                self.stop(active_ports)
3169
3170
3171        if opts.remote:
3172            self.push_remote(opts.file[0],
3173                             ports     = opts.ports,
3174                             ipg_usec  = opts.ipg_usec,
3175                             speedup   = opts.speedup,
3176                             count     = opts.count,
3177                             duration  = opts.duration,
3178                             is_dual   = opts.dual)
3179
3180        else:
3181            self.push_pcap(opts.file[0],
3182                           ports     = opts.ports,
3183                           ipg_usec  = opts.ipg_usec,
3184                           speedup   = opts.speedup,
3185                           count     = opts.count,
3186                           duration  = opts.duration,
3187                           force     = opts.force,
3188                           is_dual   = opts.dual)
3189
3190
3191
3192        return RC_OK()
3193
3194
3195
3196    @__console
3197    def set_port_attr_line (self, line):
3198        '''Sets port attributes '''
3199
3200        parser = parsing_opts.gen_parser(self,
3201                                         "port_attr",
3202                                         self.set_port_attr_line.__doc__,
3203                                         parsing_opts.PORT_LIST_WITH_ALL,
3204                                         parsing_opts.PROMISCUOUS,
3205                                         parsing_opts.LINK_STATUS,
3206                                         parsing_opts.LED_STATUS,
3207                                         parsing_opts.FLOW_CTRL,
3208                                         )
3209
3210        opts = parser.parse_args(line.split(), default_ports = self.get_acquired_ports(), verify_acquired = True)
3211        if not opts:
3212            return opts
3213
3214        opts.prom      = parsing_opts.on_off_dict.get(opts.prom)
3215        opts.link      = parsing_opts.on_off_dict.get(opts.link)
3216        opts.led       = parsing_opts.on_off_dict.get(opts.led)
3217        opts.flow_ctrl = parsing_opts.flow_ctrl_dict.get(opts.flow_ctrl)
3218
3219        # if no attributes - fall back to printing the status
3220        if not filter(lambda x:x is not None, [opts.prom, opts.link, opts.led, opts.flow_ctrl]):
3221            self.show_stats_line("--ps --port {0}".format(' '.join(str(port) for port in opts.ports)))
3222            return
3223
3224        return self.set_port_attr(opts.ports, opts.prom, opts.link, opts.led, opts.flow_ctrl)
3225
3226
3227    @__console
3228    def show_profile_line (self, line):
3229        '''Shows profile information'''
3230
3231        parser = parsing_opts.gen_parser(self,
3232                                         "port",
3233                                         self.show_profile_line.__doc__,
3234                                         parsing_opts.FILE_PATH)
3235
3236        opts = parser.parse_args(line.split())
3237        if not opts:
3238            return opts
3239
3240        info = STLProfile.get_info(opts.file[0])
3241
3242        self.logger.log(format_text('\nProfile Information:\n', 'bold'))
3243
3244        # general info
3245        self.logger.log(format_text('\nGeneral Information:', 'underline'))
3246        self.logger.log('Filename:         {:^12}'.format(opts.file[0]))
3247        self.logger.log('Stream count:     {:^12}'.format(info['stream_count']))
3248
3249        # specific info
3250        profile_type = info['type']
3251        self.logger.log(format_text('\nSpecific Information:', 'underline'))
3252
3253        if profile_type == 'python':
3254            self.logger.log('Type:             {:^12}'.format('Python Module'))
3255            self.logger.log('Tunables:         {:^12}'.format(str(['{0} = {1}'.format(k ,v) for k, v in info['tunables'].items()])))
3256
3257        elif profile_type == 'yaml':
3258            self.logger.log('Type:             {:^12}'.format('YAML'))
3259
3260        elif profile_type == 'pcap':
3261            self.logger.log('Type:             {:^12}'.format('PCAP file'))
3262
3263        self.logger.log("")
3264
3265
3266    @__console
3267    def get_events_line (self, line):
3268        '''shows events recieved from server\n'''
3269
3270        x = [parsing_opts.ArgumentPack(['-c','--clear'],
3271                                      {'action' : "store_true",
3272                                       'default': False,
3273                                       'help': "clear the events log"}),
3274
3275             parsing_opts.ArgumentPack(['-i','--info'],
3276                                      {'action' : "store_true",
3277                                       'default': False,
3278                                       'help': "show info events"}),
3279
3280             parsing_opts.ArgumentPack(['-w','--warn'],
3281                                      {'action' : "store_true",
3282                                       'default': False,
3283                                       'help': "show warning events"}),
3284
3285             ]
3286
3287
3288        parser = parsing_opts.gen_parser(self,
3289                                         "events",
3290                                         self.get_events_line.__doc__,
3291                                         *x)
3292
3293        opts = parser.parse_args(line.split())
3294        if not opts:
3295            return opts
3296
3297
3298        ev_type_filter = []
3299
3300        if opts.info:
3301            ev_type_filter.append('info')
3302
3303        if opts.warn:
3304            ev_type_filter.append('warning')
3305
3306        if not ev_type_filter:
3307            ev_type_filter = None
3308
3309        events = self.get_events(ev_type_filter)
3310        for ev in events:
3311            self.logger.log(ev)
3312
3313        if opts.clear:
3314            self.clear_events()
3315
3316    def generate_prompt (self, prefix = 'trex'):
3317        if not self.is_connected():
3318            return "{0}(offline)>".format(prefix)
3319
3320        elif not self.get_acquired_ports():
3321            return "{0}(read-only)>".format(prefix)
3322
3323        elif self.is_all_ports_acquired():
3324            return "{0}>".format(prefix)
3325
3326        else:
3327            return "{0} {1}>".format(prefix, self.get_acquired_ports())
3328