trex_stateless_port.cpp revision 1016c3d4
1/*
2 Itay Marom
3 Cisco Systems, Inc.
4*/
5
6/*
7Copyright (c) 2015-2015 Cisco Systems, Inc.
8
9Licensed under the Apache License, Version 2.0 (the "License");
10you may not use this file except in compliance with the License.
11You may obtain a copy of the License at
12
13    http://www.apache.org/licenses/LICENSE-2.0
14
15Unless required by applicable law or agreed to in writing, software
16distributed under the License is distributed on an "AS IS" BASIS,
17WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18See the License for the specific language governing permissions and
19limitations under the License.
20*/
21
22#include <trex_stateless.h>
23#include <trex_stateless_port.h>
24#include <trex_stateless_messaging.h>
25#include <trex_streams_compiler.h>
26#include <common/basic_utils.h>
27#include <common/captureFile.h>
28
29#include <string>
30
31#ifndef TREX_RPC_MOCK_SERVER
32
33// DPDK c++ issue
34#ifndef UINT8_MAX
35    #define UINT8_MAX 255
36#endif
37
38#ifndef UINT16_MAX
39    #define UINT16_MAX 0xFFFF
40#endif
41
42// DPDK c++ issue
43#endif
44
45#include <os_time.h>
46
/* forward declaration - fills 'cores_id_list' with pairs of core ids for 'port_id' (definition is not in this file) */
void port_id_to_cores(uint8_t port_id, std::vector<std::pair<uint8_t, uint8_t>> &cores_id_list);
49
50using namespace std;
51
52
53
54/***************************
55 * trex DP events handlers
56 *
57 **************************/
58class AsyncStopEvent : public TrexDpPortEvent {
59
60protected:
61    /**
62     * when an async event occurs (all cores have reported in)
63     *
64     * @author imarom (29-Feb-16)
65     */
66    virtual void on_event() {
67        get_port()->change_state(TrexStatelessPort::PORT_STATE_STREAMS);
68
69        get_port()->common_port_stop_actions(true);
70
71        assert(get_port()->m_pending_async_stop_event != TrexDpPortEvents::INVALID_ID);
72        get_port()->m_pending_async_stop_event = TrexDpPortEvents::INVALID_ID;
73    }
74
75    /**
76     * when a DP core encountered an error
77     *
78     * @author imarom (20-Apr-16)
79     */
80    virtual void on_error(int thread_id) {
81        Json::Value data;
82
83        data["port_id"]   = get_port()->get_port_id();
84        data["thread_id"] = thread_id;
85
86        get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_ERROR, data);
87    }
88};
89
90/*************************************
91 * Streams Feeder
92 * A class that holds a temporary
93 * clone of streams that can be
94 * manipulated
95 *
96 * this is a RAII object meant for
97 * graceful cleanup
98 ************************************/
99class StreamsFeeder {
100public:
101    StreamsFeeder(TrexStatelessPort *port) {
102
103        /* start pesimistic */
104        m_success = false;
105
106        /* fetch the original streams */
107        port->get_object_list(m_in_streams);
108
109        for (const TrexStream *in_stream : m_in_streams) {
110            TrexStream *out_stream = in_stream->clone(true);
111
112            get_stateless_obj()->m_rx_flow_stat.start_stream(out_stream);
113
114            m_out_streams.push_back(out_stream);
115        }
116    }
117
118    void set_status(bool status) {
119        m_success = status;
120    }
121
122    vector<TrexStream *> &get_streams() {
123        return m_out_streams;
124    }
125
126    /**
127     * RAII
128     */
129    ~StreamsFeeder() {
130        for (int i = 0; i < m_out_streams.size(); i++) {
131            TrexStream *out_stream = m_out_streams[i];
132            TrexStream *in_stream  = m_in_streams[i];
133
134            if (m_success) {
135                /* success path */
136                get_stateless_obj()->m_rx_flow_stat.copy_state(out_stream, in_stream);
137            } else {
138                /* fail path */
139                get_stateless_obj()->m_rx_flow_stat.stop_stream(out_stream);
140            }
141            delete out_stream;
142        }
143    }
144
145private:
146    vector<TrexStream *>  m_in_streams;
147    vector<TrexStream *>  m_out_streams;
148    bool                  m_success;
149};
150
151
152/***************************
153 * trex stateless port
154 *
155 **************************/
156TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api) : platform_api(api), m_dp_events(this) {
157    std::vector<std::pair<uint8_t, uint8_t>> core_pair_list;
158
159    m_port_id = port_id;
160    m_port_state = PORT_STATE_IDLE;
161
162    /* get the platform specific data */
163    api->get_interface_info(port_id, m_api_info);
164
165    /* get RX caps */
166    api->get_interface_stat_info(port_id, m_rx_count_num, m_rx_caps);
167
168    /* get the DP cores belonging to this port */
169    api->port_id_to_cores(m_port_id, core_pair_list);
170
171    for (auto core_pair : core_pair_list) {
172
173        /* send the core id */
174        m_cores_id_list.push_back(core_pair.first);
175    }
176
177    m_graph_obj = NULL;
178
179    m_pending_async_stop_event = TrexDpPortEvents::INVALID_ID;
180}
181
182TrexStatelessPort::~TrexStatelessPort() {
183
184    stop_traffic();
185    remove_and_delete_all_streams();
186}
187
188/**
189 * acquire the port
190 *
191 * @author imarom (09-Nov-15)
192 *
193 * @param user
194 * @param force
195 */
196void
197TrexStatelessPort::acquire(const std::string &user, uint32_t session_id, bool force) {
198
199    bool used_force = !get_owner().is_free() && force;
200
201    if (get_owner().is_free() || force) {
202        get_owner().own(user, session_id);
203
204    } else {
205        /* not same user or session id and not force - report error */
206        if (get_owner().get_name() == user) {
207            throw TrexException("port is already owned by another session of '" + user + "'");
208        } else {
209            throw TrexException("port is already taken by '" + get_owner().get_name() + "'");
210        }
211    }
212
213    Json::Value data;
214
215    data["port_id"]    = m_port_id;
216    data["who"]        = user;
217    data["session_id"] = session_id;
218    data["force"]      = used_force;
219
220    get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_ACQUIRED, data);
221
222}
223
224void
225TrexStatelessPort::release(void) {
226
227
228    Json::Value data;
229
230    data["port_id"]    = m_port_id;
231    data["who"]        = get_owner().get_name();
232    data["session_id"] = get_owner().get_session_id();
233
234    get_owner().release();
235
236    get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_RELEASED, data);
237}
238
239/**
240 * starts the traffic on the port
241 *
242 */
243void
244TrexStatelessPort::start_traffic(const TrexPortMultiplier &mul, double duration, bool force, uint64_t core_mask) {
245
246    /* command allowed only on state stream */
247    verify_state(PORT_STATE_STREAMS, "start");
248
249    /* just making sure no leftovers... */
250    delete_streams_graph();
251
252    /* on start - we can only provide absolute values */
253    assert(mul.m_op == TrexPortMultiplier::OP_ABS);
254
255    /* check link state */
256    if ( !platform_api->getPortAttrObj()->is_link_up(m_port_id) && !force ) {
257        throw TrexException("Link state is DOWN.");
258    }
259
260    /* caclulate the effective factor for DP */
261    double factor = calculate_effective_factor(mul, force);
262
263    StreamsFeeder feeder(this);
264
265    /* compiler it */
266    std::vector<TrexStreamsCompiledObj *> compiled_objs;
267    std::string fail_msg;
268
269    TrexStreamsCompiler compiler;
270    TrexDPCoreMask mask(get_dp_core_count(), core_mask);
271
272    bool rc = compiler.compile(m_port_id,
273                               feeder.get_streams(),
274                               compiled_objs,
275                               mask,
276                               factor,
277                               &fail_msg);
278
279    if (!rc) {
280        feeder.set_status(false);
281        throw TrexException(fail_msg);
282    }
283
284    feeder.set_status(true);
285
286    /* generate a message to all the relevant DP cores to stop transmitting */
287    assert(m_pending_async_stop_event == TrexDpPortEvents::INVALID_ID);
288    m_pending_async_stop_event = m_dp_events.create_event(new AsyncStopEvent());
289
290    /* update object status */
291    m_factor = factor;
292    m_last_all_streams_continues = compiled_objs[mask.get_active_cores()[0]]->get_all_streams_continues();
293    m_last_duration = duration;
294
295    change_state(PORT_STATE_TX);
296
297    /* update the DP - messages will be freed by the DP */
298    int index = 0;
299    for (auto core_id : m_cores_id_list) {
300
301        /* was the core assigned a compiled object ? */
302        if (compiled_objs[index]) {
303            TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(m_port_id,
304                                                                             m_pending_async_stop_event,
305                                                                             compiled_objs[index],
306                                                                             duration);
307            send_message_to_dp(core_id, start_msg);
308        } else {
309
310            /* mimic an end event */
311            m_dp_events.on_core_reporting_in(m_pending_async_stop_event, core_id);
312        }
313
314        index++;
315    }
316
317    /* for debug - this can be turn on */
318    //m_dp_events.barrier();
319
320    /* update subscribers */
321    Json::Value data;
322    data["port_id"] = m_port_id;
323    get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_STARTED, data);
324
325}
326
327
328bool TrexStatelessPort::is_active() const {
329    return   (  (m_port_state == PORT_STATE_TX)
330             || (m_port_state == PORT_STATE_PAUSE)
331             || (m_port_state == PORT_STATE_PCAP_TX)
332             );
333}
334
335/**
336 * stop traffic on port
337 *
338 * @author imarom (09-Nov-15)
339 *
340 * @return TrexStatelessPort::rc_e
341 */
342void
343TrexStatelessPort::stop_traffic(void) {
344    if (!is_active()) {
345        return;
346    }
347
348    /* delete any previous graphs */
349    delete_streams_graph();
350
351    /* to avoid race, first destroy any previous stop/pause events */
352    if (m_pending_async_stop_event != TrexDpPortEvents::INVALID_ID) {
353        m_dp_events.destroy_event(m_pending_async_stop_event);
354        m_pending_async_stop_event = TrexDpPortEvents::INVALID_ID;
355
356    }
357
358    /* generate a message to all the relevant DP cores to start transmitting */
359    TrexStatelessCpToDpMsgBase *stop_msg = new TrexStatelessDpStop(m_port_id);
360    send_message_to_all_dp(stop_msg);
361
362    /* a barrier - make sure all the DP cores stopped */
363    m_dp_events.barrier();
364
365    change_state(PORT_STATE_STREAMS);
366
367    common_port_stop_actions(false);
368}
369
370/**
371 * remove all RX filters from port
372 *
373 * @author imarom (28-Mar-16)
374 */
375void
376TrexStatelessPort::remove_rx_filters(void) {
377    /* only valid when IDLE or with streams and not TXing */
378    verify_state(PORT_STATE_STREAMS, "remove_rx_filters");
379
380    for (auto entry : m_stream_table) {
381        get_stateless_obj()->m_rx_flow_stat.stop_stream(entry.second);
382    }
383
384}
385
386/**
387 * when a port stops, perform various actions
388 *
389 */
390void
391TrexStatelessPort::common_port_stop_actions(bool async) {
392
393    Json::Value data;
394    data["port_id"] = m_port_id;
395
396    if (async) {
397        get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_FINISHED_TX, data);
398    } else {
399        get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_STOPPED, data);
400    }
401
402}
403
404/**
405 * core is considered active if it has a pending for async stop
406 *
407 */
408bool
409TrexStatelessPort::is_core_active(int core_id) {
410    return ( (m_pending_async_stop_event != TrexDpPortEvents::INVALID_ID) &&
411             (m_dp_events.is_core_pending_on_event(m_pending_async_stop_event, core_id))
412           );
413}
414
415void
416TrexStatelessPort::pause_traffic(void) {
417
418    verify_state(PORT_STATE_TX, "pause");
419
420    if (m_last_all_streams_continues == false) {
421        throw TrexException(" pause is supported when all streams are in continues mode ");
422    }
423
424    if ( m_last_duration>0.0 ) {
425        throw TrexException(" pause is supported when duration is not enable is start command ");
426    }
427
428    /* send a pause message */
429    TrexStatelessCpToDpMsgBase *pause_msg = new TrexStatelessDpPause(m_port_id);
430
431    /* send message to all cores */
432    send_message_to_all_dp(pause_msg, true);
433
434    /* make sure all DP cores paused */
435    m_dp_events.barrier();
436
437    /* change state */
438    change_state(PORT_STATE_PAUSE);
439
440    Json::Value data;
441    data["port_id"] = m_port_id;
442    get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_PAUSED, data);
443}
444
445
446void
447TrexStatelessPort::resume_traffic(void) {
448
449    verify_state(PORT_STATE_PAUSE, "resume");
450
451    /* generate a message to all the relevant DP cores to start transmitting */
452    TrexStatelessCpToDpMsgBase *resume_msg = new TrexStatelessDpResume(m_port_id);
453
454    send_message_to_all_dp(resume_msg, true);
455    change_state(PORT_STATE_TX);
456
457    Json::Value data;
458    data["port_id"] = m_port_id;
459    get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_RESUMED, data);
460}
461
462void
463TrexStatelessPort::update_traffic(const TrexPortMultiplier &mul, bool force) {
464
465    double factor;
466
467    verify_state(PORT_STATE_TX | PORT_STATE_PAUSE, "update");
468
469    /* generate a message to all the relevant DP cores to start transmitting */
470    double new_factor = calculate_effective_factor(mul, force);
471
472    switch (mul.m_op) {
473    case TrexPortMultiplier::OP_ABS:
474        factor = new_factor / m_factor;
475        break;
476
477    case TrexPortMultiplier::OP_ADD:
478        factor = (m_factor + new_factor) / m_factor;
479        break;
480
481    case TrexPortMultiplier::OP_SUB:
482        factor = (m_factor - new_factor) / m_factor;
483        if (factor <= 0) {
484            throw TrexException("Update request will lower traffic to less than zero");
485        }
486        break;
487
488    default:
489        assert(0);
490        break;
491    }
492
493    TrexStatelessCpToDpMsgBase *update_msg = new TrexStatelessDpUpdate(m_port_id, factor);
494    send_message_to_all_dp(update_msg, true);
495
496    m_factor *= factor;
497
498}
499
500void
501TrexStatelessPort::push_remote(const std::string &pcap_filename,
502                               double ipg_usec,
503                               double speedup,
504                               uint32_t count,
505                               double duration,
506                               bool is_dual) {
507
508    /* command allowed only on state stream */
509    verify_state(PORT_STATE_IDLE | PORT_STATE_STREAMS, "push_remote");
510
511    /* check that file exists */
512    std::stringstream ss;
513    CCapReaderBase *reader = CCapReaderFactory::CreateReader((char *)pcap_filename.c_str(), 0, ss);
514    if (!reader) {
515        throw TrexException(ss.str());
516    }
517
518    if ( (is_dual) && (reader->get_type() != ERF) ) {
519        throw TrexException("dual mode is only supported on ERF format");
520    }
521    delete reader;
522
523    /* only one core gets to play */
524    int tx_core = m_cores_id_list[0];
525
526    /* create async event */
527    assert(m_pending_async_stop_event == TrexDpPortEvents::INVALID_ID);
528    m_pending_async_stop_event = m_dp_events.create_event(new AsyncStopEvent());
529
530    /* mark all other cores as done */
531    for (int index = 1; index < m_cores_id_list.size(); index++) {
532        /* mimic an end event */
533        m_dp_events.on_core_reporting_in(m_pending_async_stop_event, m_cores_id_list[index]);
534    }
535
536    /* send a message to core */
537    change_state(PORT_STATE_PCAP_TX);
538    TrexStatelessCpToDpMsgBase *push_msg = new TrexStatelessDpPushPCAP(m_port_id,
539                                                                       m_pending_async_stop_event,
540                                                                       pcap_filename,
541                                                                       ipg_usec,
542                                                                       speedup,
543                                                                       count,
544                                                                       duration,
545                                                                       is_dual);
546    send_message_to_dp(tx_core, push_msg);
547
548    /* update subscribers */
549    Json::Value data;
550    data["port_id"] = m_port_id;
551    get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_STARTED, data);
552}
553
554std::string
555TrexStatelessPort::get_state_as_string() const {
556
557    switch (get_state()) {
558    case PORT_STATE_DOWN:
559        return "DOWN";
560
561    case PORT_STATE_IDLE:
562        return  "IDLE";
563
564    case PORT_STATE_STREAMS:
565        return "STREAMS";
566
567    case PORT_STATE_TX:
568        return "TX";
569
570    case PORT_STATE_PAUSE:
571        return "PAUSE";
572
573    case PORT_STATE_PCAP_TX:
574        return "PCAP_TX";
575    }
576
577    return "UNKNOWN";
578}
579
580int
581TrexStatelessPort::get_max_stream_id() const {
582    return m_stream_table.get_max_stream_id();
583}
584
585void
586TrexStatelessPort::get_properties(std::string &driver, uint32_t &speed) {
587
588    driver = m_api_info.driver_name;
589    speed = platform_api->getPortAttrObj()->get_link_speed(m_port_id);
590}
591
592bool
593TrexStatelessPort::verify_state(int state, const char *cmd_name, bool should_throw) const {
594    if ( (state & m_port_state) == 0 ) {
595        if (should_throw) {
596            std::stringstream ss;
597            ss << "command '" << cmd_name << "' cannot be executed on current state: '" << get_state_as_string() << "'";
598            throw TrexException(ss.str());
599        } else {
600            return false;
601        }
602    }
603
604    return true;
605}
606
607void
608TrexStatelessPort::change_state(port_state_e new_state) {
609
610    m_port_state = new_state;
611}
612
613
614void
615TrexStatelessPort::encode_stats(Json::Value &port) {
616
617    TrexPlatformInterfaceStats stats;
618    platform_api->get_interface_stats(m_port_id, stats);
619
620    port["tx_bps"]          = stats.m_stats.m_tx_bps;
621    port["rx_bps"]          = stats.m_stats.m_rx_bps;
622
623    port["tx_pps"]          = stats.m_stats.m_tx_pps;
624    port["rx_pps"]          = stats.m_stats.m_rx_pps;
625
626    port["total_tx_pkts"]   = Json::Value::UInt64(stats.m_stats.m_total_tx_pkts);
627    port["total_rx_pkts"]   = Json::Value::UInt64(stats.m_stats.m_total_rx_pkts);
628
629    port["total_tx_bytes"]  = Json::Value::UInt64(stats.m_stats.m_total_tx_bytes);
630    port["total_rx_bytes"]  = Json::Value::UInt64(stats.m_stats.m_total_rx_bytes);
631
632    port["tx_rx_errors"]    = Json::Value::UInt64(stats.m_stats.m_tx_rx_errors);
633}
634
635void
636TrexStatelessPort::send_message_to_all_dp(TrexStatelessCpToDpMsgBase *msg, bool send_to_active_only) {
637
638    for (auto core_id : m_cores_id_list) {
639
640        /* skip non active cores if requested */
641        if ( (send_to_active_only) && (!is_core_active(core_id)) ) {
642            continue;
643        }
644
645        send_message_to_dp(core_id, msg->clone());
646    }
647
648    /* original was not sent - delete it */
649    delete msg;
650}
651
652void
653TrexStatelessPort::send_message_to_dp(uint8_t core_id, TrexStatelessCpToDpMsgBase *msg) {
654
655    /* send the message to the core */
656    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingCpToDp(core_id);
657    ring->Enqueue((CGenNode *)msg);
658}
659
660void
661TrexStatelessPort::send_message_to_rx(TrexStatelessCpToRxMsgBase *msg) {
662
663    /* send the message to the core */
664    CNodeRing *ring = CMsgIns::Ins()->getCpRx()->getRingCpToDp(0);
665    ring->Enqueue((CGenNode *)msg);
666}
667
668uint64_t
669TrexStatelessPort::get_port_speed_bps() const {
670    return (uint64_t) platform_api->getPortAttrObj()->get_link_speed(m_port_id) * 1000 * 1000;
671}
672
/**
 * converts bits per second to giga bits per second (10^9)
 */
static inline double
bps_to_gbps(double bps) {
    const double bps_in_gbps = 1000.0 * 1000 * 1000;

    return bps / bps_in_gbps;
}
677
678double
679TrexStatelessPort::calculate_effective_factor(const TrexPortMultiplier &mul, bool force) {
680
681    double factor = calculate_effective_factor_internal(mul);
682
683    /* did we exceeded the max L1 line rate ? */
684    double expected_l1_rate = m_graph_obj->get_max_bps_l1(factor);
685
686    /* if not force and exceeded - throw exception */
687    if ( (!force) && (expected_l1_rate > get_port_speed_bps()) ) {
688        stringstream ss;
689        ss << "Expected L1 B/W: '" << bps_to_gbps(expected_l1_rate) << " Gbps' exceeds port line rate: '" << bps_to_gbps(get_port_speed_bps()) << " Gbps'";
690        throw TrexException(ss.str());
691    }
692
693    /* L1 BW must be positive */
694    if (expected_l1_rate <= 0){
695        stringstream ss;
696        ss << "Effective bandwidth must be positive, got: " << expected_l1_rate;
697        throw TrexException(ss.str());
698    }
699
700    /* factor must be positive */
701    if (factor <= 0) {
702        stringstream ss;
703        ss << "Factor must be positive, got: " << factor;
704        throw TrexException(ss.str());
705    }
706
707    return factor;
708}
709
710double
711TrexStatelessPort::calculate_effective_factor_internal(const TrexPortMultiplier &mul) {
712
713    /* we now need the graph - generate it if we don't have it (happens once) */
714    if (!m_graph_obj) {
715        generate_streams_graph();
716    }
717
718    switch (mul.m_type) {
719
720    case TrexPortMultiplier::MUL_FACTOR:
721        return (mul.m_value);
722
723    case TrexPortMultiplier::MUL_BPS:
724        return m_graph_obj->get_factor_bps_l2(mul.m_value);
725
726    case TrexPortMultiplier::MUL_BPSL1:
727        return m_graph_obj->get_factor_bps_l1(mul.m_value);
728
729    case TrexPortMultiplier::MUL_PPS:
730        return m_graph_obj->get_factor_pps(mul.m_value);
731
732    case TrexPortMultiplier::MUL_PERCENTAGE:
733        /* if abs percentage is from the line speed - otherwise its from the current speed */
734
735        if (mul.m_op == TrexPortMultiplier::OP_ABS) {
736            double required = (mul.m_value / 100.0) * get_port_speed_bps();
737            return m_graph_obj->get_factor_bps_l1(required);
738        } else {
739            return (m_factor * (mul.m_value / 100.0));
740        }
741
742    default:
743        assert(0);
744    }
745
746}
747
748
749void
750TrexStatelessPort::generate_streams_graph() {
751
752    /* dispose of the old one */
753    if (m_graph_obj) {
754        delete_streams_graph();
755    }
756
757    /* fetch all the streams from the table */
758    vector<TrexStream *> streams;
759    get_object_list(streams);
760
761    TrexStreamsGraph graph;
762    m_graph_obj = graph.generate(streams);
763}
764
765void
766TrexStatelessPort::delete_streams_graph() {
767    if (m_graph_obj) {
768        delete m_graph_obj;
769        m_graph_obj = NULL;
770    }
771}
772
773
774
775/***************************
776 * port multiplier
777 *
778 **************************/
779const std::initializer_list<std::string> TrexPortMultiplier::g_types = {"raw", "bps", "bpsl1", "pps", "percentage"};
780const std::initializer_list<std::string> TrexPortMultiplier::g_ops   = {"abs", "add", "sub"};
781
782TrexPortMultiplier::
783TrexPortMultiplier(const std::string &type_str, const std::string &op_str, double value) {
784    mul_type_e type;
785    mul_op_e   op;
786
787    if (type_str == "raw") {
788        type = MUL_FACTOR;
789
790    } else if (type_str == "bps") {
791        type = MUL_BPS;
792
793    } else if (type_str == "bpsl1") {
794        type = MUL_BPSL1;
795
796    } else if (type_str == "pps") {
797        type = MUL_PPS;
798
799    } else if (type_str == "percentage") {
800        type = MUL_PERCENTAGE;
801    } else {
802        throw TrexException("bad type str: " + type_str);
803    }
804
805    if (op_str == "abs") {
806        op = OP_ABS;
807
808    } else if (op_str == "add") {
809        op = OP_ADD;
810
811    } else if (op_str == "sub") {
812        op = OP_SUB;
813
814    } else {
815        throw TrexException("bad op str: " + op_str);
816    }
817
818    m_type  = type;
819    m_op    = op;
820    m_value = value;
821
822}
823
824const TrexStreamsGraphObj *
825TrexStatelessPort::validate(void) {
826
827    /* first compile the graph */
828
829    vector<TrexStream *> streams;
830    get_object_list(streams);
831
832    if (streams.size() == 0) {
833        throw TrexException("no streams attached to port");
834    }
835
836    TrexStreamsCompiler compiler;
837
838    /* TODO: think of this mask...*/
839    TrexDPCoreMask core_mask(get_dp_core_count(), TrexDPCoreMask::MASK_ALL);
840
841    std::vector<TrexStreamsCompiledObj *> compiled_objs;
842
843    std::string fail_msg;
844    bool rc = compiler.compile(m_port_id,
845                               streams,
846                               compiled_objs,
847                               core_mask,
848                               1.0,
849                               &fail_msg);
850    if (!rc) {
851        throw TrexException(fail_msg);
852    }
853
854    for (auto obj : compiled_objs) {
855        delete obj;
856    }
857
858    /* now create a stream graph */
859    if (!m_graph_obj) {
860        generate_streams_graph();
861    }
862
863    return m_graph_obj;
864}
865
866
867
868void
869TrexStatelessPort::get_port_effective_rate(double &pps,
870                                           double &bps_L1,
871                                           double &bps_L2,
872                                           double &percentage) {
873
874    if (get_stream_count() == 0) {
875        return;
876    }
877
878    if (!m_graph_obj) {
879        generate_streams_graph();
880    }
881
882    pps        = m_graph_obj->get_max_pps(m_factor);
883    bps_L1     = m_graph_obj->get_max_bps_l1(m_factor);
884    bps_L2     = m_graph_obj->get_max_bps_l2(m_factor);
885    percentage = (bps_L1 / get_port_speed_bps()) * 100.0;
886
887}
888
889void
890TrexStatelessPort::get_macaddr(std::string &hw_macaddr,
891                               std::string &src_macaddr,
892                               std::string &dst_macaddr) {
893
894    utl_macaddr_to_str(m_api_info.mac_info.hw_macaddr, hw_macaddr);
895    utl_macaddr_to_str(m_api_info.mac_info.src_macaddr, src_macaddr);
896    utl_macaddr_to_str(m_api_info.mac_info.dst_macaddr, dst_macaddr);
897}
898
899void
900TrexStatelessPort::get_pci_info(std::string &pci_addr, int &numa_node) {
901    pci_addr  = m_api_info.pci_addr;
902    numa_node = m_api_info.numa_node;
903}
904
905void
906TrexStatelessPort::add_stream(TrexStream *stream) {
907
908    verify_state(PORT_STATE_IDLE | PORT_STATE_STREAMS, "add_stream");
909
910    get_stateless_obj()->m_rx_flow_stat.add_stream(stream);
911
912    m_stream_table.add_stream(stream);
913    delete_streams_graph();
914
915    change_state(PORT_STATE_STREAMS);
916}
917
918void
919TrexStatelessPort::remove_stream(TrexStream *stream) {
920
921    verify_state(PORT_STATE_STREAMS, "remove_stream");
922
923    get_stateless_obj()->m_rx_flow_stat.del_stream(stream);
924
925    m_stream_table.remove_stream(stream);
926    delete_streams_graph();
927
928    if (m_stream_table.size() == 0) {
929        change_state(PORT_STATE_IDLE);
930    }
931}
932
933void
934TrexStatelessPort::remove_and_delete_all_streams() {
935    verify_state(PORT_STATE_IDLE | PORT_STATE_STREAMS, "remove_and_delete_all_streams");
936
937    vector<TrexStream *> streams;
938    get_object_list(streams);
939
940    for (auto stream : streams) {
941        remove_stream(stream);
942        delete stream;
943    }
944}
945
946/************* Trex Port Owner **************/
947
948TrexPortOwner::TrexPortOwner() {
949    m_is_free = true;
950    m_session_id = 0;
951
952    /* for handlers random generation */
953    m_seed = time(NULL);
954}
955
956const std::string TrexPortOwner::g_unowned_name = "<FREE>";
957const std::string TrexPortOwner::g_unowned_handler = "";
958