trex_stl_port.py revision 04eae221
1
2from collections import namedtuple, OrderedDict
3
4from .trex_stl_packet_builder_scapy import STLPktBuilder
5from .trex_stl_streams import STLStream
6from .trex_stl_types import *
7from . import trex_stl_stats
8
9import base64
10import copy
11from datetime import datetime, timedelta
12
13StreamOnPort = namedtuple('StreamOnPort', ['compiled_stream', 'metadata'])
14
15########## utlity ############
16def mult_to_factor (mult, max_bps_l2, max_pps, line_util):
17    if mult['type'] == 'raw':
18        return mult['value']
19
20    if mult['type'] == 'bps':
21        return mult['value'] / max_bps_l2
22
23    if mult['type'] == 'pps':
24        return mult['value'] / max_pps
25
26    if mult['type'] == 'percentage':
27        return mult['value'] / line_util
28
29
30# describes a single port
31class Port(object):
32    STATE_DOWN         = 0
33    STATE_IDLE         = 1
34    STATE_STREAMS      = 2
35    STATE_TX           = 3
36    STATE_PAUSE        = 4
37    STATE_PCAP_TX      = 5
38
39    MASK_ALL = ((1 << 64) - 1)
40
41    PortState = namedtuple('PortState', ['state_id', 'state_name'])
42    STATES_MAP = {STATE_DOWN: "DOWN",
43                  STATE_IDLE: "IDLE",
44                  STATE_STREAMS: "IDLE",
45                  STATE_TX: "ACTIVE",
46                  STATE_PAUSE: "PAUSE",
47                  STATE_PCAP_TX : "ACTIVE"}
48
49
50    def __init__ (self, port_id, user, comm_link, session_id, info):
51        self.port_id = port_id
52        self.state = self.STATE_IDLE
53        self.handler = None
54        self.comm_link = comm_link
55        self.transmit = comm_link.transmit
56        self.transmit_batch = comm_link.transmit_batch
57        self.user = user
58
59        self.info = dict(info)
60
61        self.streams = {}
62        self.profile = None
63        self.session_id = session_id
64        self.attr = {}
65
66        self.port_stats = trex_stl_stats.CPortStats(self)
67
68        self.next_available_id = 1
69        self.tx_stopped_ts = None
70        self.has_rx_streams = False
71
72        self.owner = ''
73        self.last_factor_type = None
74
75    # decorator to verify port is up
76    def up(func):
77        def func_wrapper(*args):
78            port = args[0]
79
80            if not port.is_up():
81                return port.err("{0} - port is down".format(func.__name__))
82
83            return func(*args)
84
85        return func_wrapper
86
87    # owned
88    def owned(func):
89        def func_wrapper(*args):
90            port = args[0]
91
92            if not port.is_up():
93                return port.err("{0} - port is down".format(func.__name__))
94
95            if not port.is_acquired():
96                return port.err("{0} - port is not owned".format(func.__name__))
97
98            return func(*args)
99
100        return func_wrapper
101
102
103    # decorator to check server is readable (port not down and etc.)
104    def writeable(func):
105        def func_wrapper(*args, **kwargs):
106            port = args[0]
107
108            if not port.is_up():
109                return port.err("{0} - port is down".format(func.__name__))
110
111            if not port.is_acquired():
112                return port.err("{0} - port is not owned".format(func.__name__))
113
114            if not port.is_writeable():
115                return port.err("{0} - port is not in a writeable state".format(func.__name__))
116
117            return func(*args, **kwargs)
118
119        return func_wrapper
120
121
122
123    def err(self, msg):
124        return RC_ERR("port {0} : {1}\n".format(self.port_id, msg))
125
126    def ok(self, data = ""):
127        return RC_OK(data)
128
129    def get_speed_bps (self):
130        return (self.info['speed'] * 1000 * 1000 * 1000)
131
132    def get_formatted_speed (self):
133        return "{0} Gbps".format(self.info['speed'])
134
135    def is_acquired(self):
136        return (self.handler != None)
137
138    def is_up (self):
139        return (self.state != self.STATE_DOWN)
140
141    def is_active(self):
142        return (self.state == self.STATE_TX ) or (self.state == self.STATE_PAUSE) or (self.state == self.STATE_PCAP_TX)
143
144    def is_transmitting (self):
145        return (self.state == self.STATE_TX) or (self.state == self.STATE_PCAP_TX)
146
147    def is_paused (self):
148        return (self.state == self.STATE_PAUSE)
149
150    def is_writeable (self):
151        # operations on port can be done on state idle or state streams
152        return ((self.state == self.STATE_IDLE) or (self.state == self.STATE_STREAMS))
153
154    def get_owner (self):
155        if self.is_acquired():
156            return self.user
157        else:
158            return self.owner
159
160    def __allocate_stream_id (self):
161        id = self.next_available_id
162        self.next_available_id += 1
163        return id
164
165
166    # take the port
167    @up
168    def acquire(self, force = False, sync_streams = True):
169        params = {"port_id":     self.port_id,
170                  "user":        self.user,
171                  "session_id":  self.session_id,
172                  "force":       force}
173
174        rc = self.transmit("acquire", params)
175        if not rc:
176            return self.err(rc.err())
177
178        self.handler = rc.data()
179
180        if sync_streams:
181            return self.sync_streams()
182        else:
183            return self.ok()
184
185
186    # sync all the streams with the server
187    @up
188    def sync_streams (self):
189        params = {"port_id": self.port_id}
190
191        rc = self.transmit("get_all_streams", params)
192        if rc.bad():
193            return self.err(rc.err())
194
195        for k, v in rc.data()['streams'].items():
196            self.streams[k] = {'next_id': v['next_stream_id'],
197                               'pkt'    : base64.b64decode(v['packet']['binary']),
198                               'mode'   : v['mode']['type'],
199                               'rate'   : STLStream.get_rate_from_field(v['mode']['rate'])}
200        return self.ok()
201
202    # release the port
203    @up
204    def release(self):
205        params = {"port_id": self.port_id,
206                  "handler": self.handler}
207
208        rc = self.transmit("release", params)
209
210        if rc.good():
211
212            self.handler = None
213            self.owner = ''
214
215            return self.ok()
216        else:
217            return self.err(rc.err())
218
219
220
221    @up
222    def sync(self):
223
224        params = {"port_id": self.port_id}
225
226        rc = self.transmit("get_port_status", params)
227        if rc.bad():
228            return self.err(rc.err())
229
230        # sync the port
231        port_state = rc.data()['state']
232
233        if port_state == "DOWN":
234            self.state = self.STATE_DOWN
235        elif port_state == "IDLE":
236            self.state = self.STATE_IDLE
237        elif port_state == "STREAMS":
238            self.state = self.STATE_STREAMS
239        elif port_state == "TX":
240            self.state = self.STATE_TX
241        elif port_state == "PAUSE":
242            self.state = self.STATE_PAUSE
243        elif port_state == "PCAP_TX":
244            self.state = self.STATE_PCAP_TX
245        else:
246            raise Exception("port {0}: bad state received from server '{1}'".format(self.port_id, port_state))
247
248        self.owner = rc.data()['owner']
249
250        self.next_available_id = int(rc.data()['max_stream_id']) + 1
251
252        # attributes
253        self.attr = rc.data()['attr']
254
255
256        return self.ok()
257
258
259
260    # add streams
261    @writeable
262    def add_streams (self, streams_list):
263
264        # listify
265        streams_list = streams_list if isinstance(streams_list, list) else [streams_list]
266
267        lookup = {}
268
269        # allocate IDs
270        for stream in streams_list:
271
272            # allocate stream id
273            stream_id = stream.get_id() if stream.get_id() is not None else self.__allocate_stream_id()
274            if stream_id in self.streams:
275                return self.err('Stream ID: {0} already exists'.format(stream_id))
276
277            # name
278            name = stream.get_name() if stream.get_name() is not None else id(stream)
279            if name in lookup:
280                return self.err("multiple streams with duplicate name: '{0}'".format(name))
281            lookup[name] = stream_id
282
283        batch = []
284        for stream in streams_list:
285
286            name = stream.get_name() if stream.get_name() is not None else id(stream)
287            stream_id = lookup[name]
288            next_id = -1
289
290            next = stream.get_next()
291            if next:
292                if not next in lookup:
293                    return self.err("stream dependency error - unable to find '{0}'".format(next))
294                next_id = lookup[next]
295
296            stream_json = stream.to_json()
297            stream_json['next_stream_id'] = next_id
298
299            params = {"handler": self.handler,
300                      "port_id": self.port_id,
301                      "stream_id": stream_id,
302                      "stream": stream_json}
303
304            cmd = RpcCmdData('add_stream', params, 'core')
305            batch.append(cmd)
306
307
308        rc = self.transmit_batch(batch)
309
310        ret = RC()
311        for i, single_rc in enumerate(rc):
312            if single_rc.rc:
313                stream_id = batch[i].params['stream_id']
314                next_id   = batch[i].params['stream']['next_stream_id']
315                self.streams[stream_id] = {'next_id'        : next_id,
316                                           'pkt'            : streams_list[i].get_pkt(),
317                                           'mode'           : streams_list[i].get_mode(),
318                                           'rate'           : streams_list[i].get_rate(),
319                                           'has_flow_stats' : streams_list[i].has_flow_stats()}
320
321                ret.add(RC_OK(data = stream_id))
322
323                self.has_rx_streams = self.has_rx_streams or streams_list[i].has_flow_stats()
324
325            else:
326                ret.add(RC(*single_rc))
327
328        self.state = self.STATE_STREAMS if (len(self.streams) > 0) else self.STATE_IDLE
329
330        return ret if ret else self.err(str(ret))
331
332
333
334    # remove stream from port
335    @writeable
336    def remove_streams (self, stream_id_list):
337
338        # single element to list
339        stream_id_list = stream_id_list if isinstance(stream_id_list, list) else [stream_id_list]
340
341        # verify existance
342        if not all([stream_id in self.streams for stream_id in stream_id_list]):
343            return self.err("stream {0} does not exists".format(stream_id))
344
345        batch = []
346
347        for stream_id in stream_id_list:
348            params = {"handler": self.handler,
349                      "port_id": self.port_id,
350                      "stream_id": stream_id}
351
352            cmd = RpcCmdData('remove_stream', params, 'core')
353            batch.append(cmd)
354
355
356        rc = self.transmit_batch(batch)
357        for i, single_rc in enumerate(rc):
358            if single_rc:
359                id = batch[i].params['stream_id']
360                del self.streams[id]
361
362        self.state = self.STATE_STREAMS if (len(self.streams) > 0) else self.STATE_IDLE
363
364        # recheck if any RX stats streams present on the port
365        self.has_rx_streams = any([stream['has_flow_stats'] for stream in self.streams.values()])
366
367        return self.ok() if rc else self.err(rc.err())
368
369
370    # remove all the streams
371    @writeable
372    def remove_all_streams (self):
373
374        params = {"handler": self.handler,
375                  "port_id": self.port_id}
376
377        rc = self.transmit("remove_all_streams", params)
378        if not rc:
379            return self.err(rc.err())
380
381        self.streams = {}
382
383        self.state = self.STATE_IDLE
384        self.has_rx_streams = False
385
386        return self.ok()
387
388
389    # get a specific stream
390    def get_stream (self, stream_id):
391        if stream_id in self.streams:
392            return self.streams[stream_id]
393        else:
394            return None
395
396    def get_all_streams (self):
397        return self.streams
398
399
400    @writeable
401    def start (self, mul, duration, force, mask):
402
403        if self.state == self.STATE_IDLE:
404            return self.err("unable to start traffic - no streams attached to port")
405
406        params = {"handler":    self.handler,
407                  "port_id":    self.port_id,
408                  "mul":        mul,
409                  "duration":   duration,
410                  "force":      force,
411                  "core_mask":  mask if mask is not None else self.MASK_ALL}
412
413        # must set this before to avoid race with the async response
414        last_state = self.state
415        self.state = self.STATE_TX
416
417        rc = self.transmit("start_traffic", params)
418
419        if rc.bad():
420            self.state = last_state
421            return self.err(rc.err())
422
423        # save this for TUI
424        self.last_factor_type = mul['type']
425
426        return self.ok()
427
428
429    # stop traffic
430    # with force ignores the cached state and sends the command
431    @owned
432    def stop (self, force = False):
433
434        # if not is not active and not force - go back
435        if not self.is_active() and not force:
436            return self.ok()
437
438        params = {"handler": self.handler,
439                  "port_id": self.port_id}
440
441        rc = self.transmit("stop_traffic", params)
442        if rc.bad():
443            return self.err(rc.err())
444
445        self.state = self.STATE_STREAMS
446        self.last_factor_type = None
447
448        # timestamp for last tx
449        self.tx_stopped_ts = datetime.now()
450
451        return self.ok()
452
453
454    # return True if port has any stream configured with RX stats
455    def has_rx_enabled (self):
456        return self.has_rx_streams
457
458
459    # return true if rx_delay_ms has passed since the last port stop
460    def has_rx_delay_expired (self, rx_delay_ms):
461        assert(self.has_rx_enabled())
462
463        # if active - it's not safe to remove RX filters
464        if self.is_active():
465            return False
466
467        # either no timestamp present or time has already passed
468        return not self.tx_stopped_ts or (datetime.now() - self.tx_stopped_ts) > timedelta(milliseconds = rx_delay_ms)
469
470
471    @writeable
472    def remove_rx_filters (self):
473        assert(self.has_rx_enabled())
474
475        if self.state == self.STATE_IDLE:
476            return self.ok()
477
478
479        params = {"handler": self.handler,
480                  "port_id": self.port_id}
481
482        rc = self.transmit("remove_rx_filters", params)
483        if rc.bad():
484            return self.err(rc.err())
485
486        return self.ok()
487
488    @owned
489    def pause (self):
490
491        if (self.state == self.STATE_PCAP_TX) :
492            return self.err("pause is not supported during PCAP TX")
493
494        if (self.state != self.STATE_TX) :
495            return self.err("port is not transmitting")
496
497        params = {"handler": self.handler,
498                  "port_id": self.port_id}
499
500        rc  = self.transmit("pause_traffic", params)
501        if rc.bad():
502            return self.err(rc.err())
503
504        self.state = self.STATE_PAUSE
505
506        return self.ok()
507
508    @owned
509    def resume (self):
510
511        if (self.state != self.STATE_PAUSE) :
512            return self.err("port is not in pause mode")
513
514        params = {"handler": self.handler,
515                  "port_id": self.port_id}
516
517        # only valid state after stop
518
519        rc = self.transmit("resume_traffic", params)
520        if rc.bad():
521            return self.err(rc.err())
522
523        self.state = self.STATE_TX
524
525        return self.ok()
526
527    @owned
528    def update (self, mul, force):
529
530        if (self.state == self.STATE_PCAP_TX) :
531            return self.err("update is not supported during PCAP TX")
532
533        if (self.state != self.STATE_TX) :
534            return self.err("port is not transmitting")
535
536        params = {"handler": self.handler,
537                  "port_id": self.port_id,
538                  "mul":     mul,
539                  "force":   force}
540
541        rc = self.transmit("update_traffic", params)
542        if rc.bad():
543            return self.err(rc.err())
544
545        # save this for TUI
546        self.last_factor_type = mul['type']
547
548        return self.ok()
549
550    @owned
551    def validate (self):
552
553        if (self.state == self.STATE_IDLE):
554            return self.err("no streams attached to port")
555
556        params = {"handler": self.handler,
557                  "port_id": self.port_id}
558
559        rc = self.transmit("validate", params)
560        if rc.bad():
561            return self.err(rc.err())
562
563        self.profile = rc.data()
564
565        return self.ok()
566
567
568    @owned
569    def set_attr (self, attr_dict):
570
571        params = {"handler": self.handler,
572                  "port_id": self.port_id,
573                  "attr": attr_dict}
574
575        rc = self.transmit("set_port_attr", params)
576        if rc.bad():
577            return self.err(rc.err())
578
579
580        self.attr.update(attr_dict)
581
582        return self.ok()
583
584    @writeable
585    def push_remote (self, pcap_filename, ipg_usec, speedup, count, duration, is_dual, slave_handler):
586
587        params = {"handler": self.handler,
588                  "port_id": self.port_id,
589                  "pcap_filename": pcap_filename,
590                  "ipg_usec": ipg_usec if ipg_usec is not None else -1,
591                  "speedup": speedup,
592                  "count": count,
593                  "duration": duration,
594                  "is_dual": is_dual,
595                  "slave_handler": slave_handler}
596
597        rc = self.transmit("push_remote", params)
598        if rc.bad():
599            return self.err(rc.err())
600
601        self.state = self.STATE_PCAP_TX
602        return self.ok()
603
604
605    def get_profile (self):
606        return self.profile
607
608
609    def print_profile (self, mult, duration):
610        if not self.get_profile():
611            return
612
613        rate = self.get_profile()['rate']
614        graph = self.get_profile()['graph']
615
616        print(format_text("Profile Map Per Port\n", 'underline', 'bold'))
617
618        factor = mult_to_factor(mult, rate['max_bps_l2'], rate['max_pps'], rate['max_line_util'])
619
620        print("Profile max BPS L2    (base / req):   {:^12} / {:^12}".format(format_num(rate['max_bps_l2'], suffix = "bps"),
621                                                                             format_num(rate['max_bps_l2'] * factor, suffix = "bps")))
622
623        print("Profile max BPS L1    (base / req):   {:^12} / {:^12}".format(format_num(rate['max_bps_l1'], suffix = "bps"),
624                                                                             format_num(rate['max_bps_l1'] * factor, suffix = "bps")))
625
626        print("Profile max PPS       (base / req):   {:^12} / {:^12}".format(format_num(rate['max_pps'], suffix = "pps"),
627                                                                             format_num(rate['max_pps'] * factor, suffix = "pps"),))
628
629        print("Profile line util.    (base / req):   {:^12} / {:^12}".format(format_percentage(rate['max_line_util']),
630                                                                             format_percentage(rate['max_line_util'] * factor)))
631
632
633        # duration
634        exp_time_base_sec = graph['expected_duration'] / (1000 * 1000)
635        exp_time_factor_sec = exp_time_base_sec / factor
636
637        # user configured a duration
638        if duration > 0:
639            if exp_time_factor_sec > 0:
640                exp_time_factor_sec = min(exp_time_factor_sec, duration)
641            else:
642                exp_time_factor_sec = duration
643
644
645        print("Duration              (base / req):   {:^12} / {:^12}".format(format_time(exp_time_base_sec),
646                                                                             format_time(exp_time_factor_sec)))
647        print("\n")
648
649    # generate port info
650    def get_info (self):
651        info = dict(self.info)
652
653        info['status']       = self.get_port_state_name()
654
655        if self.attr.get('promiscuous'):
656            info['prom'] = "on" if self.attr['promiscuous']['enabled'] else "off"
657        else:
658            info['prom'] = "N/A"
659
660        return info
661
662
663    def get_port_state_name(self):
664        return self.STATES_MAP.get(self.state, "Unknown")
665
666    ################# stats handler ######################
667    def generate_port_stats(self):
668        return self.port_stats.generate_stats()
669
670    def generate_port_status(self):
671
672        info = self.get_info()
673
674        return {"driver":        info['driver'],
675                "HW src mac":  info['hw_macaddr'],
676                "SW src mac":  info['src_macaddr'],
677                "SW dst mac":  info['dst_macaddr'],
678                "PCI Address": info['pci_addr'],
679                "NUMA Node":   info['numa'],
680                "--": "",
681                "---": "",
682                "maximum": "{speed} Gb/s".format(speed=info['speed']),
683                "status": info['status'],
684                "promiscuous" : info['prom']
685                }
686
687    def clear_stats(self):
688        return self.port_stats.clear_stats()
689
690
691    def get_stats (self):
692        return self.port_stats.get_stats()
693
694
695    def invalidate_stats(self):
696        return self.port_stats.invalidate()
697
698    ################# stream printout ######################
699    def generate_loaded_streams_sum(self):
700        if self.state == self.STATE_DOWN:
701            return {}
702
703        data = {}
704        for id, obj in self.streams.items():
705
706            # lazy build scapy repr.
707            if not 'pkt_type' in obj:
708                obj['pkt_type'] = STLPktBuilder.pkt_layers_desc_from_buffer(obj['pkt'])
709
710            data[id] = OrderedDict([ ('id',  id),
711                                     ('packet_type',  obj['pkt_type']),
712                                     ('L2 len',       len(obj['pkt']) + 4),
713                                     ('mode',         obj['mode']),
714                                     ('rate',         obj['rate']),
715                                     ('next_stream',  obj['next_id'] if not '-1' else 'None')
716                                    ])
717
718        return {"streams" : OrderedDict(sorted(data.items())) }
719
720
721
722  ################# events handler ######################
723    def async_event_port_job_done (self):
724        # until thread is locked - order is important
725        self.tx_stopped_ts = datetime.now()
726        self.state = self.STATE_STREAMS
727        self.last_factor_type = None
728
729    # rest of the events are used for TUI / read only sessions
730    def async_event_port_stopped (self):
731        if not self.is_acquired():
732            self.state = self.STATE_STREAMS
733
734    def async_event_port_paused (self):
735        if not self.is_acquired():
736            self.state = self.STATE_PAUSE
737
738    def async_event_port_started (self):
739        if not self.is_acquired():
740            self.state = self.STATE_TX
741
742    def async_event_port_resumed (self):
743        if not self.is_acquired():
744            self.state = self.STATE_TX
745
746    def async_event_acquired (self, who):
747        self.handler = None
748        self.owner = who
749
750    def async_event_released (self):
751        self.owner = ''
752
753