trex_stl_port.py revision 04ecbc54
1
2from collections import namedtuple, OrderedDict
3
4from .trex_stl_packet_builder_scapy import STLPktBuilder
5from .trex_stl_streams import STLStream
6from .trex_stl_types import *
7
8from .rx_services.trex_stl_rx_service_arp import RXServiceARP
9from .rx_services.trex_stl_rx_service_icmp import RXServiceICMP
10
11from . import trex_stl_stats
12from .utils.constants import FLOW_CTRL_DICT_REVERSED
13
14import base64
15import copy
16from datetime import datetime, timedelta
17import threading
18
19StreamOnPort = namedtuple('StreamOnPort', ['compiled_stream', 'metadata'])
20
21########## utlity ############
22def mult_to_factor (mult, max_bps_l2, max_pps, line_util):
23    if mult['type'] == 'raw':
24        return mult['value']
25
26    if mult['type'] == 'bps':
27        return mult['value'] / max_bps_l2
28
29    if mult['type'] == 'pps':
30        return mult['value'] / max_pps
31
32    if mult['type'] == 'percentage':
33        return mult['value'] / line_util
34
35
36# describes a single port
37class Port(object):
38    STATE_DOWN         = 0
39    STATE_IDLE         = 1
40    STATE_STREAMS      = 2
41    STATE_TX           = 3
42    STATE_PAUSE        = 4
43    STATE_PCAP_TX      = 5
44
45    MASK_ALL = ((1 << 64) - 1)
46
47    PortState = namedtuple('PortState', ['state_id', 'state_name'])
48    STATES_MAP = {STATE_DOWN: "DOWN",
49                  STATE_IDLE: "IDLE",
50                  STATE_STREAMS: "IDLE",
51                  STATE_TX: "TRANSMITTING",
52                  STATE_PAUSE: "PAUSE",
53                  STATE_PCAP_TX : "TRANSMITTING"}
54
55
56    def __init__ (self, port_id, user, comm_link, session_id, info):
57        self.port_id = port_id
58
59        self.state = self.STATE_IDLE
60
61        self.handler = None
62        self.comm_link = comm_link
63        self.transmit = comm_link.transmit
64        self.transmit_batch = comm_link.transmit_batch
65        self.user = user
66
67        self.info = dict(info)
68
69        self.streams = {}
70        self.profile = None
71        self.session_id = session_id
72        self.status = {}
73
74        self.port_stats = trex_stl_stats.CPortStats(self)
75
76        self.next_available_id = 1
77        self.tx_stopped_ts = None
78        self.has_rx_streams = False
79
80        self.owner = ''
81        self.last_factor_type = None
82
83        self.__attr = {}
84        self.attr_lock = threading.Lock()
85
86    # decorator to verify port is up
87    def up(func):
88        def func_wrapper(*args, **kwargs):
89            port = args[0]
90
91            if not port.is_up():
92                return port.err("{0} - port is down".format(func.__name__))
93
94            return func(*args, **kwargs)
95
96        return func_wrapper
97
98    # owned
99    def owned(func):
100        def func_wrapper(*args, **kwargs):
101            port = args[0]
102
103            if not port.is_acquired():
104                return port.err("{0} - port is not owned".format(func.__name__))
105
106            return func(*args, **kwargs)
107
108        return func_wrapper
109
110
111    # decorator to check server is readable (port not down and etc.)
112    def writeable(func):
113        def func_wrapper(*args, **kwargs):
114            port = args[0]
115
116            if not port.is_acquired():
117                return port.err("{0} - port is not owned".format(func.__name__))
118
119            if not port.is_writeable():
120                return port.err("{0} - port is active, please stop the port before executing command".format(func.__name__))
121
122            return func(*args, **kwargs)
123
124        return func_wrapper
125
126
127
128    def err(self, msg):
129        return RC_ERR("port {0} : *** {1}".format(self.port_id, msg))
130
131    def ok(self, data = ""):
132        return RC_OK(data)
133
134    def get_speed_bps (self):
135        return (self.get_speed_gbps() * 1000 * 1000 * 1000)
136
137    def get_speed_gbps (self):
138        return self.__attr['speed']
139
140    def is_acquired(self):
141        return (self.handler != None)
142
143    def is_up (self):
144        return self.__attr['link']['up']
145
146    def is_active(self):
147        return (self.state == self.STATE_TX ) or (self.state == self.STATE_PAUSE) or (self.state == self.STATE_PCAP_TX)
148
149    def is_transmitting (self):
150        return (self.state == self.STATE_TX) or (self.state == self.STATE_PCAP_TX)
151
152    def is_paused (self):
153        return (self.state == self.STATE_PAUSE)
154
155    def is_writeable (self):
156        # operations on port can be done on state idle or state streams
157        return ((self.state == self.STATE_IDLE) or (self.state == self.STATE_STREAMS))
158
159    def get_owner (self):
160        if self.is_acquired():
161            return self.user
162        else:
163            return self.owner
164
165    def __allocate_stream_id (self):
166        id = self.next_available_id
167        self.next_available_id += 1
168        return id
169
170
171    # take the port
172    def acquire(self, force = False, sync_streams = True):
173        params = {"port_id":     self.port_id,
174                  "user":        self.user,
175                  "session_id":  self.session_id,
176                  "force":       force}
177
178        rc = self.transmit("acquire", params)
179        if not rc:
180            return self.err(rc.err())
181
182        self.handler = rc.data()
183
184        if sync_streams:
185            return self.sync_streams()
186        else:
187            return self.ok()
188
189
190    # sync all the streams with the server
191    def sync_streams (self):
192        params = {"port_id": self.port_id}
193
194        rc = self.transmit("get_all_streams", params)
195        if rc.bad():
196            return self.err(rc.err())
197
198        for k, v in rc.data()['streams'].items():
199            self.streams[k] = {'next_id': v['next_stream_id'],
200                               'pkt'    : base64.b64decode(v['packet']['binary']),
201                               'mode'   : v['mode']['type'],
202                               'rate'   : STLStream.get_rate_from_field(v['mode']['rate'])}
203        return self.ok()
204
205    # release the port
206    def release(self):
207        params = {"port_id": self.port_id,
208                  "handler": self.handler}
209
210        rc = self.transmit("release", params)
211
212        if rc.good():
213
214            self.handler = None
215            self.owner = ''
216
217            return self.ok()
218        else:
219            return self.err(rc.err())
220
221
222
223    def sync(self):
224
225        params = {"port_id": self.port_id}
226
227        rc = self.transmit("get_port_status", params)
228        if rc.bad():
229            return self.err(rc.err())
230
231        # sync the port
232        port_state = rc.data()['state']
233
234        if port_state == "DOWN":
235            self.state = self.STATE_DOWN
236        elif port_state == "IDLE":
237            self.state = self.STATE_IDLE
238        elif port_state == "STREAMS":
239            self.state = self.STATE_STREAMS
240        elif port_state == "TX":
241            self.state = self.STATE_TX
242        elif port_state == "PAUSE":
243            self.state = self.STATE_PAUSE
244        elif port_state == "PCAP_TX":
245            self.state = self.STATE_PCAP_TX
246        else:
247            raise Exception("port {0}: bad state received from server '{1}'".format(self.port_id, port_state))
248
249        self.owner = rc.data()['owner']
250
251        self.next_available_id = int(rc.data()['max_stream_id']) + 1
252
253        self.status = rc.data()
254
255        # replace the attributes in a thread safe manner
256        self.set_ts_attr(rc.data()['attr'])
257
258        return self.ok()
259
260
261
262    # add streams
263    @writeable
264    def add_streams (self, streams_list):
265
266        # listify
267        streams_list = streams_list if isinstance(streams_list, list) else [streams_list]
268
269        lookup = {}
270
271        # allocate IDs
272        for stream in streams_list:
273
274            # allocate stream id
275            stream_id = stream.get_id() if stream.get_id() is not None else self.__allocate_stream_id()
276            if stream_id in self.streams:
277                return self.err('Stream ID: {0} already exists'.format(stream_id))
278
279            # name
280            name = stream.get_name() if stream.get_name() is not None else id(stream)
281            if name in lookup:
282                return self.err("multiple streams with duplicate name: '{0}'".format(name))
283            lookup[name] = stream_id
284
285        batch = []
286        for stream in streams_list:
287
288            name = stream.get_name() if stream.get_name() is not None else id(stream)
289            stream_id = lookup[name]
290            next_id = -1
291
292            next = stream.get_next()
293            if next:
294                if not next in lookup:
295                    return self.err("stream dependency error - unable to find '{0}'".format(next))
296                next_id = lookup[next]
297
298            stream_json = stream.to_json()
299            stream_json['next_stream_id'] = next_id
300
301            params = {"handler": self.handler,
302                      "port_id": self.port_id,
303                      "stream_id": stream_id,
304                      "stream": stream_json}
305
306            cmd = RpcCmdData('add_stream', params, 'core')
307            batch.append(cmd)
308
309
310        rc = self.transmit_batch(batch)
311
312        ret = RC()
313        for i, single_rc in enumerate(rc):
314            if single_rc.rc:
315                stream_id = batch[i].params['stream_id']
316                next_id   = batch[i].params['stream']['next_stream_id']
317                self.streams[stream_id] = {'next_id'        : next_id,
318                                           'pkt'            : streams_list[i].get_pkt(),
319                                           'mode'           : streams_list[i].get_mode(),
320                                           'rate'           : streams_list[i].get_rate(),
321                                           'has_flow_stats' : streams_list[i].has_flow_stats()}
322
323                ret.add(RC_OK(data = stream_id))
324
325                self.has_rx_streams = self.has_rx_streams or streams_list[i].has_flow_stats()
326
327            else:
328                ret.add(RC(*single_rc))
329
330        self.state = self.STATE_STREAMS if (len(self.streams) > 0) else self.STATE_IDLE
331
332        return ret if ret else self.err(str(ret))
333
334
335
336    # remove stream from port
337    @writeable
338    def remove_streams (self, stream_id_list):
339
340        # single element to list
341        stream_id_list = stream_id_list if isinstance(stream_id_list, list) else [stream_id_list]
342
343        # verify existance
344        if not all([stream_id in self.streams for stream_id in stream_id_list]):
345            return self.err("stream {0} does not exists".format(stream_id))
346
347        batch = []
348
349        for stream_id in stream_id_list:
350            params = {"handler": self.handler,
351                      "port_id": self.port_id,
352                      "stream_id": stream_id}
353
354            cmd = RpcCmdData('remove_stream', params, 'core')
355            batch.append(cmd)
356
357
358        rc = self.transmit_batch(batch)
359        for i, single_rc in enumerate(rc):
360            if single_rc:
361                id = batch[i].params['stream_id']
362                del self.streams[id]
363
364        self.state = self.STATE_STREAMS if (len(self.streams) > 0) else self.STATE_IDLE
365
366        # recheck if any RX stats streams present on the port
367        self.has_rx_streams = any([stream['has_flow_stats'] for stream in self.streams.values()])
368
369        return self.ok() if rc else self.err(rc.err())
370
371
372    # remove all the streams
373    @writeable
374    def remove_all_streams (self):
375
376        params = {"handler": self.handler,
377                  "port_id": self.port_id}
378
379        rc = self.transmit("remove_all_streams", params)
380        if not rc:
381            return self.err(rc.err())
382
383        self.streams = {}
384
385        self.state = self.STATE_IDLE
386        self.has_rx_streams = False
387
388        return self.ok()
389
390
391    # get a specific stream
392    def get_stream (self, stream_id):
393        if stream_id in self.streams:
394            return self.streams[stream_id]
395        else:
396            return None
397
398    def get_all_streams (self):
399        return self.streams
400
401
402    @writeable
403    def start (self, mul, duration, force, mask):
404
405        if self.state == self.STATE_IDLE:
406            return self.err("unable to start traffic - no streams attached to port")
407
408        params = {"handler":    self.handler,
409                  "port_id":    self.port_id,
410                  "mul":        mul,
411                  "duration":   duration,
412                  "force":      force,
413                  "core_mask":  mask if mask is not None else self.MASK_ALL}
414
415        # must set this before to avoid race with the async response
416        last_state = self.state
417        self.state = self.STATE_TX
418
419        rc = self.transmit("start_traffic", params)
420
421        if rc.bad():
422            self.state = last_state
423            return self.err(rc.err())
424
425        # save this for TUI
426        self.last_factor_type = mul['type']
427
428        return rc
429
430
431    # stop traffic
432    # with force ignores the cached state and sends the command
433    @owned
434    def stop (self, force = False):
435
436        # if not is not active and not force - go back
437        if not self.is_active() and not force:
438            return self.ok()
439
440        params = {"handler": self.handler,
441                  "port_id": self.port_id}
442
443        rc = self.transmit("stop_traffic", params)
444        if rc.bad():
445            return self.err(rc.err())
446
447        self.state = self.STATE_STREAMS
448
449        self.last_factor_type = None
450
451        # timestamp for last tx
452        self.tx_stopped_ts = datetime.now()
453
454        return self.ok()
455
456
457    # return True if port has any stream configured with RX stats
458    def has_rx_enabled (self):
459        return self.has_rx_streams
460
461
462    # return true if rx_delay_ms has passed since the last port stop
463    def has_rx_delay_expired (self, rx_delay_ms):
464        assert(self.has_rx_enabled())
465
466        # if active - it's not safe to remove RX filters
467        if self.is_active():
468            return False
469
470        # either no timestamp present or time has already passed
471        return not self.tx_stopped_ts or (datetime.now() - self.tx_stopped_ts) > timedelta(milliseconds = rx_delay_ms)
472
473
474    @writeable
475    def remove_rx_filters (self):
476        assert(self.has_rx_enabled())
477
478        if self.state == self.STATE_IDLE:
479            return self.ok()
480
481
482        params = {"handler": self.handler,
483                  "port_id": self.port_id}
484
485        rc = self.transmit("remove_rx_filters", params)
486        if rc.bad():
487            return self.err(rc.err())
488
489        return self.ok()
490
491
492    @owned
493    def set_rx_sniffer (self, pcap_filename, limit):
494
495        if not self.is_service_mode_on():
496            return self.err('port service mode must be enabled for performing RX capturing. Please enable service mode')
497
498        params = {"handler":        self.handler,
499                  "port_id":        self.port_id,
500                  "type":           "capture",
501                  "enabled":        True,
502                  "pcap_filename":  pcap_filename,
503                  "limit":          limit}
504
505        rc = self.transmit("set_rx_feature", params)
506        if rc.bad():
507            return self.err(rc.err())
508
509        return self.ok()
510
511
512    @owned
513    def remove_rx_sniffer (self):
514        params = {"handler":        self.handler,
515                  "port_id":        self.port_id,
516                  "type":           "capture",
517                  "enabled":        False}
518
519        rc = self.transmit("set_rx_feature", params)
520        if rc.bad():
521            return self.err(rc.err())
522
523        return self.ok()
524
525    @writeable
526    def set_l2_mode (self, dst_mac):
527        if not self.is_service_mode_on():
528            return self.err('port service mode must be enabled for configuring L2 mode. Please enable service mode')
529
530        params = {"handler":        self.handler,
531                  "port_id":        self.port_id,
532                  "dst_mac":        dst_mac}
533
534        rc = self.transmit("set_l2", params)
535        if rc.bad():
536            return self.err(rc.err())
537
538        return self.sync()
539
540
541    @writeable
542    def set_l3_mode (self, src_addr, dst_addr, resolved_mac = None):
543        if not self.is_service_mode_on():
544            return self.err('port service mode must be enabled for configuring L3 mode. Please enable service mode')
545
546        params = {"handler":        self.handler,
547                  "port_id":        self.port_id,
548                  "src_addr":       src_addr,
549                  "dst_addr":       dst_addr}
550
551        if resolved_mac:
552            params["resolved_mac"] = resolved_mac
553
554        rc = self.transmit("set_l3", params)
555        if rc.bad():
556            return self.err(rc.err())
557
558        return self.sync()
559
560
561    @owned
562    def set_rx_queue (self, size):
563
564        params = {"handler":        self.handler,
565                  "port_id":        self.port_id,
566                  "type":           "queue",
567                  "enabled":        True,
568                  "size":          size}
569
570        rc = self.transmit("set_rx_feature", params)
571        if rc.bad():
572            return self.err(rc.err())
573
574        return self.ok()
575
576    @owned
577    def remove_rx_queue (self):
578        params = {"handler":        self.handler,
579                  "port_id":        self.port_id,
580                  "type":           "queue",
581                  "enabled":        False}
582
583        rc = self.transmit("set_rx_feature", params)
584        if rc.bad():
585            return self.err(rc.err())
586
587        return self.ok()
588
589    @owned
590    def get_rx_queue_pkts (self):
591        params = {"handler":        self.handler,
592                  "port_id":        self.port_id}
593
594        rc = self.transmit("get_rx_queue_pkts", params)
595        if rc.bad():
596            return self.err(rc.err())
597
598        pkts = rc.data()['pkts']
599
600        # decode the packets from base64 to binary
601        for i in range(len(pkts)):
602            pkts[i]['binary'] = base64.b64decode(pkts[i]['binary'])
603
604        return RC_OK(pkts)
605
606
607    @owned
608    def pause (self):
609
610        if (self.state == self.STATE_PCAP_TX) :
611            return self.err("pause is not supported during PCAP TX")
612
613        if (self.state != self.STATE_TX) :
614            return self.err("port is not transmitting")
615
616        params = {"handler": self.handler,
617                  "port_id": self.port_id}
618
619        rc  = self.transmit("pause_traffic", params)
620        if rc.bad():
621            return self.err(rc.err())
622
623        self.state = self.STATE_PAUSE
624
625        return self.ok()
626
627    @owned
628    def resume (self):
629
630        if (self.state != self.STATE_PAUSE) :
631            return self.err("port is not in pause mode")
632
633        params = {"handler": self.handler,
634                  "port_id": self.port_id}
635
636        # only valid state after stop
637
638        rc = self.transmit("resume_traffic", params)
639        if rc.bad():
640            return self.err(rc.err())
641
642        self.state = self.STATE_TX
643
644        return self.ok()
645
646    @owned
647    def update (self, mul, force):
648
649        if (self.state == self.STATE_PCAP_TX) :
650            return self.err("update is not supported during PCAP TX")
651
652        if (self.state != self.STATE_TX) :
653            return self.err("port is not transmitting")
654
655        params = {"handler": self.handler,
656                  "port_id": self.port_id,
657                  "mul":     mul,
658                  "force":   force}
659
660        rc = self.transmit("update_traffic", params)
661        if rc.bad():
662            return self.err(rc.err())
663
664        # save this for TUI
665        self.last_factor_type = mul['type']
666
667        return self.ok()
668
669    @owned
670    def validate (self):
671
672        if (self.state == self.STATE_IDLE):
673            return self.err("no streams attached to port")
674
675        params = {"handler": self.handler,
676                  "port_id": self.port_id}
677
678        rc = self.transmit("validate", params)
679        if rc.bad():
680            return self.err(rc.err())
681
682        self.profile = rc.data()
683
684        return self.ok()
685
686
687    @owned
688    def set_attr (self, **kwargs):
689
690        json_attr = {}
691
692        if kwargs.get('promiscuous') is not None:
693            json_attr['promiscuous'] = {'enabled': kwargs.get('promiscuous')}
694
695        if kwargs.get('link_status') is not None:
696            json_attr['link_status'] = {'up': kwargs.get('link_status')}
697
698        if kwargs.get('led_status') is not None:
699            json_attr['led_status'] = {'on': kwargs.get('led_status')}
700
701        if kwargs.get('flow_ctrl_mode') is not None:
702            json_attr['flow_ctrl_mode'] = {'mode': kwargs.get('flow_ctrl_mode')}
703
704        if kwargs.get('rx_filter_mode') is not None:
705            json_attr['rx_filter_mode'] = {'mode': kwargs.get('rx_filter_mode')}
706
707
708        params = {"handler": self.handler,
709                  "port_id": self.port_id,
710                  "attr": json_attr}
711
712        rc = self.transmit("set_port_attr", params)
713        if rc.bad():
714            return self.err(rc.err())
715
716        # update the dictionary from the server explicitly
717        return self.sync()
718
719
720    @owned
721    def set_service_mode (self, enabled):
722        rc = self.set_attr(rx_filter_mode = 'all' if enabled else 'hw')
723        if not rc:
724            return rc
725
726        if not enabled:
727            rc = self.remove_rx_queue()
728            if not rc:
729                return rc
730
731            rc = self.remove_rx_sniffer()
732            if not rc:
733                return rc
734
735        return self.ok()
736
737    def is_service_mode_on (self):
738        return self.get_rx_filter_mode() == 'all'
739
740    @writeable
741    def push_remote (self, pcap_filename, ipg_usec, speedup, count, duration, is_dual, slave_handler, min_ipg_usec):
742
743        params = {"handler": self.handler,
744                  "port_id": self.port_id,
745                  "pcap_filename": pcap_filename,
746                  "ipg_usec": ipg_usec if ipg_usec is not None else -1,
747                  "speedup": speedup,
748                  "count": count,
749                  "duration": duration,
750                  "is_dual": is_dual,
751                  "slave_handler": slave_handler,
752                  "min_ipg_usec": min_ipg_usec if min_ipg_usec else 0}
753
754        rc = self.transmit("push_remote", params)
755        if rc.bad():
756            return self.err(rc.err())
757
758        self.state = self.STATE_PCAP_TX
759        return self.ok()
760
761
762    def get_profile (self):
763        return self.profile
764
765    # invalidates the current ARP
766    def invalidate_arp (self):
767        if not self.is_l3_mode():
768            return self.err('port is not configured with L3')
769
770        layer_cfg = self.get_layer_cfg()
771
772        # reconfigure server with unresolved IPv4 information
773        return self.set_l3_mode(layer_cfg['ipv4']['src'], layer_cfg['ipv4']['dst'])
774
775
776
777    def print_profile (self, mult, duration):
778        if not self.get_profile():
779            return
780
781        rate = self.get_profile()['rate']
782        graph = self.get_profile()['graph']
783
784        print(format_text("Profile Map Per Port\n", 'underline', 'bold'))
785
786        factor = mult_to_factor(mult, rate['max_bps_l2'], rate['max_pps'], rate['max_line_util'])
787
788        print("Profile max BPS L2    (base / req):   {:^12} / {:^12}".format(format_num(rate['max_bps_l2'], suffix = "bps"),
789                                                                             format_num(rate['max_bps_l2'] * factor, suffix = "bps")))
790
791        print("Profile max BPS L1    (base / req):   {:^12} / {:^12}".format(format_num(rate['max_bps_l1'], suffix = "bps"),
792                                                                             format_num(rate['max_bps_l1'] * factor, suffix = "bps")))
793
794        print("Profile max PPS       (base / req):   {:^12} / {:^12}".format(format_num(rate['max_pps'], suffix = "pps"),
795                                                                             format_num(rate['max_pps'] * factor, suffix = "pps"),))
796
797        print("Profile line util.    (base / req):   {:^12} / {:^12}".format(format_percentage(rate['max_line_util']),
798                                                                             format_percentage(rate['max_line_util'] * factor)))
799
800
801        # duration
802        exp_time_base_sec = graph['expected_duration'] / (1000 * 1000)
803        exp_time_factor_sec = exp_time_base_sec / factor
804
805        # user configured a duration
806        if duration > 0:
807            if exp_time_factor_sec > 0:
808                exp_time_factor_sec = min(exp_time_factor_sec, duration)
809            else:
810                exp_time_factor_sec = duration
811
812
813        print("Duration              (base / req):   {:^12} / {:^12}".format(format_time(exp_time_base_sec),
814                                                                             format_time(exp_time_factor_sec)))
815        print("\n")
816
817    # generate formatted (console friendly) port info
818    def get_formatted_info (self, sync = True):
819
820        # sync the status
821        if sync:
822            self.sync()
823
824        # get a copy of the current attribute set (safe against manipulation)
825        attr = self.get_ts_attr()
826
827        info = dict(self.info)
828
829        info['status'] = self.get_port_state_name()
830
831        if 'link' in attr:
832            info['link'] = 'UP' if attr['link']['up'] else 'DOWN'
833        else:
834            info['link'] = 'N/A'
835
836        if 'fc' in attr:
837            info['fc'] = FLOW_CTRL_DICT_REVERSED.get(attr['fc']['mode'], 'N/A')
838        else:
839            info['fc'] = 'N/A'
840
841        if 'promiscuous' in attr:
842            info['prom'] = "on" if attr['promiscuous']['enabled'] else "off"
843        else:
844            info['prom'] = "N/A"
845
846        if 'description' not in info:
847            info['description'] = "N/A"
848
849        if 'is_fc_supported' in info:
850            info['fc_supported'] = 'yes' if info['is_fc_supported'] else 'no'
851        else:
852            info['fc_supported'] = 'N/A'
853
854        if 'is_led_supported' in info:
855            info['led_change_supported'] = 'yes' if info['is_led_supported'] else 'no'
856        else:
857            info['led_change_supported'] = 'N/A'
858
859        if 'is_link_supported' in info:
860            info['link_change_supported'] = 'yes' if info['is_link_supported'] else 'no'
861        else:
862            info['link_change_supported'] = 'N/A'
863
864        if 'is_virtual' in info:
865            info['is_virtual'] = 'yes' if info['is_virtual'] else 'no'
866        else:
867            info['is_virtual'] = 'N/A'
868
869        # speed
870        info['speed'] = self.get_speed_gbps()
871
872        # RX filter mode
873        info['rx_filter_mode'] = 'hardware match' if attr['rx_filter_mode'] == 'hw' else 'fetch all'
874
875        # holds the information about all the layers configured for the port
876        layer_cfg = attr['layer_cfg']
877
878        info['src_mac'] = attr['layer_cfg']['ether']['src']
879
880        # pretty show per mode
881
882        if layer_cfg['ipv4']['state'] == 'none':
883            info['layer_mode'] = 'Ethernet'
884            info['src_ipv4']   = '-'
885            info['dest']       = layer_cfg['ether']['dst'] if layer_cfg['ether']['state'] == 'configured' else 'unconfigured'
886            info['arp']        = '-'
887
888        elif layer_cfg['ipv4']['state'] == 'unresolved':
889            info['layer_mode'] = 'IPv4'
890            info['src_ipv4']   = layer_cfg['ipv4']['src']
891            info['dest']       = layer_cfg['ipv4']['dst']
892            info['arp']        = 'unresolved'
893
894        elif layer_cfg['ipv4']['state'] == 'resolved':
895            info['layer_mode'] = 'IPv4'
896            info['src_ipv4']   = layer_cfg['ipv4']['src']
897            info['dest']       = layer_cfg['ipv4']['dst']
898            info['arp']        = layer_cfg['ether']['dst']
899
900
901
902        # RX info
903        rx_info = self.status['rx_info']
904
905        # RX sniffer
906        sniffer = rx_info['sniffer']
907        info['rx_sniffer'] = '{0}\n[{1} / {2}]'.format(sniffer['pcap_filename'], sniffer['count'], sniffer['limit']) if sniffer['is_active'] else 'off'
908
909
910        # RX queue
911        queue = rx_info['queue']
912        info['rx_queue'] = '[{0} / {1}]'.format(queue['count'], queue['size']) if queue['is_active'] else 'off'
913
914        # Grat ARP
915        grat_arp = rx_info['grat_arp']
916        if grat_arp['is_active']:
917            info['grat_arp'] = "every {0} seconds".format(grat_arp['interval_sec'])
918        else:
919            info['grat_arp'] = "off"
920
921
922        return info
923
924
925    def get_port_state_name(self):
926        return self.STATES_MAP.get(self.state, "Unknown")
927
928    def get_layer_cfg (self):
929        return self.__attr['layer_cfg']
930
931    def get_rx_filter_mode (self):
932        return self.__attr['rx_filter_mode']
933
934    def is_virtual(self):
935        return self.info.get('is_virtual')
936
937    def is_l3_mode (self):
938        return self.get_layer_cfg()['ipv4']['state'] != 'none'
939
940    def is_resolved (self):
941        # for L3
942        if self.is_l3_mode():
943            return self.get_layer_cfg()['ipv4']['state'] != 'unresolved'
944        # for L2
945        else:
946            return self.get_layer_cfg()['ether']['state'] != 'unconfigured'
947
948
949    @writeable
950    def arp_resolve (self, retries):
951
952        # execute the ARP service
953        rc = RXServiceARP(self).execute(retries)
954        if not rc:
955            return rc
956
957        # fetch the data returned
958        arp_rc = rc.data()
959
960        # first invalidate current ARP if exists
961        rc = self.invalidate_arp()
962        if not rc:
963            return rc
964
965        # update the port with L3 full configuration
966        rc = self.set_l3_mode(self.get_layer_cfg()['ipv4']['src'], self.get_layer_cfg()['ipv4']['dst'], arp_rc['hwsrc'])
967        if not rc:
968            return rc
969
970        return self.ok('Port {0} - Recieved ARP reply from: {1}, hw: {2}'.format(self.port_id, arp_rc['psrc'], arp_rc['hwsrc']))
971
972
973
974    @writeable
975    def ping (self, ping_ipv4, pkt_size):
976        return RXServiceICMP(self, ping_ipv4, pkt_size).execute()
977
978
979    ################# stats handler ######################
980    def generate_port_stats(self):
981        return self.port_stats.generate_stats()
982
983    def generate_port_status(self):
984
985        info = self.get_formatted_info()
986
987        return {"driver":           info['driver'],
988                "description":      info.get('description', 'N/A')[:18],
989                "src MAC":          info['src_mac'],
990                "src IPv4":         info['src_ipv4'],
991                "Destination":      format_text("{0}".format(info['dest']), 'bold', 'red' if info['dest'] == 'unconfigured' else None),
992                "ARP Resolution":   format_text("{0}".format(info['arp']), 'bold', 'red' if info['arp'] == 'unresolved' else None),
993                "PCI Address":      info['pci_addr'],
994                "NUMA Node":        info['numa'],
995                "--": "",
996                "---": "",
997                "----": "",
998                "-----": "",
999                "link speed": "%g Gb/s" % info['speed'],
1000                "port status": info['status'],
1001                "link status": info['link'],
1002                "promiscuous" : info['prom'],
1003                "flow ctrl" : info['fc'],
1004
1005                "layer mode": format_text(info['layer_mode'], 'green' if info['layer_mode'] == 'IPv4' else 'magenta'),
1006                "RX Filter Mode": info['rx_filter_mode'],
1007                "RX Queueing": info['rx_queue'],
1008                "RX sniffer": info['rx_sniffer'],
1009                "Grat ARP": info['grat_arp'],
1010
1011                }
1012
1013    def clear_stats(self):
1014        return self.port_stats.clear_stats()
1015
1016
1017    def get_stats (self):
1018        return self.port_stats.get_stats()
1019
1020
1021    def invalidate_stats(self):
1022        return self.port_stats.invalidate()
1023
1024    ################# stream printout ######################
1025    def generate_loaded_streams_sum(self):
1026        if self.state == self.STATE_DOWN:
1027            return {}
1028
1029        data = {}
1030        for id, obj in self.streams.items():
1031
1032            # lazy build scapy repr.
1033            if not 'pkt_type' in obj:
1034                obj['pkt_type'] = STLPktBuilder.pkt_layers_desc_from_buffer(obj['pkt'])
1035
1036            data[id] = OrderedDict([ ('id',  id),
1037                                     ('packet_type',  obj['pkt_type']),
1038                                     ('L2 len',       len(obj['pkt']) + 4),
1039                                     ('mode',         obj['mode']),
1040                                     ('rate',         obj['rate']),
1041                                     ('next_stream',  obj['next_id'] if not '-1' else 'None')
1042                                    ])
1043
1044        return {"streams" : OrderedDict(sorted(data.items())) }
1045
1046
1047    ######## attributes are a complex type (dict) that might be manipulated through the async thread #############
1048
1049    # get in a thread safe manner a duplication of attributes
1050    def get_ts_attr (self):
1051        with self.attr_lock:
1052            return dict(self.__attr)
1053
1054    # set in a thread safe manner a new dict of attributes
1055    def set_ts_attr (self, new_attr):
1056        with self.attr_lock:
1057            self.__attr = new_attr
1058
1059
1060  ################# events handler ######################
1061    def async_event_port_job_done (self):
1062        # until thread is locked - order is important
1063        self.tx_stopped_ts = datetime.now()
1064        self.state = self.STATE_STREAMS
1065
1066        self.last_factor_type = None
1067
1068    def async_event_port_attr_changed (self, new_attr):
1069
1070        # get a thread safe duplicate
1071        cur_attr = self.get_ts_attr()
1072
1073        # check if anything changed
1074        if new_attr == cur_attr:
1075            return None
1076
1077        # generate before
1078        before = self.get_formatted_info(sync = False)
1079
1080        # update
1081        self.set_ts_attr(new_attr)
1082
1083        # generate after
1084        after = self.get_formatted_info(sync = False)
1085
1086        # return diff
1087        diff = {}
1088        for key, new_value in after.items():
1089            old_value = before.get(key, 'N/A')
1090            if new_value != old_value:
1091                diff[key] = (old_value, new_value)
1092
1093        return diff
1094
1095
1096    # rest of the events are used for TUI / read only sessions
1097    def async_event_port_stopped (self):
1098        if not self.is_acquired():
1099            self.state = self.STATE_STREAMS
1100
1101    def async_event_port_paused (self):
1102        if not self.is_acquired():
1103            self.state = self.STATE_PAUSE
1104
1105    def async_event_port_started (self):
1106        if not self.is_acquired():
1107            self.state = self.STATE_TX
1108
1109    def async_event_port_resumed (self):
1110        if not self.is_acquired():
1111            self.state = self.STATE_TX
1112
1113    def async_event_acquired (self, who):
1114        self.handler = None
1115        self.owner = who
1116
1117    def async_event_released (self):
1118        self.owner = ''
1119
1120
1121