trex_stl_client.py revision 5591bec0
1#!/router/bin/python
2
3# for API usage the path name must be full
4from .trex_stl_exceptions import *
5from .trex_stl_streams import *
6
7from .trex_stl_jsonrpc_client import JsonRpcClient, BatchMessage
8from . import trex_stl_stats
9
10from .trex_stl_port import Port
11from .trex_stl_types import *
12from .trex_stl_async_client import CTRexAsyncClient
13
14from .utils import parsing_opts, text_tables, common
15from .utils.common import list_intersect, list_difference, is_sub_list, PassiveTimer
16from .utils.text_opts import *
17from functools import wraps
18
19from collections import namedtuple
20from yaml import YAMLError
21import time
22import datetime
23import re
24import random
25import json
26import traceback
27
28############################     logger     #############################
29############################                #############################
30############################                #############################
31
32# logger API for the client
33class LoggerApi(object):
34    # verbose levels
35    VERBOSE_QUIET   = 0
36    VERBOSE_REGULAR = 1
37    VERBOSE_HIGH    = 2
38
39    def __init__(self):
40        self.level = LoggerApi.VERBOSE_REGULAR
41
42    # implemented by specific logger
43    def write(self, msg, newline = True):
44        raise Exception("Implement this")
45
46    # implemented by specific logger
47    def flush(self):
48        raise Exception("Implement this")
49
50    def set_verbose (self, level):
51        if not level in range(self.VERBOSE_QUIET, self.VERBOSE_HIGH + 1):
52            raise ValueError("Bad value provided for logger")
53
54        self.level = level
55
56    def get_verbose (self):
57        return self.level
58
59
60    def check_verbose (self, level):
61        return (self.level >= level)
62
63
64    # simple log message with verbose
65    def log (self, msg, level = VERBOSE_REGULAR, newline = True):
66        if not self.check_verbose(level):
67            return
68
69        self.write(msg, newline)
70
71    # logging that comes from async event
72    def async_log (self, msg, level = VERBOSE_REGULAR, newline = True):
73        self.log(msg, level, newline)
74
75
76    def pre_cmd (self, desc):
77        self.log(format_text('\n{:<60}'.format(desc), 'bold'), newline = False)
78        self.flush()
79
80    def post_cmd (self, rc):
81        if rc:
82            self.log(format_text("[SUCCESS]\n", 'green', 'bold'))
83        else:
84            self.log(format_text("[FAILED]\n", 'red', 'bold'))
85
86
87    def log_cmd (self, desc):
88        self.pre_cmd(desc)
89        self.post_cmd(True)
90
91
92    # supress object getter
93    def supress (self):
94        class Supress(object):
95            def __init__ (self, logger):
96                self.logger = logger
97
98            def __enter__ (self):
99                self.saved_level = self.logger.get_verbose()
100                self.logger.set_verbose(LoggerApi.VERBOSE_QUIET)
101
102            def __exit__ (self, type, value, traceback):
103                self.logger.set_verbose(self.saved_level)
104
105        return Supress(self)
106
107
108
109# default logger - to stdout
110class DefaultLogger(LoggerApi):
111
112    def __init__ (self):
113        super(DefaultLogger, self).__init__()
114
115    def write (self, msg, newline = True):
116        if newline:
117            print(msg)
118        else:
119            print (msg),
120
121    def flush (self):
122        sys.stdout.flush()
123
124
125############################     async event hander     #############################
126############################                            #############################
127############################                            #############################
128
129# an event
130class Event(object):
131
132    def __init__ (self, origin, ev_type, msg):
133        self.origin = origin
134        self.ev_type = ev_type
135        self.msg = msg
136
137        self.ts = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
138
139    def __str__ (self):
140
141        prefix = "[{:^}][{:^}]".format(self.origin, self.ev_type)
142
143        return "{:<10} - {:18} - {:}".format(self.ts, prefix, format_text(self.msg, 'bold'))
144
145
146# handles different async events given to the client
147class EventsHandler(object):
148
149
150    def __init__ (self, client):
151        self.client = client
152        self.logger = self.client.logger
153
154        self.events = []
155
156    # public functions
157
158    def get_events (self, ev_type_filter = None):
159        if ev_type_filter:
160            return [ev for ev in self.events if ev.ev_type in listify(ev_type_filter)]
161        else:
162            return [ev for ev in self.events]
163
164
165    def clear_events (self):
166        self.events = []
167
168
169    def log_warning (self, msg, show = True):
170        self.__add_event_log('local', 'warning', msg, show)
171
172
173    # events called internally
174
175    def on_async_dead (self):
176        if self.client.connected:
177            msg = 'Lost connection to server'
178            self.__add_event_log('local', 'info', msg, True)
179            self.client.connected = False
180
181
182    def on_async_alive (self):
183        pass
184
185
186
187    def on_async_rx_stats_event (self, data, baseline):
188        self.client.flow_stats.update(data, baseline)
189
190    def on_async_latency_stats_event (self, data, baseline):
191        self.client.latency_stats.update(data, baseline)
192
193    # handles an async stats update from the subscriber
194    def on_async_stats_update(self, dump_data, baseline):
195        global_stats = {}
196        port_stats = {}
197
198        # filter the values per port and general
199        for key, value in dump_data.items():
200            # match a pattern of ports
201            m = re.search('(.*)\-(\d+)', key)
202            if m:
203                port_id = int(m.group(2))
204                field_name = m.group(1)
205                if port_id in self.client.ports:
206                    if not port_id in port_stats:
207                        port_stats[port_id] = {}
208                    port_stats[port_id][field_name] = value
209                else:
210                    continue
211            else:
212                # no port match - general stats
213                global_stats[key] = value
214
215        # update the general object with the snapshot
216        self.client.global_stats.update(global_stats, baseline)
217
218        # update all ports
219        for port_id, data in port_stats.items():
220            self.client.ports[port_id].port_stats.update(data, baseline)
221
222
223
224    # dispatcher for server async events (port started, port stopped and etc.)
225    def on_async_event (self, type, data):
226        # DP stopped
227        show_event = False
228
229        # port started
230        if (type == 0):
231            port_id = int(data['port_id'])
232            ev = "Port {0} has started".format(port_id)
233            self.__async_event_port_started(port_id)
234
235        # port stopped
236        elif (type == 1):
237            port_id = int(data['port_id'])
238            ev = "Port {0} has stopped".format(port_id)
239
240            # call the handler
241            self.__async_event_port_stopped(port_id)
242
243
244        # port paused
245        elif (type == 2):
246            port_id = int(data['port_id'])
247            ev = "Port {0} has paused".format(port_id)
248
249            # call the handler
250            self.__async_event_port_paused(port_id)
251
252        # port resumed
253        elif (type == 3):
254            port_id = int(data['port_id'])
255            ev = "Port {0} has resumed".format(port_id)
256
257            # call the handler
258            self.__async_event_port_resumed(port_id)
259
260        # port finished traffic
261        elif (type == 4):
262            port_id = int(data['port_id'])
263            ev = "Port {0} job done".format(port_id)
264
265            # call the handler
266            self.__async_event_port_job_done(port_id)
267            show_event = True
268
269        # port was acquired - maybe stolen...
270        elif (type == 5):
271            session_id = data['session_id']
272
273            port_id = int(data['port_id'])
274            who     = data['who']
275            force   = data['force']
276
277            # if we hold the port and it was not taken by this session - show it
278            if port_id in self.client.get_acquired_ports() and session_id != self.client.session_id:
279                show_event = True
280
281            # format the thief/us...
282            if session_id == self.client.session_id:
283                user = 'you'
284            elif who == self.client.username:
285                user = 'another session of you'
286            else:
287                user = "'{0}'".format(who)
288
289            if force:
290                ev = "Port {0} was forcely taken by {1}".format(port_id, user)
291            else:
292                ev = "Port {0} was taken by {1}".format(port_id, user)
293
294            # call the handler in case its not this session
295            if session_id != self.client.session_id:
296                self.__async_event_port_acquired(port_id, who)
297
298
299        # port was released
300        elif (type == 6):
301            port_id     = int(data['port_id'])
302            who         = data['who']
303            session_id  = data['session_id']
304
305            if session_id == self.client.session_id:
306                user = 'you'
307            elif who == self.client.username:
308                user = 'another session of you'
309            else:
310                user = "'{0}'".format(who)
311
312            ev = "Port {0} was released by {1}".format(port_id, user)
313
314            # call the handler in case its not this session
315            if session_id != self.client.session_id:
316                self.__async_event_port_released(port_id)
317
318        elif (type == 7):
319            port_id = int(data['port_id'])
320            ev = "port {0} job failed".format(port_id)
321            show_event = True
322
323        # server stopped
324        elif (type == 100):
325            ev = "Server has stopped"
326            self.__async_event_server_stopped()
327            show_event = True
328
329
330        else:
331            # unknown event - ignore
332            return
333
334
335        self.__add_event_log('server', 'info', ev, show_event)
336
337
338    # private functions
339
340    # on rare cases events may come on a non existent prot
341    # (server was re-run with different config)
342    def __async_event_port_job_done (self, port_id):
343        if port_id in self.client.ports:
344            self.client.ports[port_id].async_event_port_job_done()
345
346    def __async_event_port_stopped (self, port_id):
347        if port_id in self.client.ports:
348            self.client.ports[port_id].async_event_port_stopped()
349
350
351    def __async_event_port_started (self, port_id):
352        if port_id in self.client.ports:
353            self.client.ports[port_id].async_event_port_started()
354
355    def __async_event_port_paused (self, port_id):
356        if port_id in self.client.ports:
357            self.client.ports[port_id].async_event_port_paused()
358
359
360    def __async_event_port_resumed (self, port_id):
361        if port_id in self.client.ports:
362            self.client.ports[port_id].async_event_port_resumed()
363
364    def __async_event_port_acquired (self, port_id, who):
365        if port_id in self.client.ports:
366            self.client.ports[port_id].async_event_acquired(who)
367
368    def __async_event_port_released (self, port_id):
369        if port_id in self.client.ports:
370            self.client.ports[port_id].async_event_released()
371
372    def __async_event_server_stopped (self):
373        self.client.connected = False
374
375
376    # add event to log
377    def __add_event_log (self, origin, ev_type, msg, show = False):
378
379        event = Event(origin, ev_type, msg)
380        self.events.append(event)
381        if show:
382            self.logger.async_log("\n\n{0}".format(str(event)))
383
384
385
386
387
388############################     RPC layer     #############################
389############################                   #############################
390############################                   #############################
391
392class CCommLink(object):
393    """Describes the connectivity of the stateless client method"""
394    def __init__(self, server="localhost", port=5050, virtual=False, client = None):
395        self.virtual = virtual
396        self.server = server
397        self.port = port
398        self.rpc_link = JsonRpcClient(self.server, self.port, client)
399
400    @property
401    def is_connected(self):
402        if not self.virtual:
403            return self.rpc_link.connected
404        else:
405            return True
406
407    def get_server (self):
408        return self.server
409
410    def get_port (self):
411        return self.port
412
413    def connect(self):
414        if not self.virtual:
415            return self.rpc_link.connect()
416
417    def disconnect(self):
418        if not self.virtual:
419            return self.rpc_link.disconnect()
420
421    def transmit(self, method_name, params = None, api_class = 'core'):
422        if self.virtual:
423            self._prompt_virtual_tx_msg()
424            _, msg = self.rpc_link.create_jsonrpc_v2(method_name, params, api_class)
425            print(msg)
426            return
427        else:
428            return self.rpc_link.invoke_rpc_method(method_name, params, api_class)
429
430    def transmit_batch(self, batch_list):
431        if self.virtual:
432            self._prompt_virtual_tx_msg()
433            print([msg
434                   for _, msg in [self.rpc_link.create_jsonrpc_v2(command.method, command.params, command.api_class)
435                                  for command in batch_list]])
436        else:
437            batch = self.rpc_link.create_batch()
438            for command in batch_list:
439                batch.add(command.method, command.params, command.api_class)
440            # invoke the batch
441            return batch.invoke()
442
443    def _prompt_virtual_tx_msg(self):
444        print("Transmitting virtually over tcp://{server}:{port}".format(server=self.server,
445                                                                         port=self.port))
446
447
448
449############################     client     #############################
450############################                #############################
451############################                #############################
452
453class STLClient(object):
454    """TRex Stateless client object - gives operations per TRex/user"""
455
456    # different modes for attaching traffic to ports
457    CORE_MASK_SPLIT = 1
458    CORE_MASK_PIN   = 2
459
460    def __init__(self,
461                 username = common.get_current_user(),
462                 server = "localhost",
463                 sync_port = 4501,
464                 async_port = 4500,
465                 verbose_level = LoggerApi.VERBOSE_QUIET,
466                 logger = None,
467                 virtual = False):
468        """
469        Configure the connection settings
470
471        :parameters:
472             username : string
473                the user name, for example imarom
474
475              server  : string
476                the server name or ip
477
478              sync_port : int
479                the RPC port
480
481              async_port : int
482                the ASYNC port
483
484        .. code-block:: python
485
486            # Example
487
488            # connect to local TRex server
489            c = STLClient()
490
491            # connect to remote server trex-remote-server
492            c = STLClient(server = "trex-remote-server" )
493
494            c = STLClient(server = "10.0.0.10" )
495
496            # verbose mode
497            c = STLClient(server = "10.0.0.10", verbose_level = LoggerApi.VERBOSE_HIGH )
498
499            # change user name
500            c = STLClient(username = "root",server = "10.0.0.10", verbose_level = LoggerApi.VERBOSE_HIGH )
501
502            c.connect()
503
504            c.disconnect()
505
506        """
507
508        self.username   = username
509
510        # init objects
511        self.ports = {}
512        self.server_version = {}
513        self.system_info = {}
514        self.session_id = random.getrandbits(32)
515        self.connected = False
516
517        # API classes
518        self.api_vers = [ {'type': 'core', 'major': 2, 'minor': 3 } ]
519        self.api_h = {'core': None}
520
521        # logger
522        self.logger = DefaultLogger() if not logger else logger
523
524        # initial verbose
525        self.logger.set_verbose(verbose_level)
526
527        # low level RPC layer
528        self.comm_link = CCommLink(server,
529                                   sync_port,
530                                   virtual,
531                                   self)
532
533        # async event handler manager
534        self.event_handler = EventsHandler(self)
535
536        # async subscriber level
537        self.async_client = CTRexAsyncClient(server,
538                                             async_port,
539                                             self)
540
541
542
543
544        # stats
545        self.connection_info = {"username":   username,
546                                "server":     server,
547                                "sync_port":  sync_port,
548                                "async_port": async_port,
549                                "virtual":    virtual}
550
551
552        self.global_stats = trex_stl_stats.CGlobalStats(self.connection_info,
553                                                        self.server_version,
554                                                        self.ports,
555                                                        self.event_handler)
556
557        self.flow_stats = trex_stl_stats.CRxStats(self.ports)
558
559        self.latency_stats = trex_stl_stats.CLatencyStats(self.ports)
560
561        self.util_stats = trex_stl_stats.CUtilStats(self)
562
563        self.stats_generator = trex_stl_stats.CTRexInfoGenerator(self.global_stats,
564                                                                 self.ports,
565                                                                 self.flow_stats,
566                                                                 self.latency_stats,
567                                                                 self.util_stats,
568                                                                 self.async_client.monitor)
569
570
571
572
573    ############# private functions - used by the class itself ###########
574
575    # some preprocessing for port argument
576    def __ports (self, port_id_list):
577
578        # none means all
579        if port_id_list == None:
580            return range(0, self.get_port_count())
581
582        # always list
583        if isinstance(port_id_list, int):
584            port_id_list = [port_id_list]
585
586        if not isinstance(port_id_list, list):
587             raise ValueError("Bad port id list: {0}".format(port_id_list))
588
589        for port_id in port_id_list:
590            if not isinstance(port_id, int) or (port_id < 0) or (port_id > self.get_port_count()):
591                raise ValueError("Bad port id {0}".format(port_id))
592
593        return port_id_list
594
595
596    # sync ports
597    def __sync_ports (self, port_id_list = None, force = False):
598        port_id_list = self.__ports(port_id_list)
599
600        rc = RC()
601
602        for port_id in port_id_list:
603            rc.add(self.ports[port_id].sync())
604
605        return rc
606
607    # acquire ports, if port_list is none - get all
608    def __acquire (self, port_id_list = None, force = False, sync_streams = True):
609        port_id_list = self.__ports(port_id_list)
610
611        rc = RC()
612
613        for port_id in port_id_list:
614            rc.add(self.ports[port_id].acquire(force, sync_streams))
615
616        return rc
617
618    # release ports
619    def __release (self, port_id_list = None):
620        port_id_list = self.__ports(port_id_list)
621
622        rc = RC()
623
624        for port_id in port_id_list:
625            rc.add(self.ports[port_id].release())
626
627        return rc
628
629
630    def __add_streams(self, stream_list, port_id_list = None):
631
632        port_id_list = self.__ports(port_id_list)
633
634        rc = RC()
635
636        for port_id in port_id_list:
637            rc.add(self.ports[port_id].add_streams(stream_list))
638
639        return rc
640
641
642
643    def __remove_streams(self, stream_id_list, port_id_list = None):
644
645        port_id_list = self.__ports(port_id_list)
646
647        rc = RC()
648
649        for port_id in port_id_list:
650            rc.add(self.ports[port_id].remove_streams(stream_id_list))
651
652        return rc
653
654
655
656    def __remove_all_streams(self, port_id_list = None):
657        port_id_list = self.__ports(port_id_list)
658
659        rc = RC()
660
661        for port_id in port_id_list:
662            rc.add(self.ports[port_id].remove_all_streams())
663
664        return rc
665
666
667    def __get_stream(self, stream_id, port_id, get_pkt = False):
668
669        return self.ports[port_id].get_stream(stream_id)
670
671
672    def __get_all_streams(self, port_id, get_pkt = False):
673
674        return self.ports[port_id].get_all_streams()
675
676
677    def __get_stream_id_list(self, port_id):
678
679        return self.ports[port_id].get_stream_id_list()
680
681
682    def __start (self,
683                 multiplier,
684                 duration,
685                 port_id_list,
686                 force,
687                 core_mask):
688
689        port_id_list = self.__ports(port_id_list)
690
691        rc = RC()
692
693
694        for port_id in port_id_list:
695            rc.add(self.ports[port_id].start(multiplier,
696                                             duration,
697                                             force,
698                                             core_mask[port_id]))
699
700        return rc
701
702
703    def __resume (self, port_id_list = None, force = False):
704
705        port_id_list = self.__ports(port_id_list)
706        rc = RC()
707
708        for port_id in port_id_list:
709            rc.add(self.ports[port_id].resume())
710
711        return rc
712
713    def __pause (self, port_id_list = None, force = False):
714
715        port_id_list = self.__ports(port_id_list)
716        rc = RC()
717
718        for port_id in port_id_list:
719            rc.add(self.ports[port_id].pause())
720
721        return rc
722
723
724    def __stop (self, port_id_list = None, force = False):
725
726        port_id_list = self.__ports(port_id_list)
727        rc = RC()
728
729        for port_id in port_id_list:
730            rc.add(self.ports[port_id].stop(force))
731
732        return rc
733
734
735    def __update (self, mult, port_id_list = None, force = False):
736
737        port_id_list = self.__ports(port_id_list)
738        rc = RC()
739
740        for port_id in port_id_list:
741            rc.add(self.ports[port_id].update(mult, force))
742
743        return rc
744
745
746    def __push_remote (self, pcap_filename, port_id_list, ipg_usec, speedup, count, duration, is_dual):
747
748        port_id_list = self.__ports(port_id_list)
749        rc = RC()
750
751        for port_id in port_id_list:
752
753            # for dual, provide the slave handler as well
754            slave_handler = self.ports[port_id ^ 0x1].handler if is_dual else ""
755
756            rc.add(self.ports[port_id].push_remote(pcap_filename,
757                                                   ipg_usec,
758                                                   speedup,
759                                                   count,
760                                                   duration,
761                                                   is_dual,
762                                                   slave_handler))
763
764        return rc
765
766
767    def __validate (self, port_id_list = None):
768        port_id_list = self.__ports(port_id_list)
769
770        rc = RC()
771
772        for port_id in port_id_list:
773            rc.add(self.ports[port_id].validate())
774
775        return rc
776
777
778    def __set_port_attr (self, port_id_list = None, attr_dict = None):
779
780        port_id_list = self.__ports(port_id_list)
781        rc = RC()
782
783        for port_id in port_id_list:
784            rc.add(self.ports[port_id].set_attr(attr_dict))
785
786        return rc
787
788
789
790    # connect to server
791    def __connect(self):
792
793        # first disconnect if already connected
794        if self.is_connected():
795            self.__disconnect()
796
797        # clear this flag
798        self.connected = False
799
800        # connect sync channel
801        self.logger.pre_cmd("Connecting to RPC server on {0}:{1}".format(self.connection_info['server'], self.connection_info['sync_port']))
802        rc = self.comm_link.connect()
803        self.logger.post_cmd(rc)
804
805        if not rc:
806            return rc
807
808
809        # API sync
810        rc = self._transmit("api_sync", params = {'api_vers': self.api_vers}, api_class = None)
811        if not rc:
812            return rc
813
814        # decode
815        for api in rc.data()['api_vers']:
816            self.api_h[ api['type'] ] = api['api_h']
817
818
819        # version
820        rc = self._transmit("get_version")
821        if not rc:
822            return rc
823
824        self.server_version = rc.data()
825        self.global_stats.server_version = rc.data()
826
827        # cache system info
828        rc = self._transmit("get_system_info")
829        if not rc:
830            return rc
831
832        self.system_info = rc.data()
833        self.global_stats.system_info = rc.data()
834
835        # cache supported commands
836        rc = self._transmit("get_supported_cmds")
837        if not rc:
838            return rc
839
840        self.supported_cmds = sorted(rc.data())
841
842        # create ports
843        for port_id in range(self.system_info["port_count"]):
844            info = self.system_info['ports'][port_id]
845
846            self.ports[port_id] = Port(port_id,
847                                       self.username,
848                                       self.comm_link,
849                                       self.session_id,
850                                       info)
851
852
853        # sync the ports
854        rc = self.__sync_ports()
855        if not rc:
856            return rc
857
858
859        # connect async channel
860        self.logger.pre_cmd("Connecting to publisher server on {0}:{1}".format(self.connection_info['server'], self.connection_info['async_port']))
861        rc = self.async_client.connect()
862        self.logger.post_cmd(rc)
863
864        if not rc:
865            return rc
866
867        self.connected = True
868
869        return RC_OK()
870
871
872    # disconenct from server
873    def __disconnect(self, release_ports = True):
874        # release any previous acquired ports
875        if self.is_connected() and release_ports:
876            self.__release(self.get_acquired_ports())
877
878        self.comm_link.disconnect()
879        self.async_client.disconnect()
880
881        self.connected = False
882
883        return RC_OK()
884
885
886    # clear stats
887    def __clear_stats(self, port_id_list, clear_global, clear_flow_stats, clear_latency_stats):
888
889        # we must be sync with the server
890        self.async_client.barrier()
891
892        for port_id in port_id_list:
893            self.ports[port_id].clear_stats()
894
895        if clear_global:
896            self.global_stats.clear_stats()
897
898        if clear_flow_stats:
899            self.flow_stats.clear_stats()
900
901        if clear_latency_stats:
902            self.latency_stats.clear_stats()
903
904        self.logger.log_cmd("Clearing stats on port(s) {0}:".format(port_id_list))
905
906        return RC
907
908
909    # get stats
910    def __get_stats (self, port_id_list):
911        stats = {}
912
913        stats['global'] = self.global_stats.get_stats()
914
915        total = {}
916        for port_id in port_id_list:
917            port_stats = self.ports[port_id].get_stats()
918            stats[port_id] = port_stats
919
920            for k, v in port_stats.items():
921                if not k in total:
922                    total[k] = v
923                else:
924                    total[k] += v
925
926        stats['total'] = total
927
928        stats['flow_stats'] = self.flow_stats.get_stats()
929        stats['latency'] = self.latency_stats.get_stats()
930
931        return stats
932
933
934    def __decode_core_mask (self, ports, core_mask):
935
936        # predefined modes
937        if isinstance(core_mask, int):
938            if core_mask not in [self.CORE_MASK_PIN, self.CORE_MASK_SPLIT]:
939                raise STLError("'core_mask' can be either CORE_MASK_PIN, CORE_MASK_SPLIT or a list of masks")
940
941            decoded_mask = {}
942            for port in ports:
943                # a pin mode was requested and we have
944                # the second port from the group in the start list
945                if (core_mask == self.CORE_MASK_PIN) and ( (port ^ 0x1) in ports ):
946                    decoded_mask[port] = 0x55555555 if( port % 2) == 0 else 0xAAAAAAAA
947                else:
948                    decoded_mask[port] = None
949
950            return decoded_mask
951
952        # list of masks
953        elif isinstance(core_mask, list):
954            if len(ports) != len(core_mask):
955                raise STLError("'core_mask' list must be the same length as 'ports' list")
956
957            decoded_mask = {}
958            for i, port in enumerate(ports):
959                decoded_mask[port] = core_mask[i]
960
961            return decoded_mask
962
963
964
965    ############ functions used by other classes but not users ##############
966
967    def _validate_port_list (self, port_id_list):
968        # listfiy single int
969        if isinstance(port_id_list, int):
970            port_id_list = [port_id_list]
971
972        # should be a list
973        if not isinstance(port_id_list, list):
974            raise STLTypeError('port_id_list', type(port_id_list), list)
975
976        if not port_id_list:
977            raise STLError('No ports provided')
978
979        valid_ports = self.get_all_ports()
980        for port_id in port_id_list:
981            if not port_id in valid_ports:
982                raise STLError("Port ID '{0}' is not a valid port ID - valid values: {1}".format(port_id, valid_ports))
983
984        return port_id_list
985
986
987    # transmit request on the RPC link
988    def _transmit(self, method_name, params = None, api_class = 'core'):
989        return self.comm_link.transmit(method_name, params, api_class)
990
991    # transmit batch request on the RPC link
992    def _transmit_batch(self, batch_list):
993        return self.comm_link.transmit_batch(batch_list)
994
995    # stats
996    def _get_formatted_stats(self, port_id_list, stats_mask = trex_stl_stats.COMPACT):
997
998        stats_opts = common.list_intersect(trex_stl_stats.ALL_STATS_OPTS, stats_mask)
999
1000        stats_obj = OrderedDict()
1001        for stats_type in stats_opts:
1002            stats_obj.update(self.stats_generator.generate_single_statistic(port_id_list, stats_type))
1003
1004        return stats_obj
1005
1006    def _get_streams(self, port_id_list, streams_mask=set()):
1007
1008        streams_obj = self.stats_generator.generate_streams_info(port_id_list, streams_mask)
1009
1010        return streams_obj
1011
1012
1013    def _invalidate_stats (self, port_id_list):
1014        for port_id in port_id_list:
1015            self.ports[port_id].invalidate_stats()
1016
1017        self.global_stats.invalidate()
1018        self.flow_stats.invalidate()
1019
1020        return RC_OK()
1021
1022
1023    # remove all RX filters in a safe manner
1024    def _remove_rx_filters (self, ports, rx_delay_ms):
1025
1026        # get the enabled RX ports
1027        rx_ports = [port_id for port_id in ports if self.ports[port_id].has_rx_enabled()]
1028
1029        if not rx_ports:
1030            return RC_OK()
1031
1032        # block while any RX configured port has not yet have it's delay expired
1033        while any([not self.ports[port_id].has_rx_delay_expired(rx_delay_ms) for port_id in rx_ports]):
1034            time.sleep(0.01)
1035
1036        # remove RX filters
1037        rc = RC()
1038        for port_id in rx_ports:
1039            rc.add(self.ports[port_id].remove_rx_filters())
1040
1041        return rc
1042
1043
1044    #################################
1045    # ------ private methods ------ #
1046    @staticmethod
1047    def __get_mask_keys(ok_values={True}, **kwargs):
1048        masked_keys = set()
1049        for key, val in kwargs.items():
1050            if val in ok_values:
1051                masked_keys.add(key)
1052        return masked_keys
1053
1054    @staticmethod
1055    def __filter_namespace_args(namespace, ok_values):
1056        return {k: v for k, v in namespace.__dict__.items() if k in ok_values}
1057
1058
1059    # API decorator - double wrap because of argument
1060    def __api_check(connected = True):
1061
1062        def wrap (f):
1063            @wraps(f)
1064            def wrap2(*args, **kwargs):
1065                client = args[0]
1066
1067                func_name = f.__name__
1068
1069                # check connection
1070                if connected and not client.is_connected():
1071                    raise STLStateError(func_name, 'disconnected')
1072
1073                try:
1074                    ret = f(*args, **kwargs)
1075                except KeyboardInterrupt as e:
1076                    raise STLError("Interrupted by a keyboard signal (probably ctrl + c)")
1077
1078                return ret
1079            return wrap2
1080
1081        return wrap
1082
1083
1084
1085    ############################     API     #############################
1086    ############################             #############################
1087    ############################             #############################
1088    def __enter__ (self):
1089        self.connect()
1090        self.acquire(force = True)
1091        self.reset()
1092        return self
1093
1094    def __exit__ (self, type, value, traceback):
1095        if self.get_active_ports():
1096            self.stop(self.get_active_ports())
1097        self.disconnect()
1098
1099    ############################   Getters   #############################
1100    ############################             #############################
1101    ############################             #############################
1102
1103
1104    # return verbose level of the logger
1105    def get_verbose (self):
1106        """
1107        Get the verbose mode
1108
1109        :parameters:
1110          none
1111
1112        :return:
1113            Get the verbose mode as Bool
1114
1115        :raises:
1116          None
1117
1118        """
1119        return self.logger.get_verbose()
1120
1121    # is the client on read only mode ?
1122    def is_all_ports_acquired (self):
1123        """
1124         is_all_ports_acquired
1125
1126        :parameters:
1127          None
1128
1129        :return:
1130            Returns True if all ports are acquired
1131
1132        :raises:
1133          None
1134
1135        """
1136
1137        return (self.get_all_ports() == self.get_acquired_ports())
1138
1139
1140    # is the client connected ?
1141    def is_connected (self):
1142        """
1143
1144        :parameters:
1145          None
1146
1147        :return:
1148            is_connected
1149
1150        :raises:
1151          None
1152
1153        """
1154
1155        return self.connected and self.comm_link.is_connected
1156
1157
1158    # get connection info
1159    def get_connection_info (self):
1160        """
1161
1162        :parameters:
1163          None
1164
1165        :return:
1166            Connection dict
1167
1168        :raises:
1169          None
1170
1171        """
1172
1173        return self.connection_info
1174
1175
1176    # get supported commands by the server
1177    def get_server_supported_cmds(self):
1178        """
1179
1180        :parameters:
1181          None
1182
1183        :return:
1184            Connection dict
1185
1186        :raises:
1187          None
1188
1189        """
1190
1191        return self.supported_cmds
1192
1193    # get server version
1194    def get_server_version(self):
1195        """
1196
1197        :parameters:
1198          None
1199
1200        :return:
1201            Connection dict
1202
1203        :raises:
1204          None
1205
1206        """
1207
1208        return self.server_version
1209
1210    # get server system info
1211    def get_server_system_info(self):
1212        """
1213
1214        :parameters:
1215          None
1216
1217        :return:
1218            Connection dict
1219
1220        :raises:
1221          None
1222
1223        """
1224
1225        return self.system_info
1226
1227    # get port count
1228    def get_port_count(self):
1229        """
1230
1231        :parameters:
1232          None
1233
1234        :return:
1235            Connection dict
1236
1237        :raises:
1238          None
1239
1240        """
1241
1242        return len(self.ports)
1243
1244
1245    # returns the port object
1246    def get_port (self, port_id):
1247        port = self.ports.get(port_id, None)
1248        if (port != None):
1249            return port
1250        else:
1251            raise STLArgumentError('port id', port_id, valid_values = self.get_all_ports())
1252
1253
1254    # get all ports as IDs
1255    def get_all_ports (self):
1256        """
1257
1258        :parameters:
1259          None
1260
1261        :return:
1262            Connection dict
1263
1264        :raises:
1265          None
1266
1267        """
1268
1269        return list(self.ports)
1270
1271    # get all acquired ports
1272    def get_acquired_ports(self):
1273        return [port_id
1274                for port_id, port_obj in self.ports.items()
1275                if port_obj.is_acquired()]
1276
1277    # get all active ports (TX or pause)
1278    def get_active_ports(self, owned = True):
1279        if owned:
1280            return [port_id
1281                    for port_id, port_obj in self.ports.items()
1282                    if port_obj.is_active() and port_obj.is_acquired()]
1283        else:
1284            return [port_id
1285                    for port_id, port_obj in self.ports.items()
1286                    if port_obj.is_active()]
1287
1288
1289    # get paused ports
1290    def get_paused_ports (self, owned = True):
1291        if owned:
1292            return [port_id
1293                    for port_id, port_obj in self.ports.items()
1294                    if port_obj.is_paused() and port_obj.is_acquired()]
1295        else:
1296            return [port_id
1297                    for port_id, port_obj in self.ports.items()
1298                    if port_obj.is_paused()]
1299
1300
1301    # get all TX ports
1302    def get_transmitting_ports (self, owned = True):
1303        if owned:
1304            return [port_id
1305                    for port_id, port_obj in self.ports.items()
1306                    if port_obj.is_transmitting() and port_obj.is_acquired()]
1307        else:
1308            return [port_id
1309                    for port_id, port_obj in self.ports.items()
1310                    if port_obj.is_transmitting()]
1311
1312
1313    # get stats
1314    def get_stats (self, ports = None, sync_now = True):
1315        """
1316        Return dictionary containing statistics information gathered from the server.
1317
1318        :parameters:
1319
1320          ports - List of ports to retreive stats on.
1321                  If None, assume the request is for all acquired ports.
1322
1323          sync_now - Boolean - If true, create a call to the server to get latest stats, and wait for result to arrive. Otherwise, return last stats saved in client cache.
1324                            Downside of putting True is a slight delay (few 10th msecs) in getting the result. For practical uses, value should be True.
1325        :return:
1326            Statistics dictionary of dictionaries with the following format:
1327
1328            ===============================  ===============
1329            key                               Meaning
1330            ===============================  ===============
1331            :ref:`numbers (0,1,..<total>`    Statistcs per port number
1332            :ref:`total <total>`             Sum of port statistics
1333            :ref:`flow_stats <flow_stats>`   Per flow statistics
1334            :ref:`global <global>`           Global statistics
1335            :ref:`latency <latency>`         Per flow statistics regarding flow latency
1336            ===============================  ===============
1337
1338            Below is description of each of the inner dictionaries.
1339
1340            .. _total:
1341
1342            **total** and per port statistics contain dictionary with following format.
1343
1344            Most of the bytes counters (unless specified otherwise) are in L2 layer, including the Ethernet FCS. e.g. minimum packet size is 64 bytes
1345
1346            ===============================  ===============
1347            key                               Meaning
1348            ===============================  ===============
1349            ibytes                           Number of input bytes
1350            ierrors                          Number of input errors
1351            ipackets                         Number of input packets
1352            obytes                           Number of output bytes
1353            oerrors                          Number of output errors
1354            opackets                         Number of output packets
1355            rx_bps                           Receive bytes per second rate (L2 layer)
1356            rx_pps                           Receive packet per second rate
1357            tx_bps                           Transmit bytes per second rate (L2 layer)
1358            tx_pps                           Transmit packet per second rate
1359            ===============================  ===============
1360
1361            .. _flow_stats:
1362
1363            **flow_stats** contains :ref:`global dictionary <flow_stats_global>`, and dictionaries per packet group id (pg id). See structures below.
1364
1365            **per pg_id flow stat** dictionaries have following structure:
1366
1367            =================   ===============
1368            key                 Meaning
1369            =================   ===============
1370            rx_bps              Received bytes per second rate
1371            rx_bps_l1           Received bytes per second rate, including layer one
1372            rx_bytes            Total number of received bytes
1373            rx_pkts             Total number of received packets
1374            rx_pps              Received packets per second
1375            tx_bps              Transmit bytes per second rate
1376            tx_bps_l1           Transmit bytes per second rate, including layer one
1377            tx_bytes            Total number of sent bytes
1378            tx_pkts             Total number of sent packets
1379            tx_pps              Transmit packets per second rate
1380            =================   ===============
1381
1382            .. _flow_stats_global:
1383
1384            **global flow stats** dictionary has the following structure:
1385
1386            =================   ===============
1387            key                 Meaning
1388            =================   ===============
1389            rx_err              Number of flow statistics packets received that we could not associate to any pg_id. This can happen if latency on the used setup is large. See :ref:`wait_on_traffic <wait_on_traffic>` rx_delay_ms parameter for details.
1390            tx_err              Number of flow statistics packets transmitted that we could not associate to any pg_id. This is never expected. If you see this different than 0, please report.
1391            =================   ===============
1392
1393            .. _global:
1394
1395            **global**
1396
1397            =================   ===============
1398            key                 Meaning
1399            =================   ===============
1400            bw_per_core         Estimated byte rate Trex can support per core. This is calculated by extrapolation of current rate and load on transmitting cores.
1401            cpu_util            Estimate of the average utilization percentage of the transimitting cores
1402            queue_full          Total number of packets transmitted while the NIC TX queue was full. The packets will be transmitted, eventually, but will create high CPU%due to polling the queue.  This usually indicates that the rate we trying to transmit is too high for this port.
1403            rx_cpu_util         Estimate of the utilization percentage of the core handling RX traffic. Too high value of this CPU utilization could cause drop of latency streams.
1404            rx_drop_bps         Received bytes per second drop rate
1405            rx_bps              Received bytes per second rate
1406            rx_pps              Received packets per second rate
1407            tx_bps              Transmit bytes per second rate
1408            tx_pps              Transmit packets per second rate
1409            =================   ===============
1410
1411            .. _latency:
1412
1413            **latency** contains :ref:`global dictionary <lat_stats_global>`, and dictionaries per packet group id (pg id). Each one with the following structure.
1414
1415            **per pg_id latency stat** dictionaries have following structure:
1416
1417            ===========================          ===============
1418            key                                  Meaning
1419            ===========================          ===============
1420            :ref:`err_cntrs<err-cntrs>`          Counters describing errors that occured with this pg id
1421            :ref:`latency<lat_inner>`            Information regarding packet latency
1422            ===========================          ===============
1423
1424            Following are the inner dictionaries of latency
1425
1426            .. _err-cntrs:
1427
1428            **err-cntrs**
1429
1430            =================   ===============
1431            key                 Meaning (see better explanation below the table)
1432            =================   ===============
1433            dropped             How many packets were dropped (estimation)
1434            dup                 How many packets were duplicated.
1435            out_of_order        How many packets we received out of order.
1436            seq_too_high        How many events of packet with sequence number too high we saw.
1437            seq_too_low         How many events of packet with sequence number too low we saw.
1438            =================   ===============
1439
1440            For calculating packet error events, we add sequence number to each packet's payload. We decide what went wrong only according to sequence number
1441            of last packet received and that of the previous packet. 'seq_too_low' and 'seq_too_high' count events we see. 'dup', 'out_of_order' and 'dropped'
1442            are heuristics we apply to try and understand what happened. They will be accurate in common error scenarios.
1443            We describe few scenarios below to help understand this.
1444
1445            Scenario 1: Received packet with seq num 10, and another one with seq num 10. We increment 'dup' and 'seq_too_low' by 1.
1446
1447            Scenario 2: Received pacekt with seq num 10 and then packet with seq num 15. We assume 4 packets were dropped, and increment 'dropped' by 4, and 'seq_too_high' by 1.
1448            We expect next packet to arrive with sequence number 16.
1449
1450            Scenario 2 continue: Received packet with seq num 11. We increment 'seq_too_low' by 1. We increment 'out_of_order' by 1. We *decrement* 'dropped' by 1.
1451            (We assume here that one of the packets we considered as dropped before, actually arrived out of order).
1452
1453
1454            .. _lat_inner:
1455
1456            **latency**
1457
1458            =================   ===============
1459            key                 Meaning
1460            =================   ===============
1461            average             Average latency over the stream lifetime (usec).Low pass filter is applied to the last window average.It is computed each sampling period by following formula: <average> = <prev average>/2 + <last sampling period average>/2
1462            histogram           Dictionary describing logarithmic distribution histogram of packet latencies. Keys in the dictionary represent range of latencies (in usec). Values are the total number of packets received in this latency range. For example, an entry {100:13} would mean that we saw 13 packets with latency in the range between 100 and 200 usec.
1463            jitter              Jitter of latency samples, computed as described in :rfc:`3550#appendix-A.8`
1464            last_max            Maximum latency measured between last two data reads from server (0.5 sec window).
1465            total_max           Maximum latency measured over the stream lifetime (in usec).
1466            total_min           Minimum latency measured over the stream lifetime (in usec).
1467            =================   ===============
1468
1469            .. _lat_stats_global:
1470
1471            **global latency stats** dictionary has the following structure:
1472
1473            =================   ===============
1474            key                 Meaning
1475            =================   ===============
1476            old_flow            Number of latency statistics packets received that we could not associate to any pg_id. This can happen if latency on the used setup is large. See :ref:`wait_on_traffic <wait_on_traffic>` rx_delay_ms parameter for details.
1477            bad_hdr             Number of latency packets received with bad latency data. This can happen becuase of garbage packets in the network, or if the DUT causes packet corruption.
1478            =================   ===============
1479
1480        :raises:
1481          None
1482
1483        """
1484        # by default use all acquired ports
1485        ports = ports if ports is not None else self.get_acquired_ports()
1486        ports = self._validate_port_list(ports)
1487
1488        # check async barrier
1489        if not type(sync_now) is bool:
1490            raise STLArgumentError('sync_now', sync_now)
1491
1492
1493        # if the user requested a barrier - use it
1494        if sync_now:
1495            rc = self.async_client.barrier()
1496            if not rc:
1497                raise STLError(rc)
1498
1499        return self.__get_stats(ports)
1500
1501
1502    def get_events (self, ev_type_filter = None):
1503        """
1504        returns all the logged events
1505
1506        :parameters:
1507          ev_type_filter - 'info', 'warning' or a list of those
1508                           default: no filter
1509
1510        :return:
1511            logged events
1512
1513        :raises:
1514          None
1515
1516        """
1517        return self.event_handler.get_events(ev_type_filter)
1518
1519
1520    def get_warnings (self):
1521        """
1522        returns all the warnings logged events
1523
1524        :parameters:
1525          None
1526
1527        :return:
1528            warning logged events
1529
1530        :raises:
1531          None
1532
1533        """
1534        return self.get_events(ev_type_filter = 'warning')
1535
1536
1537    def get_info (self):
1538        """
1539        returns all the info logged events
1540
1541        :parameters:
1542          None
1543
1544        :return:
1545            warning logged events
1546
1547        :raises:
1548          None
1549
1550        """
1551        return self.get_events(ev_type_filter = 'info')
1552
1553
1554    # get port(s) info as a list of dicts
1555    @__api_check(True)
1556    def get_port_info (self, ports = None):
1557
1558        ports = ports if ports is not None else self.get_all_ports()
1559        ports = self._validate_port_list(ports)
1560
1561        return [self.ports[port_id].get_info() for port_id in ports]
1562
1563
1564    ############################   Commands   #############################
1565    ############################              #############################
1566    ############################              #############################
1567
1568
1569    def set_verbose (self, level):
1570        """
1571            Sets verbose level
1572
1573            :parameters:
1574                level : str
1575                    "high"
1576                    "low"
1577                    "normal"
1578
1579            :raises:
1580                None
1581
1582        """
1583        modes = {'low' : LoggerApi.VERBOSE_QUIET, 'normal': LoggerApi.VERBOSE_REGULAR, 'high': LoggerApi.VERBOSE_HIGH}
1584
1585        if not level in modes.keys():
1586            raise STLArgumentError('level', level)
1587
1588        self.logger.set_verbose(modes[level])
1589
1590
1591    @__api_check(False)
1592    def connect (self):
1593        """
1594
1595            Connects to the TRex server
1596
1597            :parameters:
1598                None
1599
1600            :raises:
1601                + :exc:`STLError`
1602
1603        """
1604
1605        rc = self.__connect()
1606        if not rc:
1607            raise STLError(rc)
1608
1609
1610    @__api_check(False)
1611    def disconnect (self, stop_traffic = True, release_ports = True):
1612        """
1613            Disconnects from the server
1614
1615            :parameters:
1616                stop_traffic : bool
1617                    Attempts to stop traffic before disconnecting.
1618                release_ports : bool
1619                    Attempts to release all the acquired ports.
1620
1621        """
1622
1623        # try to stop ports but do nothing if not possible
1624        if stop_traffic:
1625            try:
1626                self.stop()
1627            except STLError:
1628                pass
1629
1630
1631        self.logger.pre_cmd("Disconnecting from server at '{0}':'{1}'".format(self.connection_info['server'],
1632                                                                              self.connection_info['sync_port']))
1633        rc = self.__disconnect(release_ports)
1634        self.logger.post_cmd(rc)
1635
1636
1637
1638    @__api_check(True)
1639    def acquire (self, ports = None, force = False, sync_streams = True):
1640        """
1641            Acquires ports for executing commands
1642
1643            :parameters:
1644                ports : list
1645                    Ports on which to execute the command
1646
1647                force : bool
1648                    Force acquire the ports.
1649
1650                sync_streams: bool
1651                    sync with the server about the configured streams
1652
1653            :raises:
1654                + :exc:`STLError`
1655
1656        """
1657
1658        # by default use all ports
1659        ports = ports if ports is not None else self.get_all_ports()
1660        ports = self._validate_port_list(ports)
1661
1662        if force:
1663            self.logger.pre_cmd("Force acquiring ports {0}:".format(ports))
1664        else:
1665            self.logger.pre_cmd("Acquiring ports {0}:".format(ports))
1666
1667        rc = self.__acquire(ports, force, sync_streams)
1668
1669        self.logger.post_cmd(rc)
1670
1671        if not rc:
1672            # cleanup
1673            self.__release(ports)
1674            raise STLError(rc)
1675
1676
1677    @__api_check(True)
1678    def release (self, ports = None):
1679        """
1680            Release ports
1681
1682            :parameters:
1683                ports : list
1684                    Ports on which to execute the command
1685
1686            :raises:
1687                + :exc:`STLError`
1688
1689        """
1690
1691        ports = ports if ports is not None else self.get_acquired_ports()
1692        ports = self._validate_port_list(ports)
1693
1694        self.logger.pre_cmd("Releasing ports {0}:".format(ports))
1695        rc = self.__release(ports)
1696        self.logger.post_cmd(rc)
1697
1698        if not rc:
1699            raise STLError(rc)
1700
1701    @__api_check(True)
1702    def ping(self):
1703        """
1704            Pings the server
1705
1706            :parameters:
1707                None
1708
1709
1710            :raises:
1711                + :exc:`STLError`
1712
1713        """
1714
1715        self.logger.pre_cmd("Pinging the server on '{0}' port '{1}': ".format(self.connection_info['server'],
1716                                                                              self.connection_info['sync_port']))
1717        rc = self._transmit("ping", api_class = None)
1718
1719        self.logger.post_cmd(rc)
1720
1721        if not rc:
1722            raise STLError(rc)
1723
1724    @__api_check(True)
1725    def server_shutdown (self, force = False):
1726        """
1727            Sends the server a request for total shutdown
1728
1729            :parameters:
1730                force - shutdown server even if some ports are owned by another
1731                        user
1732
1733            :raises:
1734                + :exc:`STLError`
1735
1736        """
1737
1738        self.logger.pre_cmd("Sending shutdown request for the server")
1739
1740        rc = self._transmit("shutdown", params = {'force': force, 'user': self.username})
1741
1742        self.logger.post_cmd(rc)
1743
1744        if not rc:
1745            raise STLError(rc)
1746
1747
1748    @__api_check(True)
1749    def get_active_pgids(self):
1750        """
1751            Get active group IDs
1752
1753            :parameters:
1754                None
1755
1756
1757            :raises:
1758                + :exc:`STLError`
1759
1760        """
1761
1762        self.logger.pre_cmd( "Getting active packet group ids")
1763
1764        rc = self._transmit("get_active_pgids")
1765
1766        self.logger.post_cmd(rc)
1767
1768        if not rc:
1769            raise STLError(rc)
1770
1771    @__api_check(True)
1772    def get_util_stats(self):
1773        """
1774            Get utilization stats:
1775            History of TRex CPU utilization per thread (list of lists)
1776            MBUFs memory consumption per CPU socket.
1777
1778            :parameters:
1779                None
1780
1781            :raises:
1782                + :exc:`STLError`
1783
1784        """
1785        self.logger.pre_cmd('Getting Utilization stats')
1786        return self.util_stats.get_stats()
1787
1788
1789    @__api_check(True)
1790    def reset(self, ports = None):
1791        """
1792            Force acquire ports, stop the traffic, remove all streams and clear stats
1793
1794            :parameters:
1795                ports : list
1796                   Ports on which to execute the command
1797
1798
1799            :raises:
1800                + :exc:`STLError`
1801
1802        """
1803
1804
1805        ports = ports if ports is not None else self.get_all_ports()
1806        ports = self._validate_port_list(ports)
1807
1808        # force take the port and ignore any streams on it
1809        self.acquire(ports, force = True, sync_streams = False)
1810        self.stop(ports, rx_delay_ms = 0)
1811        self.remove_all_streams(ports)
1812        self.clear_stats(ports)
1813
1814
1815    @__api_check(True)
1816    def remove_all_streams (self, ports = None):
1817        """
1818            remove all streams from port(s)
1819
1820            :parameters:
1821                ports : list
1822                    Ports on which to execute the command
1823
1824
1825            :raises:
1826                + :exc:`STLError`
1827
1828        """
1829
1830
1831        ports = ports if ports is not None else self.get_acquired_ports()
1832        ports = self._validate_port_list(ports)
1833
1834        self.logger.pre_cmd("Removing all streams from port(s) {0}:".format(ports))
1835        rc = self.__remove_all_streams(ports)
1836        self.logger.post_cmd(rc)
1837
1838        if not rc:
1839            raise STLError(rc)
1840
1841
1842    @__api_check(True)
1843    def add_streams (self, streams, ports = None):
1844        """
1845            Add a list of streams to port(s)
1846
1847            :parameters:
1848                ports : list
1849                    Ports on which to execute the command
1850                streams: list
1851                    Streams to attach (or profile)
1852
1853            :returns:
1854                List of stream IDs in order of the stream list
1855
1856            :raises:
1857                + :exc:`STLError`
1858
1859        """
1860
1861
1862        ports = ports if ports is not None else self.get_acquired_ports()
1863        ports = self._validate_port_list(ports)
1864
1865        if isinstance(streams, STLProfile):
1866            streams = streams.get_streams()
1867
1868        # transform single stream
1869        if not isinstance(streams, list):
1870            streams = [streams]
1871
1872        # check streams
1873        if not all([isinstance(stream, STLStream) for stream in streams]):
1874            raise STLArgumentError('streams', streams)
1875
1876        self.logger.pre_cmd("Attaching {0} streams to port(s) {1}:".format(len(streams), ports))
1877        rc = self.__add_streams(streams, ports)
1878        self.logger.post_cmd(rc)
1879
1880        if not rc:
1881            raise STLError(rc)
1882
1883        # return the stream IDs
1884        return rc.data()
1885
1886    @__api_check(True)
1887    def add_profile(self, filename, ports = None, **kwargs):
1888        """ |  Add streams from profile by its type. Supported types are:
1889            |  .py
1890            |  .yaml
1891            |  .pcap file that converted to profile automatically
1892
1893            :parameters:
1894                filename : string
1895                    filename (with path) of the profile
1896                ports : list
1897                    list of ports to add the profile (default: all acquired)
1898                kwargs : dict
1899                    forward those key-value pairs to the profile (tunables)
1900
1901            :returns:
1902                List of stream IDs in order of the stream list
1903
1904            :raises:
1905                + :exc:`STLError`
1906
1907        """
1908
1909        validate_type('filename', filename, basestring)
1910        profile = STLProfile.load(filename, **kwargs)
1911        return self.add_streams(profile.get_streams(), ports)
1912
1913
1914    @__api_check(True)
1915    def remove_streams (self, stream_id_list, ports = None):
1916        """
1917            Remove a list of streams from ports
1918
1919            :parameters:
1920                ports : list
1921                    Ports on which to execute the command
1922                stream_id_list: list
1923                    Stream id list to remove
1924
1925
1926            :raises:
1927                + :exc:`STLError`
1928
1929        """
1930
1931
1932        ports = ports if ports is not None else self.get_acquired_ports()
1933        ports = self._validate_port_list(ports)
1934
1935        # transform single stream
1936        if not isinstance(stream_id_list, list):
1937            stream_id_list = [stream_id_list]
1938
1939        # check streams
1940        for stream_id in stream_id_list:
1941            validate_type('stream_id', stream_id, int)
1942
1943        # remove streams
1944        self.logger.pre_cmd("Removing {0} streams from port(s) {1}:".format(len(stream_id_list), ports))
1945        rc = self.__remove_streams(stream_id_list, ports)
1946        self.logger.post_cmd(rc)
1947
1948        if not rc:
1949            raise STLError(rc)
1950
1951
1952
1953    @__api_check(True)
1954    def start (self,
1955               ports = None,
1956               mult = "1",
1957               force = False,
1958               duration = -1,
1959               total = False,
1960               core_mask = CORE_MASK_SPLIT):
1961        """
1962            Start traffic on port(s)
1963
1964            :parameters:
1965                ports : list
1966                    Ports on which to execute the command
1967
1968                mult : str
1969                    Multiplier in a form of pps, bps, or line util in %
1970                    Examples: "5kpps", "10gbps", "85%", "32mbps"
1971
1972                force : bool
1973                    If the ports are not in stopped mode or do not have sufficient bandwidth for the traffic, determines whether to stop the current traffic and force start.
1974                    True: Force start
1975                    False: Do not force start
1976
1977                duration : int
1978                    Limit the run time (seconds)
1979                    -1 = unlimited
1980
1981                total : bool
1982                    Determines whether to divide the configured bandwidth among the ports, or to duplicate the bandwidth for each port.
1983                    True: Divide bandwidth among the ports
1984                    False: Duplicate
1985
1986                core_mask: CORE_MASK_SPLIT, CORE_MASK_PIN or a list of masks (one per port)
1987                    Determine the allocation of cores per port
1988                    In CORE_MASK_SPLIT all the traffic will be divided equally between all the cores
1989                    associated with each port
1990                    In CORE_MASK_PIN, for each dual ports (a group that shares the same cores)
1991                    the cores will be divided half pinned for each port
1992
1993            :raises:
1994                + :exc:`STLError`
1995
1996        """
1997
1998        ports = ports if ports is not None else self.get_acquired_ports()
1999        ports = self._validate_port_list(ports)
2000
2001        validate_type('mult', mult, basestring)
2002        validate_type('force', force, bool)
2003        validate_type('duration', duration, (int, float))
2004        validate_type('total', total, bool)
2005        validate_type('core_mask', core_mask, (int, list))
2006
2007        #########################
2008        # decode core mask argument
2009        decoded_mask = self.__decode_core_mask(ports, core_mask)
2010        #######################
2011
2012        # verify multiplier
2013        mult_obj = parsing_opts.decode_multiplier(mult,
2014                                                  allow_update = False,
2015                                                  divide_count = len(ports) if total else 1)
2016        if not mult_obj:
2017            raise STLArgumentError('mult', mult)
2018
2019
2020        # verify ports are stopped or force stop them
2021        active_ports = list(set(self.get_active_ports()).intersection(ports))
2022        if active_ports:
2023            if not force:
2024                raise STLError("Port(s) {0} are active - please stop them or specify 'force'".format(active_ports))
2025            else:
2026                rc = self.stop(active_ports)
2027                if not rc:
2028                    raise STLError(rc)
2029
2030
2031        # start traffic
2032        self.logger.pre_cmd("Starting traffic on port(s) {0}:".format(ports))
2033        rc = self.__start(mult_obj, duration, ports, force, decoded_mask)
2034        self.logger.post_cmd(rc)
2035
2036        if not rc:
2037            raise STLError(rc)
2038
2039
2040    @__api_check(True)
2041    def stop (self, ports = None, rx_delay_ms = 10):
2042        """
2043            Stop port(s)
2044
2045            :parameters:
2046                ports : list
2047                    Ports on which to execute the command
2048
2049                rx_delay_ms : int
2050                    time to wait until RX filters are removed
2051                    this value should reflect the time it takes
2052                    packets which were transmitted to arrive
2053                    to the destination.
2054                    after this time the RX filters will be removed
2055
2056            :raises:
2057                + :exc:`STLError`
2058
2059        """
2060
2061        if ports is None:
2062            ports = self.get_active_ports()
2063            if not ports:
2064                return
2065
2066        ports = self._validate_port_list(ports)
2067
2068        self.logger.pre_cmd("Stopping traffic on port(s) {0}:".format(ports))
2069        rc = self.__stop(ports)
2070        self.logger.post_cmd(rc)
2071
2072        if not rc:
2073            raise STLError(rc)
2074
2075        # remove any RX filters
2076        rc = self._remove_rx_filters(ports, rx_delay_ms = rx_delay_ms)
2077        if not rc:
2078            raise STLError(rc)
2079
2080
2081    @__api_check(True)
2082    def update (self, ports = None, mult = "1", total = False, force = False):
2083        """
2084            Update traffic on port(s)
2085
2086            :parameters:
2087                ports : list
2088                    Ports on which to execute the command
2089
2090                mult : str
2091                    Multiplier in a form of pps, bps, or line util in %
2092                    Can also specify +/-
2093                    Examples: "5kpps+", "10gbps-", "85%", "32mbps", "20%+"
2094
2095                force : bool
2096                    If the ports are not in stopped mode or do not have sufficient bandwidth for the traffic, determines whether to stop the current traffic and force start.
2097                    True: Force start
2098                    False: Do not force start
2099
2100                total : bool
2101                    Determines whether to divide the configured bandwidth among the ports, or to duplicate the bandwidth for each port.
2102                    True: Divide bandwidth among the ports
2103                    False: Duplicate
2104
2105
2106            :raises:
2107                + :exc:`STLError`
2108
2109        """
2110
2111
2112        ports = ports if ports is not None else self.get_active_ports()
2113        ports = self._validate_port_list(ports)
2114
2115        validate_type('mult', mult, basestring)
2116        validate_type('force', force, bool)
2117        validate_type('total', total, bool)
2118
2119        # verify multiplier
2120        mult_obj = parsing_opts.decode_multiplier(mult,
2121                                                  allow_update = True,
2122                                                  divide_count = len(ports) if total else 1)
2123        if not mult_obj:
2124            raise STLArgumentError('mult', mult)
2125
2126
2127        # call low level functions
2128        self.logger.pre_cmd("Updating traffic on port(s) {0}:".format(ports))
2129        rc = self.__update(mult_obj, ports, force)
2130        self.logger.post_cmd(rc)
2131
2132        if not rc:
2133            raise STLError(rc)
2134
2135
2136
2137    @__api_check(True)
2138    def pause (self, ports = None):
2139        """
2140            Pause traffic on port(s). Works only for ports that are active, and only if all streams are in Continuous mode.
2141
2142            :parameters:
2143                ports : list
2144                    Ports on which to execute the command
2145
2146            :raises:
2147                + :exc:`STLError`
2148
2149        """
2150
2151
2152        ports = ports if ports is not None else self.get_transmitting_ports()
2153        ports = self._validate_port_list(ports)
2154
2155        self.logger.pre_cmd("Pausing traffic on port(s) {0}:".format(ports))
2156        rc = self.__pause(ports)
2157        self.logger.post_cmd(rc)
2158
2159        if not rc:
2160            raise STLError(rc)
2161
2162    @__api_check(True)
2163    def resume (self, ports = None):
2164        """
2165            Resume traffic on port(s)
2166
2167            :parameters:
2168                ports : list
2169                    Ports on which to execute the command
2170
2171            :raises:
2172                + :exc:`STLError`
2173
2174        """
2175
2176
2177        ports = ports if ports is not None else self.get_paused_ports()
2178        ports = self._validate_port_list(ports)
2179
2180
2181        self.logger.pre_cmd("Resume traffic on port(s) {0}:".format(ports))
2182        rc = self.__resume(ports)
2183        self.logger.post_cmd(rc)
2184
2185        if not rc:
2186            raise STLError(rc)
2187
2188
2189    @__api_check(True)
2190    def push_remote (self,
2191                     pcap_filename,
2192                     ports = None,
2193                     ipg_usec = None,
2194                     speedup = 1.0,
2195                     count = 1,
2196                     duration = -1,
2197                     is_dual = False):
2198        """
2199            Push a remote server-reachable PCAP file
2200            the path must be fullpath accessible to the server
2201
2202            :parameters:
2203                pcap_filename : str
2204                    PCAP file name in full path and accessible to the server
2205
2206                ports : list
2207                    Ports on which to execute the command
2208
2209                ipg_usec : float
2210                    Inter-packet gap in microseconds
2211
2212                speedup : float
2213                    A factor to adjust IPG. effectively IPG = IPG / speedup
2214
2215                count: int
2216                    How many times to transmit the cap
2217
2218                duration: float
2219                    Limit runtime by duration in seconds
2220
2221                is_dual: bool
2222                    Inject from both directions.
2223                    requires ERF file with meta data for direction.
2224                    also requires that all the ports will be in master mode
2225                    with their adjacent ports as slaves
2226
2227            :raises:
2228                + :exc:`STLError`
2229
2230        """
2231        ports = ports if ports is not None else self.get_acquired_ports()
2232        ports = self._validate_port_list(ports)
2233
2234        validate_type('pcap_filename', pcap_filename, basestring)
2235        validate_type('ipg_usec', ipg_usec, (float, int, type(None)))
2236        validate_type('speedup',  speedup, (float, int))
2237        validate_type('count',  count, int)
2238        validate_type('duration', duration, (float, int))
2239        validate_type('is_dual', is_dual, bool)
2240
2241        # for dual mode check that all are masters
2242        if is_dual:
2243            if not pcap_filename.endswith('erf'):
2244                raise STLError("dual mode: only ERF format is supported for dual mode")
2245
2246            for port in ports:
2247                master = port
2248                slave = port ^ 0x1
2249
2250                if slave in ports:
2251                    raise STLError("dual mode: cannot provide adjacent ports ({0}, {1}) in a batch".format(master, slave))
2252
2253                if not slave in self.get_acquired_ports():
2254                    raise STLError("dual mode: adjacent port {0} must be owned during dual mode".format(slave))
2255
2256
2257        self.logger.pre_cmd("Pushing remote PCAP on port(s) {0}:".format(ports))
2258        rc = self.__push_remote(pcap_filename, ports, ipg_usec, speedup, count, duration, is_dual)
2259        self.logger.post_cmd(rc)
2260
2261        if not rc:
2262            raise STLError(rc)
2263
2264
2265    @__api_check(True)
2266    def push_pcap (self,
2267                   pcap_filename,
2268                   ports = None,
2269                   ipg_usec = None,
2270                   speedup = 1.0,
2271                   count = 1,
2272                   duration = -1,
2273                   force = False,
2274                   vm = None,
2275                   packet_hook = None,
2276                   is_dual = False):
2277        """
2278            Push a local PCAP to the server
2279            This is equivalent to loading a PCAP file to a profile
2280            and attaching the profile to port(s)
2281
2282            file size is limited to 1MB
2283
2284            :parameters:
2285                pcap_filename : str
2286                    PCAP filename (accessible locally)
2287
2288                ports : list
2289                    Ports on which to execute the command
2290
2291                ipg_usec : float
2292                    Inter-packet gap in microseconds
2293
2294                speedup : float
2295                    A factor to adjust IPG. effectively IPG = IPG / speedup
2296
2297                count: int
2298                    How many times to transmit the cap
2299
2300                duration: float
2301                    Limit runtime by duration in seconds
2302
2303                force: bool
2304                    Ignore file size limit - push any file size to the server
2305
2306                vm: list of VM instructions
2307                    VM instructions to apply for every packet
2308
2309                packet_hook : Callable or function
2310                    Will be applied to every packet
2311
2312                is_dual: bool
2313                    Inject from both directions.
2314                    requires ERF file with meta data for direction.
2315                    also requires that all the ports will be in master mode
2316                    with their adjacent ports as slaves
2317
2318            :raises:
2319                + :exc:`STLError`
2320
2321        """
2322        ports = ports if ports is not None else self.get_acquired_ports()
2323        ports = self._validate_port_list(ports)
2324
2325        validate_type('pcap_filename', pcap_filename, basestring)
2326        validate_type('ipg_usec', ipg_usec, (float, int, type(None)))
2327        validate_type('speedup',  speedup, (float, int))
2328        validate_type('count',  count, int)
2329        validate_type('duration', duration, (float, int))
2330        validate_type('vm', vm, (list, type(None)))
2331        validate_type('is_dual', is_dual, bool)
2332
2333
2334        # no support for > 1MB PCAP - use push remote
2335        if not force and os.path.getsize(pcap_filename) > (1024 * 1024):
2336            raise STLError("PCAP size of {:} is too big for local push - consider using remote push or provide 'force'".format(format_num(os.path.getsize(pcap_filename), suffix = 'B')))
2337
2338        if is_dual:
2339            for port in ports:
2340                master = port
2341                slave = port ^ 0x1
2342
2343                if slave in ports:
2344                    raise STLError("dual mode: cannot provide adjacent ports ({0}, {1}) in a batch".format(master, slave))
2345
2346                if not slave in self.get_acquired_ports():
2347                    raise STLError("dual mode: adjacent port {0} must be owned during dual mode".format(slave))
2348
2349        # regular push
2350        if not is_dual:
2351
2352            # create the profile from the PCAP
2353            try:
2354                self.logger.pre_cmd("Converting '{0}' to streams:".format(pcap_filename))
2355                profile = STLProfile.load_pcap(pcap_filename,
2356                                               ipg_usec,
2357                                               speedup,
2358                                               count,
2359                                               vm = vm,
2360                                               packet_hook = packet_hook)
2361                self.logger.post_cmd(RC_OK)
2362            except STLError as e:
2363                self.logger.post_cmd(RC_ERR(e))
2364                raise
2365
2366
2367            self.remove_all_streams(ports = ports)
2368            id_list = self.add_streams(profile.get_streams(), ports)
2369
2370            return self.start(ports = ports, duration = duration)
2371
2372        else:
2373
2374            # create a dual profile
2375            split_mode = 'MAC'
2376
2377            try:
2378                self.logger.pre_cmd("Analyzing '{0}' for dual ports based on {1}:".format(pcap_filename, split_mode))
2379                profile_a, profile_b = STLProfile.load_pcap(pcap_filename,
2380                                                            ipg_usec,
2381                                                            speedup,
2382                                                            count,
2383                                                            vm = vm,
2384                                                            packet_hook = packet_hook,
2385                                                            split_mode = split_mode)
2386
2387                self.logger.post_cmd(RC_OK())
2388
2389            except STLError as e:
2390                self.logger.post_cmd(RC_ERR(e))
2391                raise
2392
2393            all_ports = ports + [p ^ 0x1 for p in ports]
2394
2395            self.remove_all_streams(ports = all_ports)
2396
2397            for port in ports:
2398                master = port
2399                slave = port ^ 0x1
2400
2401                self.add_streams(profile_a.get_streams(), master)
2402                self.add_streams(profile_b.get_streams(), slave)
2403
2404            return self.start(ports = all_ports, duration = duration)
2405
2406
2407
2408
2409
2410    @__api_check(True)
2411    def validate (self, ports = None, mult = "1", duration = -1, total = False):
2412        """
2413            Validate port(s) configuration
2414
2415            :parameters:
2416                ports : list
2417                    Ports on which to execute the command
2418
2419             mult : str
2420                    Multiplier in a form of pps, bps, or line util in %
2421                    Examples: "5kpps", "10gbps", "85%", "32mbps"
2422
2423            duration : int
2424                    Limit the run time (seconds)
2425                    -1 = unlimited
2426
2427            total : bool
2428                    Determines whether to divide the configured bandwidth among the ports, or to duplicate the bandwidth for each port.
2429                    True: Divide bandwidth among the ports
2430                    False: Duplicate
2431
2432            :raises:
2433                + :exc:`STLError`
2434
2435        """
2436
2437
2438        ports = ports if ports is not None else self.get_acquired_ports()
2439        ports = self._validate_port_list(ports)
2440
2441        validate_type('mult', mult, basestring)
2442        validate_type('duration', duration, (int, float))
2443        validate_type('total', total, bool)
2444
2445
2446        # verify multiplier
2447        mult_obj = parsing_opts.decode_multiplier(mult,
2448                                                  allow_update = True,
2449                                                  divide_count = len(ports) if total else 1)
2450        if not mult_obj:
2451            raise STLArgumentError('mult', mult)
2452
2453        self.logger.pre_cmd("Validating streams on port(s) {0}:".format(ports))
2454        rc = self.__validate(ports)
2455        self.logger.post_cmd(rc)
2456
2457        if not rc:
2458            raise STLError(rc)
2459
2460        for port in ports:
2461            self.ports[port].print_profile(mult_obj, duration)
2462
2463
2464    @__api_check(False)
2465    def clear_stats (self, ports = None, clear_global = True, clear_flow_stats = True, clear_latency_stats = True):
2466        """
2467            Clear stats on port(s)
2468
2469            :parameters:
2470                ports : list
2471                    Ports on which to execute the command
2472
2473                clear_global : bool
2474                    Clear the global stats
2475
2476                clear_flow_stats : bool
2477                    Clear the flow stats
2478
2479                clear_latency_stats : bool
2480                    Clear the latency stats
2481
2482            :raises:
2483                + :exc:`STLError`
2484
2485        """
2486
2487        ports = ports if ports is not None else self.get_all_ports()
2488        ports = self._validate_port_list(ports)
2489
2490        # verify clear global
2491        if not type(clear_global) is bool:
2492            raise STLArgumentError('clear_global', clear_global)
2493
2494        rc = self.__clear_stats(ports, clear_global, clear_flow_stats, clear_latency_stats)
2495        if not rc:
2496            raise STLError(rc)
2497
2498
2499
2500    @__api_check(True)
2501    def is_traffic_active (self, ports = None):
2502        """
2503            Return if specified port(s) have traffic
2504
2505            :parameters:
2506                ports : list
2507                    Ports on which to execute the command
2508
2509
2510            :raises:
2511                + :exc:`STLTimeoutError` - in case timeout has expired
2512                + :exe:'STLError'
2513
2514        """
2515
2516        ports = ports if ports is not None else self.get_acquired_ports()
2517        ports = self._validate_port_list(ports)
2518
2519        return set(self.get_active_ports()).intersection(ports)
2520
2521
2522
2523    @__api_check(True)
2524    def wait_on_traffic (self, ports = None, timeout = None, rx_delay_ms = 10):
2525        """
2526            .. _wait_on_traffic:
2527
2528            Block until traffic on specified port(s) has ended
2529
2530            :parameters:
2531                ports : list
2532                    Ports on which to execute the command
2533
2534                timeout : int
2535                    timeout in seconds
2536                    default will be blocking
2537
2538                rx_delay_ms : int
2539                    Time to wait (in milliseconds) after last packet was sent, until RX filters used for
2540                    measuring flow statistics and latency are removed.
2541                    This value should reflect the time it takes packets which were transmitted to arrive
2542                    to the destination.
2543                    After this time, RX filters will be removed, and packets arriving for per flow statistics feature and latency flows will be counted as errors.
2544
2545            :raises:
2546                + :exc:`STLTimeoutError` - in case timeout has expired
2547                + :exe:'STLError'
2548
2549        """
2550
2551        ports = ports if ports is not None else self.get_acquired_ports()
2552        ports = self._validate_port_list(ports)
2553
2554
2555        timer = PassiveTimer(timeout)
2556
2557        # wait while any of the required ports are active
2558        while set(self.get_active_ports()).intersection(ports):
2559
2560            # make sure ASYNC thread is still alive - otherwise we will be stuck forever
2561            if not self.async_client.is_thread_alive():
2562                raise STLError("subscriber thread is dead")
2563
2564            time.sleep(0.01)
2565            if timer.has_expired():
2566                raise STLTimeoutError(timeout)
2567
2568        # remove any RX filters
2569        rc = self._remove_rx_filters(ports, rx_delay_ms = rx_delay_ms)
2570        if not rc:
2571            raise STLError(rc)
2572
2573
2574    @__api_check(True)
2575    def set_port_attr (self, ports = None, promiscuous = None):
2576        """
2577            Set port attributes
2578
2579            :parameters:
2580                promiscuous - True or False
2581
2582            :raises:
2583                None
2584
2585        """
2586
2587        ports = ports if ports is not None else self.get_acquired_ports()
2588        ports = self._validate_port_list(ports)
2589
2590        # check arguments
2591        validate_type('promiscuous', promiscuous, (bool, type(None)))
2592
2593        # build attributes
2594        attr_dict = {}
2595        if promiscuous is not None:
2596            attr_dict['promiscuous'] = {'enabled': bool(promiscuous)}
2597
2598        # no attributes to set
2599        if not attr_dict:
2600            return
2601
2602        self.logger.pre_cmd("Applying attributes on port(s) {0}:".format(ports))
2603        rc = self.__set_port_attr(ports, attr_dict)
2604        self.logger.post_cmd(rc)
2605
2606        if not rc:
2607            raise STLError(rc)
2608
2609    def clear_events (self):
2610        """
2611            Clear all events
2612
2613            :parameters:
2614                None
2615
2616            :raises:
2617                None
2618
2619        """
2620        self.event_handler.clear_events()
2621
2622
2623    ############################   Line       #############################
2624    ############################   Commands   #############################
2625    ############################              #############################
2626
2627    # console decorator
2628    def __console(f):
2629        @wraps(f)
2630        def wrap(*args):
2631            client = args[0]
2632
2633            time1 = time.time()
2634
2635            try:
2636                rc = f(*args)
2637            except STLError as e:
2638                client.logger.log("Log:\n" + format_text(e.brief() + "\n", 'bold'))
2639                return RC_ERR(e.brief())
2640
2641            # if got true - print time
2642            if rc:
2643                delta = time.time() - time1
2644                client.logger.log(format_time(delta) + "\n")
2645
2646            return rc
2647
2648        return wrap
2649
2650    @__console
2651    def ping_line (self, line):
2652        '''pings the server'''
2653        self.ping()
2654        return RC_OK()
2655
2656    @__console
2657    def shutdown_line (self, line):
2658        '''shutdown the server'''
2659        parser = parsing_opts.gen_parser(self,
2660                                         "shutdown",
2661                                         self.shutdown_line.__doc__,
2662                                         parsing_opts.FORCE)
2663
2664        opts = parser.parse_args(line.split())
2665        if not opts:
2666            return opts
2667
2668        self.server_shutdown(force = opts.force)
2669        return RC_OK()
2670
2671    @__console
2672    def connect_line (self, line):
2673        '''Connects to the TRex server and acquire ports'''
2674        parser = parsing_opts.gen_parser(self,
2675                                         "connect",
2676                                         self.connect_line.__doc__,
2677                                         parsing_opts.PORT_LIST_WITH_ALL,
2678                                         parsing_opts.FORCE)
2679
2680        opts = parser.parse_args(line.split(), default_ports = self.get_all_ports())
2681        if not opts:
2682            return opts
2683
2684        self.connect()
2685        self.acquire(ports = opts.ports, force = opts.force)
2686
2687        return RC_OK()
2688
2689
2690    @__console
2691    def acquire_line (self, line):
2692        '''Acquire ports\n'''
2693
2694        # define a parser
2695        parser = parsing_opts.gen_parser(self,
2696                                         "acquire",
2697                                         self.acquire_line.__doc__,
2698                                         parsing_opts.PORT_LIST_WITH_ALL,
2699                                         parsing_opts.FORCE)
2700
2701        opts = parser.parse_args(line.split(), default_ports = self.get_all_ports())
2702        if not opts:
2703            return opts
2704
2705        # filter out all the already owned ports
2706        ports = list_difference(opts.ports, self.get_acquired_ports())
2707        if not ports:
2708            msg = "acquire - all of port(s) {0} are already acquired".format(opts.ports)
2709            self.logger.log(format_text(msg, 'bold'))
2710            return RC_ERR(msg)
2711
2712        self.acquire(ports = ports, force = opts.force)
2713
2714        return RC_OK()
2715
2716
2717    #
2718    @__console
2719    def release_line (self, line):
2720        '''Release ports\n'''
2721
2722        parser = parsing_opts.gen_parser(self,
2723                                         "release",
2724                                         self.release_line.__doc__,
2725                                         parsing_opts.PORT_LIST_WITH_ALL)
2726
2727        opts = parser.parse_args(line.split(), default_ports = self.get_acquired_ports())
2728        if not opts:
2729            return opts
2730
2731        ports = list_intersect(opts.ports, self.get_acquired_ports())
2732        if not ports:
2733            if not opts.ports:
2734                msg = "release - no acquired ports"
2735                self.logger.log(format_text(msg, 'bold'))
2736                return RC_ERR(msg)
2737            else:
2738                msg = "release - none of port(s) {0} are acquired".format(opts.ports)
2739                self.logger.log(format_text(msg, 'bold'))
2740                return RC_ERR(msg)
2741
2742
2743        self.release(ports = ports)
2744
2745        return RC_OK()
2746
2747
2748    @__console
2749    def reacquire_line (self, line):
2750        '''reacquire all the ports under your username which are not acquired by your session'''
2751
2752        parser = parsing_opts.gen_parser(self,
2753                                         "reacquire",
2754                                         self.reacquire_line.__doc__)
2755
2756        opts = parser.parse_args(line.split())
2757        if not opts:
2758            return opts
2759
2760        # find all the on-owned ports under your name
2761        my_unowned_ports = list_difference([k for k, v in self.ports.items() if v.get_owner() == self.username], self.get_acquired_ports())
2762        if not my_unowned_ports:
2763            msg = "reacquire - no unowned ports under '{0}'".format(self.username)
2764            self.logger.log(msg)
2765            return RC_ERR(msg)
2766
2767        self.acquire(ports = my_unowned_ports, force = True)
2768        return RC_OK()
2769
2770
2771    @__console
2772    def disconnect_line (self, line):
2773        self.disconnect()
2774
2775
2776    @__console
2777    def reset_line (self, line):
2778        '''Reset ports - if no ports are provided all acquired ports will be reset'''
2779
2780        parser = parsing_opts.gen_parser(self,
2781                                         "reset",
2782                                         self.reset_line.__doc__,
2783                                         parsing_opts.PORT_LIST_WITH_ALL)
2784
2785        opts = parser.parse_args(line.split(), default_ports = self.get_acquired_ports(), verify_acquired = True)
2786        if not opts:
2787            return opts
2788
2789        self.reset(ports = opts.ports)
2790
2791        return RC_OK()
2792
2793
2794
2795    @__console
2796    def start_line (self, line):
2797        '''Start selected traffic on specified ports on TRex\n'''
2798        # define a parser
2799        parser = parsing_opts.gen_parser(self,
2800                                         "start",
2801                                         self.start_line.__doc__,
2802                                         parsing_opts.PORT_LIST_WITH_ALL,
2803                                         parsing_opts.TOTAL,
2804                                         parsing_opts.FORCE,
2805                                         parsing_opts.FILE_PATH,
2806                                         parsing_opts.DURATION,
2807                                         parsing_opts.TUNABLES,
2808                                         parsing_opts.MULTIPLIER_STRICT,
2809                                         parsing_opts.DRY_RUN,
2810                                         parsing_opts.CORE_MASK_GROUP)
2811
2812        opts = parser.parse_args(line.split(), default_ports = self.get_acquired_ports(), verify_acquired = True)
2813        if not opts:
2814            return opts
2815
2816        # core mask
2817        if opts.core_mask is not None:
2818            core_mask =  opts.core_mask
2819        else:
2820            core_mask = self.CORE_MASK_PIN if opts.pin_cores else self.CORE_MASK_SPLIT
2821
2822        # just for sanity - will be checked on the API as well
2823        self.__decode_core_mask(opts.ports, core_mask)
2824
2825        active_ports = list_intersect(self.get_active_ports(), opts.ports)
2826        if active_ports:
2827            if not opts.force:
2828                msg = "Port(s) {0} are active - please stop them or add '--force'\n".format(active_ports)
2829                self.logger.log(format_text(msg, 'bold'))
2830                return RC_ERR(msg)
2831            else:
2832                self.stop(active_ports)
2833
2834
2835        # process tunables
2836        if type(opts.tunables) is dict:
2837            tunables = opts.tunables
2838        else:
2839            tunables = {}
2840
2841
2842        # remove all streams
2843        self.remove_all_streams(opts.ports)
2844
2845        # pack the profile
2846        try:
2847            for port in opts.ports:
2848
2849                profile = STLProfile.load(opts.file[0],
2850                                          direction = tunables.get('direction', port % 2),
2851                                          port_id = port,
2852                                          **tunables)
2853
2854                self.add_streams(profile.get_streams(), ports = port)
2855
2856        except STLError as e:
2857            error = 'Unknown error.'
2858            for line in e.brief().split('\n'):
2859                if line:
2860                    error = line
2861            msg = format_text("\nError loading profile '{0}'".format(opts.file[0]), 'bold')
2862            self.logger.log(msg + '\n')
2863            self.logger.log(e.brief() + "\n")
2864            return RC_ERR("%s: %s" % (msg, error))
2865
2866
2867        if opts.dry:
2868            self.validate(opts.ports, opts.mult, opts.duration, opts.total)
2869        else:
2870
2871            self.start(opts.ports,
2872                       opts.mult,
2873                       opts.force,
2874                       opts.duration,
2875                       opts.total,
2876                       core_mask)
2877
2878        return RC_OK()
2879
2880
2881
2882    @__console
2883    def stop_line (self, line):
2884        '''Stop active traffic on specified ports on TRex\n'''
2885        parser = parsing_opts.gen_parser(self,
2886                                         "stop",
2887                                         self.stop_line.__doc__,
2888                                         parsing_opts.PORT_LIST_WITH_ALL)
2889
2890        opts = parser.parse_args(line.split(), default_ports = self.get_active_ports(), verify_acquired = True)
2891        if not opts:
2892            return opts
2893
2894
2895        # find the relevant ports
2896        ports = list_intersect(opts.ports, self.get_active_ports())
2897        if not ports:
2898            if not opts.ports:
2899                msg = 'stop - no active ports'
2900            else:
2901                msg = 'stop - no active traffic on ports {0}'.format(opts.ports)
2902
2903            self.logger.log(msg)
2904            return RC_ERR(msg)
2905
2906        # call API
2907        self.stop(ports)
2908
2909        return RC_OK()
2910
2911
2912    @__console
2913    def update_line (self, line):
2914        '''Update port(s) speed currently active\n'''
2915        parser = parsing_opts.gen_parser(self,
2916                                         "update",
2917                                         self.update_line.__doc__,
2918                                         parsing_opts.PORT_LIST_WITH_ALL,
2919                                         parsing_opts.MULTIPLIER,
2920                                         parsing_opts.TOTAL,
2921                                         parsing_opts.FORCE)
2922
2923        opts = parser.parse_args(line.split(), default_ports = self.get_active_ports(), verify_acquired = True)
2924        if not opts:
2925            return opts
2926
2927
2928        # find the relevant ports
2929        ports = list_intersect(opts.ports, self.get_active_ports())
2930        if not ports:
2931            if not opts.ports:
2932                msg = 'update - no active ports'
2933            else:
2934                msg = 'update - no active traffic on ports {0}'.format(opts.ports)
2935
2936            self.logger.log(msg)
2937            return RC_ERR(msg)
2938
2939        self.update(ports, opts.mult, opts.total, opts.force)
2940
2941        return RC_OK()
2942
2943
2944    @__console
2945    def pause_line (self, line):
2946        '''Pause active traffic on specified ports on TRex\n'''
2947        parser = parsing_opts.gen_parser(self,
2948                                         "pause",
2949                                         self.pause_line.__doc__,
2950                                         parsing_opts.PORT_LIST_WITH_ALL)
2951
2952        opts = parser.parse_args(line.split(), default_ports = self.get_transmitting_ports(), verify_acquired = True)
2953        if not opts:
2954            return opts
2955
2956        # check for already paused case
2957        if opts.ports and is_sub_list(opts.ports, self.get_paused_ports()):
2958            msg = 'pause - all of port(s) {0} are already paused'.format(opts.ports)
2959            self.logger.log(msg)
2960            return RC_ERR(msg)
2961
2962        # find the relevant ports
2963        ports = list_intersect(opts.ports, self.get_transmitting_ports())
2964        if not ports:
2965            if not opts.ports:
2966                msg = 'pause - no transmitting ports'
2967            else:
2968                msg = 'pause - none of ports {0} are transmitting'.format(opts.ports)
2969
2970            self.logger.log(msg)
2971            return RC_ERR(msg)
2972
2973        self.pause(ports)
2974
2975        return RC_OK()
2976
2977
2978    @__console
2979    def resume_line (self, line):
2980        '''Resume active traffic on specified ports on TRex\n'''
2981        parser = parsing_opts.gen_parser(self,
2982                                         "resume",
2983                                         self.resume_line.__doc__,
2984                                         parsing_opts.PORT_LIST_WITH_ALL)
2985
2986        opts = parser.parse_args(line.split(), default_ports = self.get_paused_ports(), verify_acquired = True)
2987        if not opts:
2988            return opts
2989
2990        # find the relevant ports
2991        ports = list_intersect(opts.ports, self.get_paused_ports())
2992        if not ports:
2993            if not opts.ports:
2994                msg = 'resume - no paused ports'
2995            else:
2996                msg = 'resume - none of ports {0} are paused'.format(opts.ports)
2997
2998            self.logger.log(msg)
2999            return RC_ERR(msg)
3000
3001
3002        self.resume(ports)
3003
3004        # true means print time
3005        return RC_OK()
3006
3007
3008    @__console
3009    def clear_stats_line (self, line):
3010        '''Clear cached local statistics\n'''
3011        # define a parser
3012        parser = parsing_opts.gen_parser(self,
3013                                         "clear",
3014                                         self.clear_stats_line.__doc__,
3015                                         parsing_opts.PORT_LIST_WITH_ALL)
3016
3017        opts = parser.parse_args(line.split())
3018
3019        if not opts:
3020            return opts
3021
3022        self.clear_stats(opts.ports)
3023
3024        return RC_OK()
3025
3026
3027    @__console
3028    def show_stats_line (self, line):
3029        '''Get statistics from TRex server by port\n'''
3030        # define a parser
3031        parser = parsing_opts.gen_parser(self,
3032                                         "stats",
3033                                         self.show_stats_line.__doc__,
3034                                         parsing_opts.PORT_LIST_WITH_ALL,
3035                                         parsing_opts.STATS_MASK)
3036
3037        opts = parser.parse_args(line.split())
3038
3039        if not opts:
3040            return opts
3041
3042        # determine stats mask
3043        mask = self.__get_mask_keys(**self.__filter_namespace_args(opts, trex_stl_stats.ALL_STATS_OPTS))
3044        if not mask:
3045            # set to show all stats if no filter was given
3046            mask = trex_stl_stats.COMPACT
3047
3048        stats_opts = common.list_intersect(trex_stl_stats.ALL_STATS_OPTS, mask)
3049
3050        stats = self._get_formatted_stats(opts.ports, mask)
3051
3052
3053        # print stats to screen
3054        for stat_type, stat_data in stats.items():
3055            text_tables.print_table_with_header(stat_data.text_table, stat_type)
3056
3057
3058    @__console
3059    def show_streams_line(self, line):
3060        '''Get stream statistics from TRex server by port\n'''
3061        # define a parser
3062        parser = parsing_opts.gen_parser(self,
3063                                         "streams",
3064                                         self.show_streams_line.__doc__,
3065                                         parsing_opts.PORT_LIST_WITH_ALL,
3066                                         parsing_opts.STREAMS_MASK)
3067
3068        opts = parser.parse_args(line.split())
3069
3070        if not opts:
3071            return opts
3072
3073        streams = self._get_streams(opts.ports, set(opts.streams))
3074        if not streams:
3075            self.logger.log(format_text("No streams found with desired filter.\n", "bold", "magenta"))
3076
3077        else:
3078            # print stats to screen
3079            for stream_hdr, port_streams_data in streams.items():
3080                text_tables.print_table_with_header(port_streams_data.text_table,
3081                                                    header= stream_hdr.split(":")[0] + ":",
3082                                                    untouched_header= stream_hdr.split(":")[1])
3083
3084
3085
3086
3087    @__console
3088    def validate_line (self, line):
3089        '''Validates port(s) stream configuration\n'''
3090
3091        parser = parsing_opts.gen_parser(self,
3092                                         "validate",
3093                                         self.validate_line.__doc__,
3094                                         parsing_opts.PORT_LIST_WITH_ALL)
3095
3096        opts = parser.parse_args(line.split())
3097        if not opts:
3098            return opts
3099
3100        self.validate(opts.ports)
3101
3102
3103
3104
3105    @__console
3106    def push_line (self, line):
3107        '''Push a pcap file '''
3108
3109        parser = parsing_opts.gen_parser(self,
3110                                         "push",
3111                                         self.push_line.__doc__,
3112                                         parsing_opts.FILE_PATH,
3113                                         parsing_opts.REMOTE_FILE,
3114                                         parsing_opts.PORT_LIST_WITH_ALL,
3115                                         parsing_opts.COUNT,
3116                                         parsing_opts.DURATION,
3117                                         parsing_opts.IPG,
3118                                         parsing_opts.SPEEDUP,
3119                                         parsing_opts.FORCE,
3120                                         parsing_opts.DUAL)
3121
3122        opts = parser.parse_args(line.split(), verify_acquired = True)
3123        if not opts:
3124            return opts
3125
3126        active_ports = list(set(self.get_active_ports()).intersection(opts.ports))
3127
3128        if active_ports:
3129            if not opts.force:
3130                msg = "Port(s) {0} are active - please stop them or add '--force'\n".format(active_ports)
3131                self.logger.log(format_text(msg, 'bold'))
3132                return RC_ERR(msg)
3133            else:
3134                self.stop(active_ports)
3135
3136
3137        if opts.remote:
3138            self.push_remote(opts.file[0],
3139                             ports     = opts.ports,
3140                             ipg_usec  = opts.ipg_usec,
3141                             speedup   = opts.speedup,
3142                             count     = opts.count,
3143                             duration  = opts.duration,
3144                             is_dual   = opts.dual)
3145
3146        else:
3147            self.push_pcap(opts.file[0],
3148                           ports     = opts.ports,
3149                           ipg_usec  = opts.ipg_usec,
3150                           speedup   = opts.speedup,
3151                           count     = opts.count,
3152                           duration  = opts.duration,
3153                           force     = opts.force,
3154                           is_dual   = opts.dual)
3155
3156
3157
3158        return RC_OK()
3159
3160
3161
3162    @__console
3163    def set_port_attr_line (self, line):
3164        '''Sets port attributes '''
3165
3166        parser = parsing_opts.gen_parser(self,
3167                                         "port_attr",
3168                                         self.set_port_attr_line.__doc__,
3169                                         parsing_opts.PORT_LIST_WITH_ALL,
3170                                         parsing_opts.PROMISCUOUS_SWITCH)
3171
3172        opts = parser.parse_args(line.split(), default_ports = self.get_acquired_ports(), verify_acquired = True)
3173        if not opts:
3174            return opts
3175
3176        # if no attributes - fall back to printing the status
3177        if opts.prom is None:
3178            self.show_stats_line("--ps --port {0}".format(' '.join(str(port) for port in opts.ports)))
3179            return
3180
3181        self.set_port_attr(opts.ports, opts.prom)
3182        return RC_OK()
3183
3184
3185    @__console
3186    def show_profile_line (self, line):
3187        '''Shows profile information'''
3188
3189        parser = parsing_opts.gen_parser(self,
3190                                         "port",
3191                                         self.show_profile_line.__doc__,
3192                                         parsing_opts.FILE_PATH)
3193
3194        opts = parser.parse_args(line.split())
3195        if not opts:
3196            return opts
3197
3198        info = STLProfile.get_info(opts.file[0])
3199
3200        self.logger.log(format_text('\nProfile Information:\n', 'bold'))
3201
3202        # general info
3203        self.logger.log(format_text('\nGeneral Information:', 'underline'))
3204        self.logger.log('Filename:         {:^12}'.format(opts.file[0]))
3205        self.logger.log('Stream count:     {:^12}'.format(info['stream_count']))
3206
3207        # specific info
3208        profile_type = info['type']
3209        self.logger.log(format_text('\nSpecific Information:', 'underline'))
3210
3211        if profile_type == 'python':
3212            self.logger.log('Type:             {:^12}'.format('Python Module'))
3213            self.logger.log('Tunables:         {:^12}'.format(str(['{0} = {1}'.format(k ,v) for k, v in info['tunables'].items()])))
3214
3215        elif profile_type == 'yaml':
3216            self.logger.log('Type:             {:^12}'.format('YAML'))
3217
3218        elif profile_type == 'pcap':
3219            self.logger.log('Type:             {:^12}'.format('PCAP file'))
3220
3221        self.logger.log("")
3222
3223
3224    @__console
3225    def get_events_line (self, line):
3226        '''shows events recieved from server\n'''
3227
3228        x = [parsing_opts.ArgumentPack(['-c','--clear'],
3229                                      {'action' : "store_true",
3230                                       'default': False,
3231                                       'help': "clear the events log"}),
3232
3233             parsing_opts.ArgumentPack(['-i','--info'],
3234                                      {'action' : "store_true",
3235                                       'default': False,
3236                                       'help': "show info events"}),
3237
3238             parsing_opts.ArgumentPack(['-w','--warn'],
3239                                      {'action' : "store_true",
3240                                       'default': False,
3241                                       'help': "show warning events"}),
3242
3243             ]
3244
3245
3246        parser = parsing_opts.gen_parser(self,
3247                                         "events",
3248                                         self.get_events_line.__doc__,
3249                                         *x)
3250
3251        opts = parser.parse_args(line.split())
3252        if not opts:
3253            return opts
3254
3255
3256        ev_type_filter = []
3257
3258        if opts.info:
3259            ev_type_filter.append('info')
3260
3261        if opts.warn:
3262            ev_type_filter.append('warning')
3263
3264        if not ev_type_filter:
3265            ev_type_filter = None
3266
3267        events = self.get_events(ev_type_filter)
3268        for ev in events:
3269            self.logger.log(ev)
3270
3271        if opts.clear:
3272            self.clear_events()
3273
3274    def generate_prompt (self, prefix = 'trex'):
3275        if not self.is_connected():
3276            return "{0}(offline)>".format(prefix)
3277
3278        elif not self.get_acquired_ports():
3279            return "{0}(read-only)>".format(prefix)
3280
3281        elif self.is_all_ports_acquired():
3282            return "{0}>".format(prefix)
3283
3284        else:
3285            return "{0} {1}>".format(prefix, self.get_acquired_ports())
3286