trex_stl_port.py revision c2e0fc65
1
2from collections import namedtuple, OrderedDict
3
4from .trex_stl_packet_builder_scapy import STLPktBuilder
5from .trex_stl_streams import STLStream
6from .trex_stl_types import *
7from .trex_stl_rx_features import ARPResolver, PingResolver
8from . import trex_stl_stats
9from .utils.constants import FLOW_CTRL_DICT_REVERSED
10
11import base64
12import copy
13from datetime import datetime, timedelta
14import threading
15
16StreamOnPort = namedtuple('StreamOnPort', ['compiled_stream', 'metadata'])
17
########## utility ############
def mult_to_factor (mult, max_bps_l2, max_pps, line_util):
    """Translate a multiplier descriptor into a plain scaling factor.

    mult is a dict with a 'type' ('raw', 'bps', 'pps' or 'percentage')
    and a 'value'. A 'raw' value is returned untouched; the other types
    are divided by the matching base figure (max_bps_l2, max_pps or
    line_util respectively). Any other type yields None.
    """
    kind = mult['type']

    # raw multipliers are already a factor
    if kind == 'raw':
        return mult['value']

    # the remaining types are relative to one of the base figures
    if kind == 'bps':
        base = max_bps_l2
    elif kind == 'pps':
        base = max_pps
    elif kind == 'percentage':
        base = line_util
    else:
        return None

    return mult['value'] / base
31
32
33# describes a single port
class Port(object):
    # client-side port states (self.state always holds one of these)
    STATE_DOWN         = 0
    STATE_IDLE         = 1
    STATE_STREAMS      = 2
    STATE_TX           = 3
    STATE_PAUSE        = 4
    STATE_PCAP_TX      = 5

    # 64 bit all-ones mask - start() sends it as 'core_mask' when no mask is given
    MASK_ALL = ((1 << 64) - 1)

    PortState = namedtuple('PortState', ['state_id', 'state_name'])
    # display name per state - note that STREAMS is shown as "IDLE"
    # and PCAP_TX is shown as "TRANSMITTING"
    STATES_MAP = {STATE_DOWN: "DOWN",
                  STATE_IDLE: "IDLE",
                  STATE_STREAMS: "IDLE",
                  STATE_TX: "TRANSMITTING",
                  STATE_PAUSE: "PAUSE",
                  STATE_PCAP_TX : "TRANSMITTING"}
51
52
53    def __init__ (self, port_id, user, comm_link, session_id, info):
54        self.port_id = port_id
55
56        self.state = self.STATE_IDLE
57
58        self.handler = None
59        self.comm_link = comm_link
60        self.transmit = comm_link.transmit
61        self.transmit_batch = comm_link.transmit_batch
62        self.user = user
63
64        self.info = dict(info)
65
66        self.streams = {}
67        self.profile = None
68        self.session_id = session_id
69        self.status = {}
70
71        self.port_stats = trex_stl_stats.CPortStats(self)
72
73        self.next_available_id = 1
74        self.tx_stopped_ts = None
75        self.has_rx_streams = False
76
77        self.owner = ''
78        self.last_factor_type = None
79
80        self.__attr = {}
81        self.attr_lock = threading.Lock()
82
83    # decorator to verify port is up
84    def up(func):
85        def func_wrapper(*args, **kwargs):
86            port = args[0]
87
88            if not port.is_up():
89                return port.err("{0} - port is down".format(func.__name__))
90
91            return func(*args, **kwargs)
92
93        return func_wrapper
94
95    # owned
96    def owned(func):
97        def func_wrapper(*args, **kwargs):
98            port = args[0]
99
100            if not port.is_acquired():
101                return port.err("{0} - port is not owned".format(func.__name__))
102
103            return func(*args, **kwargs)
104
105        return func_wrapper
106
107
    # decorator to verify the port is owned and in a writeable state
109    def writeable(func):
110        def func_wrapper(*args, **kwargs):
111            port = args[0]
112
113            if not port.is_acquired():
114                return port.err("{0} - port is not owned".format(func.__name__))
115
116            if not port.is_writeable():
117                return port.err("{0} - port is not in a writeable state".format(func.__name__))
118
119            return func(*args, **kwargs)
120
121        return func_wrapper
122
123
124
125    def err(self, msg):
126        return RC_ERR("port {0} : {1}\n".format(self.port_id, msg))
127
128    def ok(self, data = ""):
129        return RC_OK(data)
130
131    def get_speed_bps (self):
132        return (self.get_speed_gbps() * 1000 * 1000 * 1000)
133
134    def get_speed_gbps (self):
135        return self.__attr['speed']
136
137    def is_acquired(self):
138        return (self.handler != None)
139
140    def is_up (self):
141        return self.__attr['link']['up']
142
143    def is_active(self):
144        return (self.state == self.STATE_TX ) or (self.state == self.STATE_PAUSE) or (self.state == self.STATE_PCAP_TX)
145
146    def is_transmitting (self):
147        return (self.state == self.STATE_TX) or (self.state == self.STATE_PCAP_TX)
148
149    def is_paused (self):
150        return (self.state == self.STATE_PAUSE)
151
152    def is_writeable (self):
153        # operations on port can be done on state idle or state streams
154        return ((self.state == self.STATE_IDLE) or (self.state == self.STATE_STREAMS))
155
156    def get_owner (self):
157        if self.is_acquired():
158            return self.user
159        else:
160            return self.owner
161
162    def __allocate_stream_id (self):
163        id = self.next_available_id
164        self.next_available_id += 1
165        return id
166
167
168    # take the port
169    def acquire(self, force = False, sync_streams = True):
170        params = {"port_id":     self.port_id,
171                  "user":        self.user,
172                  "session_id":  self.session_id,
173                  "force":       force}
174
175        rc = self.transmit("acquire", params)
176        if not rc:
177            return self.err(rc.err())
178
179        self.handler = rc.data()
180
181        if sync_streams:
182            return self.sync_streams()
183        else:
184            return self.ok()
185
186
187    # sync all the streams with the server
188    def sync_streams (self):
189        params = {"port_id": self.port_id}
190
191        rc = self.transmit("get_all_streams", params)
192        if rc.bad():
193            return self.err(rc.err())
194
195        for k, v in rc.data()['streams'].items():
196            self.streams[k] = {'next_id': v['next_stream_id'],
197                               'pkt'    : base64.b64decode(v['packet']['binary']),
198                               'mode'   : v['mode']['type'],
199                               'rate'   : STLStream.get_rate_from_field(v['mode']['rate'])}
200        return self.ok()
201
202    # release the port
203    def release(self):
204        params = {"port_id": self.port_id,
205                  "handler": self.handler}
206
207        rc = self.transmit("release", params)
208
209        if rc.good():
210
211            self.handler = None
212            self.owner = ''
213
214            return self.ok()
215        else:
216            return self.err(rc.err())
217
218
219
220    def sync(self):
221
222        params = {"port_id": self.port_id}
223
224        rc = self.transmit("get_port_status", params)
225        if rc.bad():
226            return self.err(rc.err())
227
228        # sync the port
229        port_state = rc.data()['state']
230
231        if port_state == "DOWN":
232            self.state = self.STATE_DOWN
233        elif port_state == "IDLE":
234            self.state = self.STATE_IDLE
235        elif port_state == "STREAMS":
236            self.state = self.STATE_STREAMS
237        elif port_state == "TX":
238            self.state = self.STATE_TX
239        elif port_state == "PAUSE":
240            self.state = self.STATE_PAUSE
241        elif port_state == "PCAP_TX":
242            self.state = self.STATE_PCAP_TX
243        else:
244            raise Exception("port {0}: bad state received from server '{1}'".format(self.port_id, port_state))
245
246        self.owner = rc.data()['owner']
247
248        self.next_available_id = int(rc.data()['max_stream_id']) + 1
249
250        self.status = rc.data()
251
252        # replace the attributes in a thread safe manner
253        self.set_ts_attr(rc.data()['attr'])
254
255        return self.ok()
256
257
258
259    # add streams
260    @writeable
261    def add_streams (self, streams_list):
262
263        # listify
264        streams_list = streams_list if isinstance(streams_list, list) else [streams_list]
265
266        lookup = {}
267
268        # allocate IDs
269        for stream in streams_list:
270
271            # allocate stream id
272            stream_id = stream.get_id() if stream.get_id() is not None else self.__allocate_stream_id()
273            if stream_id in self.streams:
274                return self.err('Stream ID: {0} already exists'.format(stream_id))
275
276            # name
277            name = stream.get_name() if stream.get_name() is not None else id(stream)
278            if name in lookup:
279                return self.err("multiple streams with duplicate name: '{0}'".format(name))
280            lookup[name] = stream_id
281
282        batch = []
283        for stream in streams_list:
284
285            name = stream.get_name() if stream.get_name() is not None else id(stream)
286            stream_id = lookup[name]
287            next_id = -1
288
289            next = stream.get_next()
290            if next:
291                if not next in lookup:
292                    return self.err("stream dependency error - unable to find '{0}'".format(next))
293                next_id = lookup[next]
294
295            stream_json = stream.to_json()
296            stream_json['next_stream_id'] = next_id
297
298            params = {"handler": self.handler,
299                      "port_id": self.port_id,
300                      "stream_id": stream_id,
301                      "stream": stream_json}
302
303            cmd = RpcCmdData('add_stream', params, 'core')
304            batch.append(cmd)
305
306
307        rc = self.transmit_batch(batch)
308
309        ret = RC()
310        for i, single_rc in enumerate(rc):
311            if single_rc.rc:
312                stream_id = batch[i].params['stream_id']
313                next_id   = batch[i].params['stream']['next_stream_id']
314                self.streams[stream_id] = {'next_id'        : next_id,
315                                           'pkt'            : streams_list[i].get_pkt(),
316                                           'mode'           : streams_list[i].get_mode(),
317                                           'rate'           : streams_list[i].get_rate(),
318                                           'has_flow_stats' : streams_list[i].has_flow_stats()}
319
320                ret.add(RC_OK(data = stream_id))
321
322                self.has_rx_streams = self.has_rx_streams or streams_list[i].has_flow_stats()
323
324            else:
325                ret.add(RC(*single_rc))
326
327        self.state = self.STATE_STREAMS if (len(self.streams) > 0) else self.STATE_IDLE
328
329        return ret if ret else self.err(str(ret))
330
331
332
333    # remove stream from port
334    @writeable
335    def remove_streams (self, stream_id_list):
336
337        # single element to list
338        stream_id_list = stream_id_list if isinstance(stream_id_list, list) else [stream_id_list]
339
340        # verify existance
341        if not all([stream_id in self.streams for stream_id in stream_id_list]):
342            return self.err("stream {0} does not exists".format(stream_id))
343
344        batch = []
345
346        for stream_id in stream_id_list:
347            params = {"handler": self.handler,
348                      "port_id": self.port_id,
349                      "stream_id": stream_id}
350
351            cmd = RpcCmdData('remove_stream', params, 'core')
352            batch.append(cmd)
353
354
355        rc = self.transmit_batch(batch)
356        for i, single_rc in enumerate(rc):
357            if single_rc:
358                id = batch[i].params['stream_id']
359                del self.streams[id]
360
361        self.state = self.STATE_STREAMS if (len(self.streams) > 0) else self.STATE_IDLE
362
363        # recheck if any RX stats streams present on the port
364        self.has_rx_streams = any([stream['has_flow_stats'] for stream in self.streams.values()])
365
366        return self.ok() if rc else self.err(rc.err())
367
368
369    # remove all the streams
370    @writeable
371    def remove_all_streams (self):
372
373        params = {"handler": self.handler,
374                  "port_id": self.port_id}
375
376        rc = self.transmit("remove_all_streams", params)
377        if not rc:
378            return self.err(rc.err())
379
380        self.streams = {}
381
382        self.state = self.STATE_IDLE
383        self.has_rx_streams = False
384
385        return self.ok()
386
387
388    # get a specific stream
389    def get_stream (self, stream_id):
390        if stream_id in self.streams:
391            return self.streams[stream_id]
392        else:
393            return None
394
395    def get_all_streams (self):
396        return self.streams
397
398
399    @writeable
400    def start (self, mul, duration, force, mask):
401
402        if self.state == self.STATE_IDLE:
403            return self.err("unable to start traffic - no streams attached to port")
404
405        params = {"handler":    self.handler,
406                  "port_id":    self.port_id,
407                  "mul":        mul,
408                  "duration":   duration,
409                  "force":      force,
410                  "core_mask":  mask if mask is not None else self.MASK_ALL}
411
412        # must set this before to avoid race with the async response
413        last_state = self.state
414        self.state = self.STATE_TX
415
416        rc = self.transmit("start_traffic", params)
417
418        if rc.bad():
419            self.state = last_state
420            return self.err(rc.err())
421
422        # save this for TUI
423        self.last_factor_type = mul['type']
424
425        return rc
426
427
428    # stop traffic
429    # with force ignores the cached state and sends the command
430    @owned
431    def stop (self, force = False):
432
433        # if not is not active and not force - go back
434        if not self.is_active() and not force:
435            return self.ok()
436
437        params = {"handler": self.handler,
438                  "port_id": self.port_id}
439
440        rc = self.transmit("stop_traffic", params)
441        if rc.bad():
442            return self.err(rc.err())
443
444        self.state = self.STATE_STREAMS
445
446        self.last_factor_type = None
447
448        # timestamp for last tx
449        self.tx_stopped_ts = datetime.now()
450
451        return self.ok()
452
453
454    # return True if port has any stream configured with RX stats
455    def has_rx_enabled (self):
456        return self.has_rx_streams
457
458
459    # return true if rx_delay_ms has passed since the last port stop
460    def has_rx_delay_expired (self, rx_delay_ms):
461        assert(self.has_rx_enabled())
462
463        # if active - it's not safe to remove RX filters
464        if self.is_active():
465            return False
466
467        # either no timestamp present or time has already passed
468        return not self.tx_stopped_ts or (datetime.now() - self.tx_stopped_ts) > timedelta(milliseconds = rx_delay_ms)
469
470
471    @writeable
472    def remove_rx_filters (self):
473        assert(self.has_rx_enabled())
474
475        if self.state == self.STATE_IDLE:
476            return self.ok()
477
478
479        params = {"handler": self.handler,
480                  "port_id": self.port_id}
481
482        rc = self.transmit("remove_rx_filters", params)
483        if rc.bad():
484            return self.err(rc.err())
485
486        return self.ok()
487
488
489    @owned
490    def set_rx_sniffer (self, pcap_filename, limit):
491
492        params = {"handler":        self.handler,
493                  "port_id":        self.port_id,
494                  "type":           "capture",
495                  "enabled":        True,
496                  "pcap_filename":  pcap_filename,
497                  "limit":          limit}
498
499        rc = self.transmit("set_rx_feature", params)
500        if rc.bad():
501            return self.err(rc.err())
502
503        return self.ok()
504
505
506    @owned
507    def remove_rx_sniffer (self):
508        params = {"handler":        self.handler,
509                  "port_id":        self.port_id,
510                  "type":           "capture",
511                  "enabled":        False}
512
513        rc = self.transmit("set_rx_feature", params)
514        if rc.bad():
515            return self.err(rc.err())
516
517        return self.ok()
518
519
520    @owned
521    def set_arp_resolution (self, ipv4, mac):
522
523        params = {"handler":        self.handler,
524                  "port_id":        self.port_id,
525                  "ipv4":           ipv4,
526                  "mac":            mac}
527
528        rc = self.transmit("set_arp_resolution", params)
529        if rc.bad():
530            return self.err(rc.err())
531
532        return self.ok()
533
534
535
536
537    @owned
538    def set_rx_queue (self, size):
539
540        params = {"handler":        self.handler,
541                  "port_id":        self.port_id,
542                  "type":           "queue",
543                  "enabled":        True,
544                  "size":          size}
545
546        rc = self.transmit("set_rx_feature", params)
547        if rc.bad():
548            return self.err(rc.err())
549
550        return self.ok()
551
552    @owned
553    def remove_rx_queue (self):
554        params = {"handler":        self.handler,
555                  "port_id":        self.port_id,
556                  "type":           "queue",
557                  "enabled":        False}
558
559        rc = self.transmit("set_rx_feature", params)
560        if rc.bad():
561            return self.err(rc.err())
562
563        return self.ok()
564
565    @owned
566    def get_rx_queue_pkts (self):
567        params = {"handler":        self.handler,
568                  "port_id":        self.port_id}
569
570        rc = self.transmit("get_rx_queue_pkts", params)
571        if rc.bad():
572            return self.err(rc.err())
573
574        pkts = rc.data()['pkts']
575
576        # decode the packets from base64 to binary
577        for i in range(len(pkts)):
578            pkts[i]['binary'] = base64.b64decode(pkts[i]['binary'])
579
580        return RC_OK(pkts)
581
582
583    @owned
584    def pause (self):
585
586        if (self.state == self.STATE_PCAP_TX) :
587            return self.err("pause is not supported during PCAP TX")
588
589        if (self.state != self.STATE_TX) :
590            return self.err("port is not transmitting")
591
592        params = {"handler": self.handler,
593                  "port_id": self.port_id}
594
595        rc  = self.transmit("pause_traffic", params)
596        if rc.bad():
597            return self.err(rc.err())
598
599        self.state = self.STATE_PAUSE
600
601        return self.ok()
602
603    @owned
604    def resume (self):
605
606        if (self.state != self.STATE_PAUSE) :
607            return self.err("port is not in pause mode")
608
609        params = {"handler": self.handler,
610                  "port_id": self.port_id}
611
612        # only valid state after stop
613
614        rc = self.transmit("resume_traffic", params)
615        if rc.bad():
616            return self.err(rc.err())
617
618        self.state = self.STATE_TX
619
620        return self.ok()
621
622    @owned
623    def update (self, mul, force):
624
625        if (self.state == self.STATE_PCAP_TX) :
626            return self.err("update is not supported during PCAP TX")
627
628        if (self.state != self.STATE_TX) :
629            return self.err("port is not transmitting")
630
631        params = {"handler": self.handler,
632                  "port_id": self.port_id,
633                  "mul":     mul,
634                  "force":   force}
635
636        rc = self.transmit("update_traffic", params)
637        if rc.bad():
638            return self.err(rc.err())
639
640        # save this for TUI
641        self.last_factor_type = mul['type']
642
643        return self.ok()
644
645    @owned
646    def validate (self):
647
648        if (self.state == self.STATE_IDLE):
649            return self.err("no streams attached to port")
650
651        params = {"handler": self.handler,
652                  "port_id": self.port_id}
653
654        rc = self.transmit("validate", params)
655        if rc.bad():
656            return self.err(rc.err())
657
658        self.profile = rc.data()
659
660        return self.ok()
661
662
663    @owned
664    def set_attr (self, **kwargs):
665
666        json_attr = {}
667
668        if kwargs.get('promiscuous') is not None:
669            json_attr['promiscuous'] = {'enabled': kwargs.get('promiscuous')}
670
671        if kwargs.get('link_status') is not None:
672            json_attr['link_status'] = {'up': kwargs.get('link_status')}
673
674        if kwargs.get('led_status') is not None:
675            json_attr['led_status'] = {'on': kwargs.get('led_status')}
676
677        if kwargs.get('flow_ctrl_mode') is not None:
678            json_attr['flow_ctrl_mode'] = {'on': kwargs.get('flow_ctrl_mode')}
679
680        if kwargs.get('rx_filter_mode') is not None:
681            json_attr['rx_filter_mode'] = {'mode': kwargs.get('rx_filter_mode')}
682
683        if kwargs.get('ipv4') is not None:
684            json_attr['ipv4'] = {'addr': kwargs.get('ipv4')}
685
686        if kwargs.get('dest') is not None:
687            json_attr['dest'] = {'addr': kwargs.get('dest')}
688
689
690        params = {"handler": self.handler,
691                  "port_id": self.port_id,
692                  "attr": json_attr}
693
694        rc = self.transmit("set_port_attr", params)
695        if rc.bad():
696            return self.err(rc.err())
697
698        # update the dictionary from the server explicitly
699        return self.sync()
700
701
702    @writeable
703    def push_remote (self, pcap_filename, ipg_usec, speedup, count, duration, is_dual, slave_handler):
704
705        params = {"handler": self.handler,
706                  "port_id": self.port_id,
707                  "pcap_filename": pcap_filename,
708                  "ipg_usec": ipg_usec if ipg_usec is not None else -1,
709                  "speedup": speedup,
710                  "count": count,
711                  "duration": duration,
712                  "is_dual": is_dual,
713                  "slave_handler": slave_handler}
714
715        rc = self.transmit("push_remote", params)
716        if rc.bad():
717            return self.err(rc.err())
718
719        self.state = self.STATE_PCAP_TX
720        return self.ok()
721
722
723    def get_profile (self):
724        return self.profile
725
726    # invalidates the current ARP
727    def invalidate_arp (self):
728        dest = self.__attr['dest']
729
730        if dest['type'] != 'mac':
731            return self.set_attr(dest = dest['ipv4'])
732        else:
733            return self.ok()
734
735
736    def print_profile (self, mult, duration):
737        if not self.get_profile():
738            return
739
740        rate = self.get_profile()['rate']
741        graph = self.get_profile()['graph']
742
743        print(format_text("Profile Map Per Port\n", 'underline', 'bold'))
744
745        factor = mult_to_factor(mult, rate['max_bps_l2'], rate['max_pps'], rate['max_line_util'])
746
747        print("Profile max BPS L2    (base / req):   {:^12} / {:^12}".format(format_num(rate['max_bps_l2'], suffix = "bps"),
748                                                                             format_num(rate['max_bps_l2'] * factor, suffix = "bps")))
749
750        print("Profile max BPS L1    (base / req):   {:^12} / {:^12}".format(format_num(rate['max_bps_l1'], suffix = "bps"),
751                                                                             format_num(rate['max_bps_l1'] * factor, suffix = "bps")))
752
753        print("Profile max PPS       (base / req):   {:^12} / {:^12}".format(format_num(rate['max_pps'], suffix = "pps"),
754                                                                             format_num(rate['max_pps'] * factor, suffix = "pps"),))
755
756        print("Profile line util.    (base / req):   {:^12} / {:^12}".format(format_percentage(rate['max_line_util']),
757                                                                             format_percentage(rate['max_line_util'] * factor)))
758
759
760        # duration
761        exp_time_base_sec = graph['expected_duration'] / (1000 * 1000)
762        exp_time_factor_sec = exp_time_base_sec / factor
763
764        # user configured a duration
765        if duration > 0:
766            if exp_time_factor_sec > 0:
767                exp_time_factor_sec = min(exp_time_factor_sec, duration)
768            else:
769                exp_time_factor_sec = duration
770
771
772        print("Duration              (base / req):   {:^12} / {:^12}".format(format_time(exp_time_base_sec),
773                                                                             format_time(exp_time_factor_sec)))
774        print("\n")
775
776    # generate formatted (console friendly) port info
777    def get_formatted_info (self, sync = True):
778
779        # sync the status
780        if sync:
781            self.sync()
782
783        # get a copy of the current attribute set (safe against manipulation)
784        attr = self.get_ts_attr()
785
786        info = dict(self.info)
787
788        info['status'] = self.get_port_state_name()
789
790        if 'link' in attr:
791            info['link'] = 'UP' if attr['link']['up'] else 'DOWN'
792        else:
793            info['link'] = 'N/A'
794
795        if 'fc' in attr:
796            info['fc'] = FLOW_CTRL_DICT_REVERSED.get(attr['fc']['mode'], 'N/A')
797        else:
798            info['fc'] = 'N/A'
799
800        if 'promiscuous' in attr:
801            info['prom'] = "on" if attr['promiscuous']['enabled'] else "off"
802        else:
803            info['prom'] = "N/A"
804
805        if 'description' not in info:
806            info['description'] = "N/A"
807
808        if 'is_fc_supported' in info:
809            info['fc_supported'] = 'yes' if info['is_fc_supported'] else 'no'
810        else:
811            info['fc_supported'] = 'N/A'
812
813        if 'is_led_supported' in info:
814            info['led_change_supported'] = 'yes' if info['is_led_supported'] else 'no'
815        else:
816            info['led_change_supported'] = 'N/A'
817
818        if 'is_link_supported' in info:
819            info['link_change_supported'] = 'yes' if info['is_link_supported'] else 'no'
820        else:
821            info['link_change_supported'] = 'N/A'
822
823        if 'is_virtual' in info:
824            info['is_virtual'] = 'yes' if info['is_virtual'] else 'no'
825        else:
826            info['is_virtual'] = 'N/A'
827
828        # speed
829        info['speed'] = self.get_speed_gbps()
830
831        # RX filter mode
832        info['rx_filter_mode'] = 'hardware match' if attr['rx_filter_mode'] == 'hw' else 'fetch all'
833
834        # src MAC and IPv4
835        info['src_mac']   = attr['src_mac']
836        info['src_ipv4']  = attr['src_ipv4']
837
838        if info['src_ipv4'] is None:
839            info['src_ipv4'] = 'Not Configured'
840
841        # dest
842        dest = attr['dest']
843        if dest['type'] == 'mac':
844            info['dest']  = dest['mac']
845            info['arp']   = '-'
846
847        elif dest['type'] == 'ipv4':
848            info['dest']  = dest['ipv4']
849            info['arp']   = dest['arp']
850
851        elif dest['type'] == 'ipv4_u':
852            info['dest']  = dest['ipv4']
853            info['arp']   = 'unresolved'
854
855
856        # RX info
857        rx_info = self.status['rx_info']
858
859        # RX sniffer
860        sniffer = rx_info['sniffer']
861        info['rx_sniffer'] = '{0}\n[{1} / {2}]'.format(sniffer['pcap_filename'], sniffer['count'], sniffer['limit']) if sniffer['is_active'] else 'off'
862
863
864        # RX queue
865        queue = rx_info['queue']
866        info['rx_queue'] = '[{0} / {1}]'.format(queue['count'], queue['size']) if queue['is_active'] else 'off'
867
868        return info
869
870
871    def get_port_state_name(self):
872        return self.STATES_MAP.get(self.state, "Unknown")
873
874    def get_src_addr (self):
875        src_mac  = self.__attr['src_mac']
876        src_ipv4 = self.__attr['src_ipv4']
877
878        return {'mac': src_mac, 'ipv4': src_ipv4}
879
880
881    def get_dst_addr (self):
882        dest = self.__attr['dest']
883
884        if dest['type'] == 'mac':
885            return {'ipv4': None, 'mac': dest['mac']}
886
887        elif dest['type'] == 'ipv4':
888            return {'ipv4': dest['ipv4'], 'mac': dest['arp']}
889
890        elif dest['type'] == 'ipv4_u':
891            return {'ipv4': dest['ipv4'], 'mac': None}
892
893        else:
894            assert(0)
895
896
897    # port is considered resolved if it's dest is either MAC or resolved IPv4
898    def is_resolved (self):
899        return (self.get_dst_addr()['mac'] is not None)
900
901    # return True if the port is valid for resolve (has an IPv4 address as dest)
902    def is_resolvable (self):
903        return (self.get_dst_addr()['ipv4'] is not None)
904
905    @writeable
906    def arp_resolve (self, retries):
907        return ARPResolver(self).resolve(retries)
908
909    @writeable
910    def ping (self, ping_ipv4, pkt_size):
911        return PingResolver(self, ping_ipv4, pkt_size).resolve()
912
913
914    ################# stats handler ######################
915    def generate_port_stats(self):
916        return self.port_stats.generate_stats()
917
918    def generate_port_status(self):
919
920        info = self.get_formatted_info()
921
922        return {"driver":           info['driver'],
923                "description":      info.get('description', 'N/A')[:18],
924                "src MAC":          info['src_mac'],
925                "src IPv4":         info['src_ipv4'],
926                "Destination":      info['dest'],
927                "ARP Resolution":   info['arp'],
928                "PCI Address":      info['pci_addr'],
929                "NUMA Node":        info['numa'],
930                "--": "",
931                "---": "",
932                "----": "",
933                "-----": "",
934                "link speed": info['speed'],
935                "port status": info['status'],
936                "link status": info['link'],
937                "promiscuous" : info['prom'],
938                "flow ctrl" : info['fc'],
939
940                "RX Filter Mode": info['rx_filter_mode'],
941                "RX Queueing": info['rx_queue'],
942                "RX sniffer": info['rx_sniffer'],
943
944                }
945
946    def clear_stats(self):
947        return self.port_stats.clear_stats()
948
949
950    def get_stats (self):
951        return self.port_stats.get_stats()
952
953
954    def invalidate_stats(self):
955        return self.port_stats.invalidate()
956
957    ################# stream printout ######################
958    def generate_loaded_streams_sum(self):
959        if self.state == self.STATE_DOWN:
960            return {}
961
962        data = {}
963        for id, obj in self.streams.items():
964
965            # lazy build scapy repr.
966            if not 'pkt_type' in obj:
967                obj['pkt_type'] = STLPktBuilder.pkt_layers_desc_from_buffer(obj['pkt'])
968
969            data[id] = OrderedDict([ ('id',  id),
970                                     ('packet_type',  obj['pkt_type']),
971                                     ('L2 len',       len(obj['pkt']) + 4),
972                                     ('mode',         obj['mode']),
973                                     ('rate',         obj['rate']),
974                                     ('next_stream',  obj['next_id'] if not '-1' else 'None')
975                                    ])
976
977        return {"streams" : OrderedDict(sorted(data.items())) }
978
979
980    ######## attributes are a complex type (dict) that might be manipulated through the async thread #############
981
982    # get in a thread safe manner a duplication of attributes
983    def get_ts_attr (self):
984        with self.attr_lock:
985            return dict(self.__attr)
986
987    # set in a thread safe manner a new dict of attributes
988    def set_ts_attr (self, new_attr):
989        with self.attr_lock:
990            self.__attr = new_attr
991
992
993  ################# events handler ######################
994    def async_event_port_job_done (self):
995        # until thread is locked - order is important
996        self.tx_stopped_ts = datetime.now()
997        self.state = self.STATE_STREAMS
998
999        self.last_factor_type = None
1000
1001    def async_event_port_attr_changed (self, new_attr):
1002
1003        # get a thread safe duplicate
1004        cur_attr = self.get_ts_attr()
1005
1006        # check if anything changed
1007        if new_attr == cur_attr:
1008            return None
1009
1010        # generate before
1011        before = self.get_formatted_info(sync = False)
1012
1013        # update
1014        self.set_ts_attr(new_attr)
1015
1016        # generate after
1017        after = self.get_formatted_info(sync = False)
1018
1019        # return diff
1020        diff = {}
1021        for key, new_value in after.items():
1022            old_value = before.get(key, 'N/A')
1023            if new_value != old_value:
1024                diff[key] = (old_value, new_value)
1025
1026        return diff
1027
1028
1029    # rest of the events are used for TUI / read only sessions
1030    def async_event_port_stopped (self):
1031        if not self.is_acquired():
1032            self.state = self.STATE_STREAMS
1033
1034    def async_event_port_paused (self):
1035        if not self.is_acquired():
1036            self.state = self.STATE_PAUSE
1037
1038    def async_event_port_started (self):
1039        if not self.is_acquired():
1040            self.state = self.STATE_TX
1041
1042    def async_event_port_resumed (self):
1043        if not self.is_acquired():
1044            self.state = self.STATE_TX
1045
1046    def async_event_acquired (self, who):
1047        self.handler = None
1048        self.owner = who
1049
1050    def async_event_released (self):
1051        self.owner = ''
1052
1053
1054