trex_stateless_port.cpp revision d0c838e0
1/*
2 Itay Marom
3 Cisco Systems, Inc.
4*/
5
6/*
7Copyright (c) 2015-2015 Cisco Systems, Inc.
8
9Licensed under the Apache License, Version 2.0 (the "License");
10you may not use this file except in compliance with the License.
11You may obtain a copy of the License at
12
13    http://www.apache.org/licenses/LICENSE-2.0
14
15Unless required by applicable law or agreed to in writing, software
16distributed under the License is distributed on an "AS IS" BASIS,
17WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18See the License for the specific language governing permissions and
19limitations under the License.
20*/
21
22#include <trex_stateless.h>
23#include <trex_stateless_port.h>
24#include <trex_stateless_messaging.h>
25#include <trex_streams_compiler.h>
26#include <common/basic_utils.h>
27#include <common/captureFile.h>
28#include "trex_stateless_rx_defs.h"
29
30#include <string>
31
32#ifndef TREX_RPC_MOCK_SERVER
33
34// DPDK c++ issue
35#ifndef UINT8_MAX
36    #define UINT8_MAX 255
37#endif
38
39#ifndef UINT16_MAX
40    #define UINT16_MAX 0xFFFF
41#endif
42
43// DPDK c++ issue
44#endif
45
46#include <os_time.h>
47
/* forward declaration only - no definition is visible in this file */
void port_id_to_cores(uint8_t port_id,
                      std::vector<std::pair<uint8_t, uint8_t>> &cores_id_list);

/* unqualified std names (vector, stringstream) are used further down */
using namespace std;
52
53
54
55/***************************
56 * trex DP events handlers
57 *
58 **************************/
59class AsyncStopEvent : public TrexDpPortEvent {
60
61protected:
62    /**
63     * when an async event occurs (all cores have reported in)
64     *
65     * @author imarom (29-Feb-16)
66     */
67    virtual void on_event() {
68        get_port()->change_state(TrexStatelessPort::PORT_STATE_STREAMS);
69
70        get_port()->common_port_stop_actions(true);
71
72        assert(get_port()->m_pending_async_stop_event != TrexDpPortEvents::INVALID_ID);
73        get_port()->m_pending_async_stop_event = TrexDpPortEvents::INVALID_ID;
74    }
75
76    /**
77     * when a DP core encountered an error
78     *
79     * @author imarom (20-Apr-16)
80     */
81    virtual void on_error(int thread_id) {
82        Json::Value data;
83
84        data["port_id"]   = get_port()->get_port_id();
85        data["thread_id"] = thread_id;
86
87        get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_ERROR, data);
88    }
89};
90
91/*************************************
92 * Streams Feeder
93 * A class that holds a temporary
94 * clone of streams that can be
95 * manipulated
96 *
97 * this is a RAII object meant for
98 * graceful cleanup
99 ************************************/
100class StreamsFeeder {
101public:
102    StreamsFeeder(TrexStatelessPort *port) {
103
104        /* start pesimistic */
105        m_success = false;
106
107        /* fetch the original streams */
108        port->get_object_list(m_in_streams);
109
110        for (const TrexStream *in_stream : m_in_streams) {
111            TrexStream *out_stream = in_stream->clone(true);
112
113            get_stateless_obj()->m_rx_flow_stat.start_stream(out_stream);
114
115            m_out_streams.push_back(out_stream);
116        }
117    }
118
119    void set_status(bool status) {
120        m_success = status;
121    }
122
123    vector<TrexStream *> &get_streams() {
124        return m_out_streams;
125    }
126
127    /**
128     * RAII
129     */
130    ~StreamsFeeder() {
131        for (int i = 0; i < m_out_streams.size(); i++) {
132            TrexStream *out_stream = m_out_streams[i];
133            TrexStream *in_stream  = m_in_streams[i];
134
135            if (m_success) {
136                /* success path */
137                get_stateless_obj()->m_rx_flow_stat.copy_state(out_stream, in_stream);
138            } else {
139                /* fail path */
140                get_stateless_obj()->m_rx_flow_stat.stop_stream(out_stream);
141            }
142            delete out_stream;
143        }
144    }
145
146private:
147    vector<TrexStream *>  m_in_streams;
148    vector<TrexStream *>  m_out_streams;
149    bool                  m_success;
150};
151
152
153/***************************
154 * trex stateless port
155 *
156 **************************/
157TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api) : m_dp_events(this) {
158    std::vector<std::pair<uint8_t, uint8_t>> core_pair_list;
159
160    m_port_id            = port_id;
161    m_port_state         = PORT_STATE_IDLE;
162    m_platform_api       = api;
163
164    /* get the platform specific data */
165    api->get_interface_info(port_id, m_api_info);
166
167    /* get RX caps */
168    api->get_interface_stat_info(port_id, m_rx_count_num, m_rx_caps);
169
170    /* get the DP cores belonging to this port */
171    api->port_id_to_cores(m_port_id, core_pair_list);
172
173    for (auto core_pair : core_pair_list) {
174
175        /* send the core id */
176        m_cores_id_list.push_back(core_pair.first);
177    }
178
179    m_graph_obj = NULL;
180
181    m_pending_async_stop_event = TrexDpPortEvents::INVALID_ID;
182}
183
184TrexStatelessPort::~TrexStatelessPort() {
185
186    stop_traffic();
187    remove_and_delete_all_streams();
188}
189
190/**
191 * acquire the port
192 *
193 * @author imarom (09-Nov-15)
194 *
195 * @param user
196 * @param force
197 */
198void
199TrexStatelessPort::acquire(const std::string &user, uint32_t session_id, bool force) {
200
201    bool used_force = !get_owner().is_free() && force;
202
203    if (get_owner().is_free() || force) {
204        get_owner().own(user, session_id);
205
206    } else {
207        /* not same user or session id and not force - report error */
208        if (get_owner().get_name() == user) {
209            throw TrexException("port is already owned by another session of '" + user + "'");
210        } else {
211            throw TrexException("port is already taken by '" + get_owner().get_name() + "'");
212        }
213    }
214
215    Json::Value data;
216
217    data["port_id"]    = m_port_id;
218    data["who"]        = user;
219    data["session_id"] = session_id;
220    data["force"]      = used_force;
221
222    get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_ACQUIRED, data);
223
224}
225
226void
227TrexStatelessPort::release(void) {
228
229
230    Json::Value data;
231
232    data["port_id"]    = m_port_id;
233    data["who"]        = get_owner().get_name();
234    data["session_id"] = get_owner().get_session_id();
235
236    get_owner().release();
237
238    get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_RELEASED, data);
239}
240
241/**
242 * starts the traffic on the port
243 *
244 */
245void
246TrexStatelessPort::start_traffic(const TrexPortMultiplier &mul, double duration, bool force, uint64_t core_mask) {
247
248    /* command allowed only on state stream */
249    verify_state(PORT_STATE_STREAMS, "start");
250
251    /* just making sure no leftovers... */
252    delete_streams_graph();
253
254    /* on start - we can only provide absolute values */
255    assert(mul.m_op == TrexPortMultiplier::OP_ABS);
256
257    /* check link state */
258    if ( !m_platform_api->getPortAttrObj(m_port_id)->is_link_up() && !force ) {
259        throw TrexException("Link state is DOWN.");
260    }
261
262    /* caclulate the effective factor for DP */
263    double factor = calculate_effective_factor(mul, force);
264
265    StreamsFeeder feeder(this);
266
267    /* compiler it */
268    std::vector<TrexStreamsCompiledObj *> compiled_objs;
269    std::string fail_msg;
270
271    TrexStreamsCompiler compiler;
272    TrexDPCoreMask mask(get_dp_core_count(), core_mask);
273
274    bool rc = compiler.compile(m_port_id,
275                               feeder.get_streams(),
276                               compiled_objs,
277                               mask,
278                               factor,
279                               &fail_msg);
280
281    if (!rc) {
282        feeder.set_status(false);
283        throw TrexException(fail_msg);
284    }
285
286    feeder.set_status(true);
287
288    /* generate a message to all the relevant DP cores to stop transmitting */
289    assert(m_pending_async_stop_event == TrexDpPortEvents::INVALID_ID);
290    m_pending_async_stop_event = m_dp_events.create_event(new AsyncStopEvent());
291
292    /* update object status */
293    m_factor = factor;
294    m_last_all_streams_continues = compiled_objs[mask.get_active_cores()[0]]->get_all_streams_continues();
295    m_last_duration = duration;
296
297    change_state(PORT_STATE_TX);
298
299    /* update the DP - messages will be freed by the DP */
300    int index = 0;
301    for (auto core_id : m_cores_id_list) {
302
303        /* was the core assigned a compiled object ? */
304        if (compiled_objs[index]) {
305            TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(m_port_id,
306                                                                             m_pending_async_stop_event,
307                                                                             compiled_objs[index],
308                                                                             duration);
309            send_message_to_dp(core_id, start_msg);
310        } else {
311
312            /* mimic an end event */
313            m_dp_events.on_core_reporting_in(m_pending_async_stop_event, core_id);
314        }
315
316        index++;
317    }
318
319    /* for debug - this can be turn on */
320    //m_dp_events.barrier();
321
322    /* update subscribers */
323    Json::Value data;
324    data["port_id"] = m_port_id;
325    get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_STARTED, data);
326
327}
328
329
330bool TrexStatelessPort::is_active() const {
331    return   (  (m_port_state == PORT_STATE_TX)
332             || (m_port_state == PORT_STATE_PAUSE)
333             || (m_port_state == PORT_STATE_PCAP_TX)
334             );
335}
336
337/**
338 * stop traffic on port
339 *
340 * @author imarom (09-Nov-15)
341 *
342 * @return TrexStatelessPort::rc_e
343 */
344void
345TrexStatelessPort::stop_traffic(void) {
346    if (!is_active()) {
347        return;
348    }
349
350    /* delete any previous graphs */
351    delete_streams_graph();
352
353    /* to avoid race, first destroy any previous stop/pause events */
354    if (m_pending_async_stop_event != TrexDpPortEvents::INVALID_ID) {
355        m_dp_events.destroy_event(m_pending_async_stop_event);
356        m_pending_async_stop_event = TrexDpPortEvents::INVALID_ID;
357
358    }
359
360    /* generate a message to all the relevant DP cores to start transmitting */
361    TrexStatelessCpToDpMsgBase *stop_msg = new TrexStatelessDpStop(m_port_id);
362    send_message_to_all_dp(stop_msg);
363
364    /* a barrier - make sure all the DP cores stopped */
365    m_dp_events.barrier();
366
367    change_state(PORT_STATE_STREAMS);
368
369    common_port_stop_actions(false);
370}
371
372/**
373 * remove all RX filters from port
374 *
375 * @author imarom (28-Mar-16)
376 */
377void
378TrexStatelessPort::remove_rx_filters(void) {
379    /* only valid when IDLE or with streams and not TXing */
380    verify_state(PORT_STATE_STREAMS, "remove_rx_filters");
381
382    for (auto entry : m_stream_table) {
383        get_stateless_obj()->m_rx_flow_stat.stop_stream(entry.second);
384    }
385
386}
387
388/**
389 * when a port stops, perform various actions
390 *
391 */
392void
393TrexStatelessPort::common_port_stop_actions(bool async) {
394
395    Json::Value data;
396    data["port_id"] = m_port_id;
397
398    if (async) {
399        get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_FINISHED_TX, data);
400    } else {
401        get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_STOPPED, data);
402    }
403
404}
405
406/**
407 * core is considered active if it has a pending for async stop
408 *
409 */
410bool
411TrexStatelessPort::is_core_active(int core_id) {
412    return ( (m_pending_async_stop_event != TrexDpPortEvents::INVALID_ID) &&
413             (m_dp_events.is_core_pending_on_event(m_pending_async_stop_event, core_id))
414           );
415}
416
417void
418TrexStatelessPort::pause_traffic(void) {
419
420    verify_state(PORT_STATE_TX, "pause");
421
422    if (m_last_all_streams_continues == false) {
423        throw TrexException(" pause is supported when all streams are in continues mode ");
424    }
425
426    if ( m_last_duration>0.0 ) {
427        throw TrexException(" pause is supported when duration is not enable is start command ");
428    }
429
430    /* send a pause message */
431    TrexStatelessCpToDpMsgBase *pause_msg = new TrexStatelessDpPause(m_port_id);
432
433    /* send message to all cores */
434    send_message_to_all_dp(pause_msg, true);
435
436    /* make sure all DP cores paused */
437    m_dp_events.barrier();
438
439    /* change state */
440    change_state(PORT_STATE_PAUSE);
441
442    Json::Value data;
443    data["port_id"] = m_port_id;
444    get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_PAUSED, data);
445}
446
447
448void
449TrexStatelessPort::resume_traffic(void) {
450
451    verify_state(PORT_STATE_PAUSE, "resume");
452
453    /* generate a message to all the relevant DP cores to start transmitting */
454    TrexStatelessCpToDpMsgBase *resume_msg = new TrexStatelessDpResume(m_port_id);
455
456    send_message_to_all_dp(resume_msg, true);
457    change_state(PORT_STATE_TX);
458
459    Json::Value data;
460    data["port_id"] = m_port_id;
461    get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_RESUMED, data);
462}
463
464void
465TrexStatelessPort::update_traffic(const TrexPortMultiplier &mul, bool force) {
466
467    double factor;
468
469    verify_state(PORT_STATE_TX | PORT_STATE_PAUSE, "update");
470
471    /* generate a message to all the relevant DP cores to start transmitting */
472    double new_factor = calculate_effective_factor(mul, force);
473
474    switch (mul.m_op) {
475    case TrexPortMultiplier::OP_ABS:
476        factor = new_factor / m_factor;
477        break;
478
479    case TrexPortMultiplier::OP_ADD:
480        factor = (m_factor + new_factor) / m_factor;
481        break;
482
483    case TrexPortMultiplier::OP_SUB:
484        factor = (m_factor - new_factor) / m_factor;
485        if (factor <= 0) {
486            throw TrexException("Update request will lower traffic to less than zero");
487        }
488        break;
489
490    default:
491        assert(0);
492        break;
493    }
494
495    TrexStatelessCpToDpMsgBase *update_msg = new TrexStatelessDpUpdate(m_port_id, factor);
496    send_message_to_all_dp(update_msg, true);
497
498    m_factor *= factor;
499
500}
501
502void
503TrexStatelessPort::push_remote(const std::string &pcap_filename,
504                               double ipg_usec,
505                               double speedup,
506                               uint32_t count,
507                               double duration,
508                               bool is_dual) {
509
510    /* command allowed only on state stream */
511    verify_state(PORT_STATE_IDLE | PORT_STATE_STREAMS, "push_remote");
512
513    /* check that file exists */
514    std::stringstream ss;
515    CCapReaderBase *reader = CCapReaderFactory::CreateReader((char *)pcap_filename.c_str(), 0, ss);
516    if (!reader) {
517        throw TrexException(ss.str());
518    }
519
520    if ( (is_dual) && (reader->get_type() != ERF) ) {
521        throw TrexException("dual mode is only supported on ERF format");
522    }
523    delete reader;
524
525    /* only one core gets to play */
526    int tx_core = m_cores_id_list[0];
527
528    /* create async event */
529    assert(m_pending_async_stop_event == TrexDpPortEvents::INVALID_ID);
530    m_pending_async_stop_event = m_dp_events.create_event(new AsyncStopEvent());
531
532    /* mark all other cores as done */
533    for (int index = 1; index < m_cores_id_list.size(); index++) {
534        /* mimic an end event */
535        m_dp_events.on_core_reporting_in(m_pending_async_stop_event, m_cores_id_list[index]);
536    }
537
538    /* send a message to core */
539    change_state(PORT_STATE_PCAP_TX);
540    TrexStatelessCpToDpMsgBase *push_msg = new TrexStatelessDpPushPCAP(m_port_id,
541                                                                       m_pending_async_stop_event,
542                                                                       pcap_filename,
543                                                                       ipg_usec,
544                                                                       speedup,
545                                                                       count,
546                                                                       duration,
547                                                                       is_dual);
548    send_message_to_dp(tx_core, push_msg);
549
550    /* update subscribers */
551    Json::Value data;
552    data["port_id"] = m_port_id;
553    get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_STARTED, data);
554}
555
556std::string
557TrexStatelessPort::get_state_as_string() const {
558
559    switch (get_state()) {
560    case PORT_STATE_DOWN:
561        return "DOWN";
562
563    case PORT_STATE_IDLE:
564        return  "IDLE";
565
566    case PORT_STATE_STREAMS:
567        return "STREAMS";
568
569    case PORT_STATE_TX:
570        return "TX";
571
572    case PORT_STATE_PAUSE:
573        return "PAUSE";
574
575    case PORT_STATE_PCAP_TX:
576        return "PCAP_TX";
577    }
578
579    return "UNKNOWN";
580}
581
582int
583TrexStatelessPort::get_max_stream_id() const {
584    return m_stream_table.get_max_stream_id();
585}
586
587void
588TrexStatelessPort::get_properties(std::string &driver) {
589
590    driver = m_api_info.driver_name;
591}
592
593bool
594TrexStatelessPort::verify_state(int state, const char *cmd_name, bool should_throw) const {
595    if ( (state & m_port_state) == 0 ) {
596        if (should_throw) {
597            std::stringstream ss;
598            ss << "command '" << cmd_name << "' cannot be executed on current state: '" << get_state_as_string() << "'";
599            throw TrexException(ss.str());
600        } else {
601            return false;
602        }
603    }
604
605    return true;
606}
607
608void
609TrexStatelessPort::change_state(port_state_e new_state) {
610
611    m_port_state = new_state;
612}
613
614
615void
616TrexStatelessPort::encode_stats(Json::Value &port) {
617
618    TrexPlatformInterfaceStats stats;
619    m_platform_api->get_interface_stats(m_port_id, stats);
620
621    port["tx_bps"]          = stats.m_stats.m_tx_bps;
622    port["rx_bps"]          = stats.m_stats.m_rx_bps;
623
624    port["tx_pps"]          = stats.m_stats.m_tx_pps;
625    port["rx_pps"]          = stats.m_stats.m_rx_pps;
626
627    port["total_tx_pkts"]   = Json::Value::UInt64(stats.m_stats.m_total_tx_pkts);
628    port["total_rx_pkts"]   = Json::Value::UInt64(stats.m_stats.m_total_rx_pkts);
629
630    port["total_tx_bytes"]  = Json::Value::UInt64(stats.m_stats.m_total_tx_bytes);
631    port["total_rx_bytes"]  = Json::Value::UInt64(stats.m_stats.m_total_rx_bytes);
632
633    port["tx_rx_errors"]    = Json::Value::UInt64(stats.m_stats.m_tx_rx_errors);
634}
635
636void
637TrexStatelessPort::send_message_to_all_dp(TrexStatelessCpToDpMsgBase *msg, bool send_to_active_only) {
638
639    for (auto core_id : m_cores_id_list) {
640
641        /* skip non active cores if requested */
642        if ( (send_to_active_only) && (!is_core_active(core_id)) ) {
643            continue;
644        }
645
646        send_message_to_dp(core_id, msg->clone());
647    }
648
649    /* original was not sent - delete it */
650    delete msg;
651}
652
653void
654TrexStatelessPort::send_message_to_dp(uint8_t core_id, TrexStatelessCpToDpMsgBase *msg) {
655
656    /* send the message to the core */
657    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingCpToDp(core_id);
658    ring->Enqueue((CGenNode *)msg);
659}
660
661void
662TrexStatelessPort::send_message_to_rx(TrexStatelessCpToRxMsgBase *msg) {
663
664    /* send the message to the core */
665    CNodeRing *ring = CMsgIns::Ins()->getCpRx()->getRingCpToDp(0);
666    ring->Enqueue((CGenNode *)msg);
667}
668
669uint64_t
670TrexStatelessPort::get_port_speed_bps() const {
671    return (uint64_t) m_platform_api->getPortAttrObj(m_port_id)->get_link_speed() * 1000 * 1000;
672}
673
/**
 * converts bits per second to gigabits per second (divides by 10^9)
 */
static inline double
bps_to_gbps(double bps) {
    const double bits_per_gbit = 1000.0 * 1000 * 1000;
    return bps / bits_per_gbit;
}
678
679double
680TrexStatelessPort::calculate_effective_factor(const TrexPortMultiplier &mul, bool force) {
681
682    double factor = calculate_effective_factor_internal(mul);
683
684    /* did we exceeded the max L1 line rate ? */
685    double expected_l1_rate = m_graph_obj->get_max_bps_l1(factor);
686
687    /* if not force and exceeded - throw exception */
688    if ( (!force) && (expected_l1_rate > get_port_speed_bps()) ) {
689        stringstream ss;
690        ss << "Expected L1 B/W: '" << bps_to_gbps(expected_l1_rate) << " Gbps' exceeds port line rate: '" << bps_to_gbps(get_port_speed_bps()) << " Gbps'";
691        throw TrexException(ss.str());
692    }
693
694    /* L1 BW must be positive */
695    if (expected_l1_rate <= 0){
696        stringstream ss;
697        ss << "Effective bandwidth must be positive, got: " << expected_l1_rate;
698        throw TrexException(ss.str());
699    }
700
701    /* factor must be positive */
702    if (factor <= 0) {
703        stringstream ss;
704        ss << "Factor must be positive, got: " << factor;
705        throw TrexException(ss.str());
706    }
707
708    return factor;
709}
710
711double
712TrexStatelessPort::calculate_effective_factor_internal(const TrexPortMultiplier &mul) {
713
714    /* we now need the graph - generate it if we don't have it (happens once) */
715    if (!m_graph_obj) {
716        generate_streams_graph();
717    }
718
719    switch (mul.m_type) {
720
721    case TrexPortMultiplier::MUL_FACTOR:
722        return (mul.m_value);
723
724    case TrexPortMultiplier::MUL_BPS:
725        return m_graph_obj->get_factor_bps_l2(mul.m_value);
726
727    case TrexPortMultiplier::MUL_BPSL1:
728        return m_graph_obj->get_factor_bps_l1(mul.m_value);
729
730    case TrexPortMultiplier::MUL_PPS:
731        return m_graph_obj->get_factor_pps(mul.m_value);
732
733    case TrexPortMultiplier::MUL_PERCENTAGE:
734        /* if abs percentage is from the line speed - otherwise its from the current speed */
735
736        if (mul.m_op == TrexPortMultiplier::OP_ABS) {
737            double required = (mul.m_value / 100.0) * get_port_speed_bps();
738            return m_graph_obj->get_factor_bps_l1(required);
739        } else {
740            return (m_factor * (mul.m_value / 100.0));
741        }
742
743    default:
744        assert(0);
745    }
746
747}
748
749
750void
751TrexStatelessPort::generate_streams_graph() {
752
753    /* dispose of the old one */
754    if (m_graph_obj) {
755        delete_streams_graph();
756    }
757
758    /* fetch all the streams from the table */
759    vector<TrexStream *> streams;
760    get_object_list(streams);
761
762    TrexStreamsGraph graph;
763    m_graph_obj = graph.generate(streams);
764}
765
766void
767TrexStatelessPort::delete_streams_graph() {
768    if (m_graph_obj) {
769        delete m_graph_obj;
770        m_graph_obj = NULL;
771    }
772}
773
774
775
776/***************************
777 * port multiplier
778 *
779 **************************/
780const std::initializer_list<std::string> TrexPortMultiplier::g_types = {"raw", "bps", "bpsl1", "pps", "percentage"};
781const std::initializer_list<std::string> TrexPortMultiplier::g_ops   = {"abs", "add", "sub"};
782
783TrexPortMultiplier::
784TrexPortMultiplier(const std::string &type_str, const std::string &op_str, double value) {
785    mul_type_e type;
786    mul_op_e   op;
787
788    if (type_str == "raw") {
789        type = MUL_FACTOR;
790
791    } else if (type_str == "bps") {
792        type = MUL_BPS;
793
794    } else if (type_str == "bpsl1") {
795        type = MUL_BPSL1;
796
797    } else if (type_str == "pps") {
798        type = MUL_PPS;
799
800    } else if (type_str == "percentage") {
801        type = MUL_PERCENTAGE;
802    } else {
803        throw TrexException("bad type str: " + type_str);
804    }
805
806    if (op_str == "abs") {
807        op = OP_ABS;
808
809    } else if (op_str == "add") {
810        op = OP_ADD;
811
812    } else if (op_str == "sub") {
813        op = OP_SUB;
814
815    } else {
816        throw TrexException("bad op str: " + op_str);
817    }
818
819    m_type  = type;
820    m_op    = op;
821    m_value = value;
822
823}
824
825const TrexStreamsGraphObj *
826TrexStatelessPort::validate(void) {
827
828    /* first compile the graph */
829
830    vector<TrexStream *> streams;
831    get_object_list(streams);
832
833    if (streams.size() == 0) {
834        throw TrexException("no streams attached to port");
835    }
836
837    TrexStreamsCompiler compiler;
838
839    /* TODO: think of this mask...*/
840    TrexDPCoreMask core_mask(get_dp_core_count(), TrexDPCoreMask::MASK_ALL);
841
842    std::vector<TrexStreamsCompiledObj *> compiled_objs;
843
844    std::string fail_msg;
845    bool rc = compiler.compile(m_port_id,
846                               streams,
847                               compiled_objs,
848                               core_mask,
849                               1.0,
850                               &fail_msg);
851    if (!rc) {
852        throw TrexException(fail_msg);
853    }
854
855    for (auto obj : compiled_objs) {
856        delete obj;
857    }
858
859    /* now create a stream graph */
860    if (!m_graph_obj) {
861        generate_streams_graph();
862    }
863
864    return m_graph_obj;
865}
866
867
868
869void
870TrexStatelessPort::get_port_effective_rate(double &pps,
871                                           double &bps_L1,
872                                           double &bps_L2,
873                                           double &percentage) {
874
875    if (get_stream_count() == 0) {
876        return;
877    }
878
879    if (!m_graph_obj) {
880        generate_streams_graph();
881    }
882
883    pps        = m_graph_obj->get_max_pps(m_factor);
884    bps_L1     = m_graph_obj->get_max_bps_l1(m_factor);
885    bps_L2     = m_graph_obj->get_max_bps_l2(m_factor);
886    percentage = (bps_L1 / get_port_speed_bps()) * 100.0;
887
888}
889
890void
891TrexStatelessPort::get_pci_info(std::string &pci_addr, int &numa_node) {
892    pci_addr  = m_api_info.pci_addr;
893    numa_node = m_api_info.numa_node;
894}
895
896void
897TrexStatelessPort::add_stream(TrexStream *stream) {
898
899    verify_state(PORT_STATE_IDLE | PORT_STATE_STREAMS, "add_stream");
900
901    get_stateless_obj()->m_rx_flow_stat.add_stream(stream);
902
903    m_stream_table.add_stream(stream);
904    delete_streams_graph();
905
906    change_state(PORT_STATE_STREAMS);
907}
908
909void
910TrexStatelessPort::remove_stream(TrexStream *stream) {
911
912    verify_state(PORT_STATE_STREAMS, "remove_stream");
913
914    get_stateless_obj()->m_rx_flow_stat.del_stream(stream);
915
916    m_stream_table.remove_stream(stream);
917    delete_streams_graph();
918
919    if (m_stream_table.size() == 0) {
920        change_state(PORT_STATE_IDLE);
921    }
922}
923
924void
925TrexStatelessPort::remove_and_delete_all_streams() {
926    verify_state(PORT_STATE_IDLE | PORT_STATE_STREAMS, "remove_and_delete_all_streams");
927
928    vector<TrexStream *> streams;
929    get_object_list(streams);
930
931    for (auto stream : streams) {
932        remove_stream(stream);
933        delete stream;
934    }
935}
936
937void
938TrexStatelessPort::start_rx_capture(const std::string &pcap_filename, uint64_t limit) {
939
940    m_rx_features_info.m_rx_capture_info.enable(pcap_filename, limit);
941
942    TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStartCapture(m_port_id, m_rx_features_info.m_rx_capture_info);
943    send_message_to_rx(msg);
944}
945
946void
947TrexStatelessPort::stop_rx_capture() {
948    TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStopCapture(m_port_id);
949    send_message_to_rx(msg);
950    m_rx_features_info.m_rx_capture_info.disable();
951}
952
953void
954TrexStatelessPort::start_rx_queue(uint64_t size) {
955
956    m_rx_features_info.m_rx_queue_info.enable(size);
957
958    TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStartQueue(m_port_id, m_rx_features_info.m_rx_queue_info);
959    send_message_to_rx(msg);
960}
961
962void
963TrexStatelessPort::stop_rx_queue() {
964    TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStopQueue(m_port_id);
965    send_message_to_rx(msg);
966    m_rx_features_info.m_rx_queue_info.disable();
967}
968
969
970RXPacketBuffer *
971TrexStatelessPort::get_rx_queue_pkts() {
972
973    if (m_rx_features_info.m_rx_queue_info.is_empty()) {
974        return NULL;
975    }
976
977    /* ask RX core for the pkt queue */
978    TrexStatelessMsgReply<RXPacketBuffer *> msg_reply;
979
980    TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxQueueGetPkts(m_port_id, msg_reply);
981    send_message_to_rx(msg);
982
983    RXPacketBuffer *pkt_buffer = msg_reply.wait_for_reply();
984    return pkt_buffer;
985}
986
987/************* Trex Port Owner **************/
988
989TrexPortOwner::TrexPortOwner() {
990    m_is_free = true;
991    m_session_id = 0;
992
993    /* for handlers random generation */
994    m_seed = time(NULL);
995}
996
997const std::string TrexPortOwner::g_unowned_name = "<FREE>";
998const std::string TrexPortOwner::g_unowned_handler = "";
999