trex_stateless_port.cpp revision 34cb66c9
1/*
2 Itay Marom
3 Cisco Systems, Inc.
4*/
5
6/*
7Copyright (c) 2015-2015 Cisco Systems, Inc.
8
9Licensed under the Apache License, Version 2.0 (the "License");
10you may not use this file except in compliance with the License.
11You may obtain a copy of the License at
12
13    http://www.apache.org/licenses/LICENSE-2.0
14
15Unless required by applicable law or agreed to in writing, software
16distributed under the License is distributed on an "AS IS" BASIS,
17WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18See the License for the specific language governing permissions and
19limitations under the License.
20*/
21
22#include <trex_stateless.h>
23#include <trex_stateless_port.h>
24#include <trex_stateless_messaging.h>
25#include <trex_streams_compiler.h>
26#include <common/basic_utils.h>
27#include <common/captureFile.h>
28#include "trex_stateless_rx_defs.h"
29
30#include <string>
31
32#ifndef TREX_RPC_MOCK_SERVER
33
34// DPDK c++ issue
35#ifndef UINT8_MAX
36    #define UINT8_MAX 255
37#endif
38
39#ifndef UINT16_MAX
40    #define UINT16_MAX 0xFFFF
41#endif
42
43// DPDK c++ issue
44#endif
45
46#include <os_time.h>
47
48void
49port_id_to_cores(uint8_t port_id, std::vector<std::pair<uint8_t, uint8_t>> &cores_id_list);
50
51using namespace std;
52
53
54
55/***************************
56 * trex DP events handlers
57 *
58 **************************/
59class AsyncStopEvent : public TrexDpPortEvent {
60
61protected:
62    /**
63     * when an async event occurs (all cores have reported in)
64     *
65     * @author imarom (29-Feb-16)
66     */
67    virtual void on_event() {
68        get_port()->change_state(TrexStatelessPort::PORT_STATE_STREAMS);
69
70        get_port()->common_port_stop_actions(true);
71
72        assert(get_port()->m_pending_async_stop_event != TrexDpPortEvents::INVALID_ID);
73        get_port()->m_pending_async_stop_event = TrexDpPortEvents::INVALID_ID;
74    }
75
76    /**
77     * when a DP core encountered an error
78     *
79     * @author imarom (20-Apr-16)
80     */
81    virtual void on_error(int thread_id) {
82        Json::Value data;
83
84        data["port_id"]   = get_port()->get_port_id();
85        data["thread_id"] = thread_id;
86
87        get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_ERROR, data);
88    }
89};
90
91/*************************************
92 * Streams Feeder
93 * A class that holds a temporary
94 * clone of streams that can be
95 * manipulated
96 *
97 * this is a RAII object meant for
98 * graceful cleanup
99 ************************************/
100class StreamsFeeder {
101public:
102
103    StreamsFeeder(TrexStatelessPort *port) {
104        /* start pesimistic */
105        m_success = false;
106
107        m_port = port;
108    }
109
110    void feed() {
111
112        /* fetch the original streams */
113        m_port->get_object_list(m_in_streams);
114
115        for (const TrexStream *in_stream : m_in_streams) {
116            TrexStream *out_stream = in_stream->clone(true);
117
118            m_out_streams.push_back(out_stream);
119
120            get_stateless_obj()->m_rx_flow_stat.start_stream(out_stream);
121
122        }
123    }
124
125    void set_status(bool status) {
126        m_success = status;
127    }
128
129    vector<TrexStream *> &get_streams() {
130        return m_out_streams;
131    }
132
133    /**
134     * RAII
135     */
136    ~StreamsFeeder() {
137        for (int i = 0; i < m_out_streams.size(); i++) {
138            TrexStream *out_stream = m_out_streams[i];
139            TrexStream *in_stream  = m_in_streams[i];
140
141            if (m_success) {
142                /* success path */
143                get_stateless_obj()->m_rx_flow_stat.copy_state(out_stream, in_stream);
144            } else {
145                /* fail path */
146                get_stateless_obj()->m_rx_flow_stat.stop_stream(out_stream);
147            }
148            delete out_stream;
149        }
150    }
151
152private:
153    vector<TrexStream *>  m_in_streams;
154    vector<TrexStream *>  m_out_streams;
155    bool                  m_success;
156
157    TrexStatelessPort    *m_port;
158};
159
160
161/***************************
162 * trex stateless port
163 *
164 **************************/
165TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api) : m_dp_events(this) {
166    std::vector<std::pair<uint8_t, uint8_t>> core_pair_list;
167
168    m_port_id            = port_id;
169    m_port_state         = PORT_STATE_IDLE;
170    m_platform_api       = api;
171
172    /* get the platform specific data */
173    api->get_interface_info(port_id, m_api_info);
174
175    /* get RX caps */
176    api->get_interface_stat_info(port_id, m_rx_count_num, m_rx_caps);
177
178    /* get the DP cores belonging to this port */
179    api->port_id_to_cores(m_port_id, core_pair_list);
180
181    for (auto core_pair : core_pair_list) {
182
183        /* send the core id */
184        m_cores_id_list.push_back(core_pair.first);
185    }
186
187    m_graph_obj = NULL;
188
189    m_pending_async_stop_event = TrexDpPortEvents::INVALID_ID;
190}
191
192TrexStatelessPort::~TrexStatelessPort() {
193
194    stop_traffic();
195    remove_and_delete_all_streams();
196}
197
198/**
199 * acquire the port
200 *
201 * @author imarom (09-Nov-15)
202 *
203 * @param user
204 * @param force
205 */
206void
207TrexStatelessPort::acquire(const std::string &user, uint32_t session_id, bool force) {
208
209    bool used_force = !get_owner().is_free() && force;
210
211    if (get_owner().is_free() || force) {
212        get_owner().own(user, session_id);
213
214    } else {
215        /* not same user or session id and not force - report error */
216        if (get_owner().get_name() == user) {
217            throw TrexException("port is already owned by another session of '" + user + "'");
218        } else {
219            throw TrexException("port is already taken by '" + get_owner().get_name() + "'");
220        }
221    }
222
223    Json::Value data;
224
225    data["port_id"]    = m_port_id;
226    data["who"]        = user;
227    data["session_id"] = session_id;
228    data["force"]      = used_force;
229
230    get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_ACQUIRED, data);
231
232}
233
234void
235TrexStatelessPort::release(void) {
236
237
238    Json::Value data;
239
240    data["port_id"]    = m_port_id;
241    data["who"]        = get_owner().get_name();
242    data["session_id"] = get_owner().get_session_id();
243
244    get_owner().release();
245
246    get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_RELEASED, data);
247}
248
249/**
250 * starts the traffic on the port
251 *
252 */
253void
254TrexStatelessPort::start_traffic(const TrexPortMultiplier &mul, double duration, bool force, uint64_t core_mask) {
255
256    /* command allowed only on state stream */
257    verify_state(PORT_STATE_STREAMS, "start");
258
259    /* just making sure no leftovers... */
260    delete_streams_graph();
261
262    /* on start - we can only provide absolute values */
263    assert(mul.m_op == TrexPortMultiplier::OP_ABS);
264
265    /* check link state */
266    if ( !m_platform_api->getPortAttrObj(m_port_id)->is_link_up() && !force ) {
267        throw TrexException("Link state is DOWN.");
268    }
269
270    /* caclulate the effective factor for DP */
271    double factor = calculate_effective_factor(mul, force);
272
273    StreamsFeeder feeder(this);
274    feeder.feed();
275
276    /* compiler it */
277    std::vector<TrexStreamsCompiledObj *> compiled_objs;
278    std::string fail_msg;
279
280    TrexStreamsCompiler compiler;
281    TrexDPCoreMask mask(get_dp_core_count(), core_mask);
282
283    bool rc = compiler.compile(m_port_id,
284                               feeder.get_streams(),
285                               compiled_objs,
286                               mask,
287                               factor,
288                               &fail_msg);
289
290    if (!rc) {
291        feeder.set_status(false);
292        throw TrexException(fail_msg);
293    }
294
295    feeder.set_status(true);
296
297    /* generate a message to all the relevant DP cores to stop transmitting */
298    assert(m_pending_async_stop_event == TrexDpPortEvents::INVALID_ID);
299    m_pending_async_stop_event = m_dp_events.create_event(new AsyncStopEvent());
300
301    /* update object status */
302    m_factor = factor;
303    m_last_all_streams_continues = compiled_objs[mask.get_active_cores()[0]]->get_all_streams_continues();
304    m_last_duration = duration;
305
306    change_state(PORT_STATE_TX);
307
308    /* update the DP - messages will be freed by the DP */
309    int index = 0;
310    for (auto core_id : m_cores_id_list) {
311
312        /* was the core assigned a compiled object ? */
313        if (compiled_objs[index]) {
314            TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(m_port_id,
315                                                                             m_pending_async_stop_event,
316                                                                             compiled_objs[index],
317                                                                             duration);
318            send_message_to_dp(core_id, start_msg);
319        } else {
320
321            /* mimic an end event */
322            m_dp_events.on_core_reporting_in(m_pending_async_stop_event, core_id);
323        }
324
325        index++;
326    }
327
328    /* for debug - this can be turn on */
329    //m_dp_events.barrier();
330
331    /* update subscribers */
332    Json::Value data;
333    data["port_id"] = m_port_id;
334    get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_STARTED, data);
335
336}
337
338
339bool TrexStatelessPort::is_active() const {
340    return   (  (m_port_state == PORT_STATE_TX)
341             || (m_port_state == PORT_STATE_PAUSE)
342             || (m_port_state == PORT_STATE_PCAP_TX)
343             );
344}
345
346/**
347 * stop traffic on port
348 *
349 * @author imarom (09-Nov-15)
350 *
351 * @return TrexStatelessPort::rc_e
352 */
353void
354TrexStatelessPort::stop_traffic(void) {
355    if (!is_active()) {
356        return;
357    }
358
359    /* delete any previous graphs */
360    delete_streams_graph();
361
362    /* to avoid race, first destroy any previous stop/pause events */
363    if (m_pending_async_stop_event != TrexDpPortEvents::INVALID_ID) {
364        m_dp_events.destroy_event(m_pending_async_stop_event);
365        m_pending_async_stop_event = TrexDpPortEvents::INVALID_ID;
366
367    }
368
369    /* generate a message to all the relevant DP cores to start transmitting */
370    TrexStatelessCpToDpMsgBase *stop_msg = new TrexStatelessDpStop(m_port_id);
371    send_message_to_all_dp(stop_msg);
372
373    /* a barrier - make sure all the DP cores stopped */
374    m_dp_events.barrier();
375
376    change_state(PORT_STATE_STREAMS);
377
378    common_port_stop_actions(false);
379}
380
381/**
382 * remove all RX filters from port
383 *
384 * @author imarom (28-Mar-16)
385 */
386void
387TrexStatelessPort::remove_rx_filters(void) {
388    /* only valid when IDLE or with streams and not TXing */
389    verify_state(PORT_STATE_STREAMS, "remove_rx_filters");
390
391    for (auto entry : m_stream_table) {
392        get_stateless_obj()->m_rx_flow_stat.stop_stream(entry.second);
393    }
394
395}
396
397/**
398 * when a port stops, perform various actions
399 *
400 */
401void
402TrexStatelessPort::common_port_stop_actions(bool async) {
403
404    Json::Value data;
405    data["port_id"] = m_port_id;
406
407    if (async) {
408        get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_FINISHED_TX, data);
409    } else {
410        get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_STOPPED, data);
411    }
412
413}
414
415/**
416 * core is considered active if it has a pending for async stop
417 *
418 */
419bool
420TrexStatelessPort::is_core_active(int core_id) {
421    return ( (m_pending_async_stop_event != TrexDpPortEvents::INVALID_ID) &&
422             (m_dp_events.is_core_pending_on_event(m_pending_async_stop_event, core_id))
423           );
424}
425
426void
427TrexStatelessPort::pause_traffic(void) {
428
429    verify_state(PORT_STATE_TX, "pause");
430
431    if (m_last_all_streams_continues == false) {
432        throw TrexException(" pause is supported when all streams are in continues mode ");
433    }
434
435    if ( m_last_duration>0.0 ) {
436        throw TrexException(" pause is supported when duration is not enable is start command ");
437    }
438
439    /* send a pause message */
440    TrexStatelessCpToDpMsgBase *pause_msg = new TrexStatelessDpPause(m_port_id);
441
442    /* send message to all cores */
443    send_message_to_all_dp(pause_msg, true);
444
445    /* make sure all DP cores paused */
446    m_dp_events.barrier();
447
448    /* change state */
449    change_state(PORT_STATE_PAUSE);
450
451    Json::Value data;
452    data["port_id"] = m_port_id;
453    get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_PAUSED, data);
454}
455
456
457void
458TrexStatelessPort::resume_traffic(void) {
459
460    verify_state(PORT_STATE_PAUSE, "resume");
461
462    /* generate a message to all the relevant DP cores to start transmitting */
463    TrexStatelessCpToDpMsgBase *resume_msg = new TrexStatelessDpResume(m_port_id);
464
465    send_message_to_all_dp(resume_msg, true);
466    change_state(PORT_STATE_TX);
467
468    Json::Value data;
469    data["port_id"] = m_port_id;
470    get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_RESUMED, data);
471}
472
473void
474TrexStatelessPort::update_traffic(const TrexPortMultiplier &mul, bool force) {
475
476    double factor;
477
478    verify_state(PORT_STATE_TX | PORT_STATE_PAUSE, "update");
479
480    /* generate a message to all the relevant DP cores to start transmitting */
481    double new_factor = calculate_effective_factor(mul, force);
482
483    switch (mul.m_op) {
484    case TrexPortMultiplier::OP_ABS:
485        factor = new_factor / m_factor;
486        break;
487
488    case TrexPortMultiplier::OP_ADD:
489        factor = (m_factor + new_factor) / m_factor;
490        break;
491
492    case TrexPortMultiplier::OP_SUB:
493        factor = (m_factor - new_factor) / m_factor;
494        if (factor <= 0) {
495            throw TrexException("Update request will lower traffic to less than zero");
496        }
497        break;
498
499    default:
500        assert(0);
501        break;
502    }
503
504    TrexStatelessCpToDpMsgBase *update_msg = new TrexStatelessDpUpdate(m_port_id, factor);
505    send_message_to_all_dp(update_msg, true);
506
507    m_factor *= factor;
508
509}
510
511void
512TrexStatelessPort::push_remote(const std::string &pcap_filename,
513                               double ipg_usec,
514                               double min_ipg_sec,
515                               double speedup,
516                               uint32_t count,
517                               double duration,
518                               bool is_dual) {
519
520    /* command allowed only on state stream */
521    verify_state(PORT_STATE_IDLE | PORT_STATE_STREAMS, "push_remote");
522
523    /* check that file exists */
524    std::stringstream ss;
525    CCapReaderBase *reader = CCapReaderFactory::CreateReader((char *)pcap_filename.c_str(), 0, ss);
526    if (!reader) {
527        throw TrexException(ss.str());
528    }
529
530    if ( (is_dual) && (reader->get_type() != ERF) ) {
531        throw TrexException("dual mode is only supported on ERF format");
532    }
533    delete reader;
534
535    /* only one core gets to play */
536    int tx_core = m_cores_id_list[0];
537
538    /* create async event */
539    assert(m_pending_async_stop_event == TrexDpPortEvents::INVALID_ID);
540    m_pending_async_stop_event = m_dp_events.create_event(new AsyncStopEvent());
541
542    /* mark all other cores as done */
543    for (int index = 1; index < m_cores_id_list.size(); index++) {
544        /* mimic an end event */
545        m_dp_events.on_core_reporting_in(m_pending_async_stop_event, m_cores_id_list[index]);
546    }
547
548    /* send a message to core */
549    change_state(PORT_STATE_PCAP_TX);
550    TrexStatelessCpToDpMsgBase *push_msg = new TrexStatelessDpPushPCAP(m_port_id,
551                                                                       m_pending_async_stop_event,
552                                                                       pcap_filename,
553                                                                       ipg_usec,
554                                                                       min_ipg_sec,
555                                                                       speedup,
556                                                                       count,
557                                                                       duration,
558                                                                       is_dual);
559    send_message_to_dp(tx_core, push_msg);
560
561    /* update subscribers */
562    Json::Value data;
563    data["port_id"] = m_port_id;
564    get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_STARTED, data);
565}
566
567std::string
568TrexStatelessPort::get_state_as_string() const {
569
570    switch (get_state()) {
571    case PORT_STATE_DOWN:
572        return "DOWN";
573
574    case PORT_STATE_IDLE:
575        return  "IDLE";
576
577    case PORT_STATE_STREAMS:
578        return "STREAMS";
579
580    case PORT_STATE_TX:
581        return "TX";
582
583    case PORT_STATE_PAUSE:
584        return "PAUSE";
585
586    case PORT_STATE_PCAP_TX:
587        return "PCAP_TX";
588    }
589
590    return "UNKNOWN";
591}
592
593int
594TrexStatelessPort::get_max_stream_id() const {
595    return m_stream_table.get_max_stream_id();
596}
597
598void
599TrexStatelessPort::get_properties(std::string &driver) {
600
601    driver = m_api_info.driver_name;
602}
603
604bool
605TrexStatelessPort::verify_state(int state, const char *cmd_name, bool should_throw) const {
606    if ( (state & m_port_state) == 0 ) {
607        if (should_throw) {
608            std::stringstream ss;
609            ss << "command '" << cmd_name << "' cannot be executed on current state: '" << get_state_as_string() << "'";
610            throw TrexException(ss.str());
611        } else {
612            return false;
613        }
614    }
615
616    return true;
617}
618
619void
620TrexStatelessPort::change_state(port_state_e new_state) {
621
622    m_port_state = new_state;
623}
624
625
626void
627TrexStatelessPort::encode_stats(Json::Value &port) {
628
629    TrexPlatformInterfaceStats stats;
630    m_platform_api->get_interface_stats(m_port_id, stats);
631
632    port["tx_bps"]          = stats.m_stats.m_tx_bps;
633    port["rx_bps"]          = stats.m_stats.m_rx_bps;
634
635    port["tx_pps"]          = stats.m_stats.m_tx_pps;
636    port["rx_pps"]          = stats.m_stats.m_rx_pps;
637
638    port["total_tx_pkts"]   = Json::Value::UInt64(stats.m_stats.m_total_tx_pkts);
639    port["total_rx_pkts"]   = Json::Value::UInt64(stats.m_stats.m_total_rx_pkts);
640
641    port["total_tx_bytes"]  = Json::Value::UInt64(stats.m_stats.m_total_tx_bytes);
642    port["total_rx_bytes"]  = Json::Value::UInt64(stats.m_stats.m_total_rx_bytes);
643
644    port["tx_rx_errors"]    = Json::Value::UInt64(stats.m_stats.m_tx_rx_errors);
645}
646
647void
648TrexStatelessPort::send_message_to_all_dp(TrexStatelessCpToDpMsgBase *msg, bool send_to_active_only) {
649
650    for (auto core_id : m_cores_id_list) {
651
652        /* skip non active cores if requested */
653        if ( (send_to_active_only) && (!is_core_active(core_id)) ) {
654            continue;
655        }
656
657        send_message_to_dp(core_id, msg->clone());
658    }
659
660    /* original was not sent - delete it */
661    delete msg;
662}
663
664void
665TrexStatelessPort::send_message_to_dp(uint8_t core_id, TrexStatelessCpToDpMsgBase *msg) {
666
667    /* send the message to the core */
668    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingCpToDp(core_id);
669    ring->Enqueue((CGenNode *)msg);
670}
671
672void
673TrexStatelessPort::send_message_to_rx(TrexStatelessCpToRxMsgBase *msg) {
674
675    /* send the message to the core */
676    CNodeRing *ring = CMsgIns::Ins()->getCpRx()->getRingCpToDp(0);
677    ring->Enqueue((CGenNode *)msg);
678}
679
680uint64_t
681TrexStatelessPort::get_port_speed_bps() const {
682    return (uint64_t) m_platform_api->getPortAttrObj(m_port_id)->get_link_speed() * 1000 * 1000;
683}
684
685static inline double
686bps_to_gbps(double bps) {
687    return (bps / (1000.0 * 1000 * 1000));
688}
689
690double
691TrexStatelessPort::calculate_effective_factor(const TrexPortMultiplier &mul, bool force) {
692
693    double factor = calculate_effective_factor_internal(mul);
694
695    /* did we exceeded the max L1 line rate ? */
696    double expected_l1_rate = m_graph_obj->get_max_bps_l1(factor);
697
698    /* if not force and exceeded - throw exception */
699    if ( (!force) && (expected_l1_rate > get_port_speed_bps()) ) {
700        stringstream ss;
701        ss << "Expected L1 B/W: '" << bps_to_gbps(expected_l1_rate) << " Gbps' exceeds port line rate: '" << bps_to_gbps(get_port_speed_bps()) << " Gbps'";
702        throw TrexException(ss.str());
703    }
704
705    /* L1 BW must be positive */
706    if (expected_l1_rate <= 0){
707        stringstream ss;
708        ss << "Effective bandwidth must be positive, got: " << expected_l1_rate;
709        throw TrexException(ss.str());
710    }
711
712    /* factor must be positive */
713    if (factor <= 0) {
714        stringstream ss;
715        ss << "Factor must be positive, got: " << factor;
716        throw TrexException(ss.str());
717    }
718
719    return factor;
720}
721
722double
723TrexStatelessPort::calculate_effective_factor_internal(const TrexPortMultiplier &mul) {
724
725    /* we now need the graph - generate it if we don't have it (happens once) */
726    if (!m_graph_obj) {
727        generate_streams_graph();
728    }
729
730    switch (mul.m_type) {
731
732    case TrexPortMultiplier::MUL_FACTOR:
733        return (mul.m_value);
734
735    case TrexPortMultiplier::MUL_BPS:
736        return m_graph_obj->get_factor_bps_l2(mul.m_value);
737
738    case TrexPortMultiplier::MUL_BPSL1:
739        return m_graph_obj->get_factor_bps_l1(mul.m_value);
740
741    case TrexPortMultiplier::MUL_PPS:
742        return m_graph_obj->get_factor_pps(mul.m_value);
743
744    case TrexPortMultiplier::MUL_PERCENTAGE:
745        /* if abs percentage is from the line speed - otherwise its from the current speed */
746
747        if (mul.m_op == TrexPortMultiplier::OP_ABS) {
748            double required = (mul.m_value / 100.0) * get_port_speed_bps();
749            return m_graph_obj->get_factor_bps_l1(required);
750        } else {
751            return (m_factor * (mul.m_value / 100.0));
752        }
753
754    default:
755        assert(0);
756    }
757
758}
759
760
761void
762TrexStatelessPort::generate_streams_graph() {
763
764    /* dispose of the old one */
765    if (m_graph_obj) {
766        delete_streams_graph();
767    }
768
769    /* fetch all the streams from the table */
770    vector<TrexStream *> streams;
771    get_object_list(streams);
772
773    TrexStreamsGraph graph;
774    m_graph_obj = graph.generate(streams);
775}
776
777void
778TrexStatelessPort::delete_streams_graph() {
779    if (m_graph_obj) {
780        delete m_graph_obj;
781        m_graph_obj = NULL;
782    }
783}
784
785
786
787/***************************
788 * port multiplier
789 *
790 **************************/
791const std::initializer_list<std::string> TrexPortMultiplier::g_types = {"raw", "bps", "bpsl1", "pps", "percentage"};
792const std::initializer_list<std::string> TrexPortMultiplier::g_ops   = {"abs", "add", "sub"};
793
794TrexPortMultiplier::
795TrexPortMultiplier(const std::string &type_str, const std::string &op_str, double value) {
796    mul_type_e type;
797    mul_op_e   op;
798
799    if (type_str == "raw") {
800        type = MUL_FACTOR;
801
802    } else if (type_str == "bps") {
803        type = MUL_BPS;
804
805    } else if (type_str == "bpsl1") {
806        type = MUL_BPSL1;
807
808    } else if (type_str == "pps") {
809        type = MUL_PPS;
810
811    } else if (type_str == "percentage") {
812        type = MUL_PERCENTAGE;
813    } else {
814        throw TrexException("bad type str: " + type_str);
815    }
816
817    if (op_str == "abs") {
818        op = OP_ABS;
819
820    } else if (op_str == "add") {
821        op = OP_ADD;
822
823    } else if (op_str == "sub") {
824        op = OP_SUB;
825
826    } else {
827        throw TrexException("bad op str: " + op_str);
828    }
829
830    m_type  = type;
831    m_op    = op;
832    m_value = value;
833
834}
835
836const TrexStreamsGraphObj *
837TrexStatelessPort::validate(void) {
838
839    /* first compile the graph */
840
841    vector<TrexStream *> streams;
842    get_object_list(streams);
843
844    if (streams.size() == 0) {
845        throw TrexException("no streams attached to port");
846    }
847
848    TrexStreamsCompiler compiler;
849
850    /* TODO: think of this mask...*/
851    TrexDPCoreMask core_mask(get_dp_core_count(), TrexDPCoreMask::MASK_ALL);
852
853    std::vector<TrexStreamsCompiledObj *> compiled_objs;
854
855    std::string fail_msg;
856    bool rc = compiler.compile(m_port_id,
857                               streams,
858                               compiled_objs,
859                               core_mask,
860                               1.0,
861                               &fail_msg);
862    if (!rc) {
863        throw TrexException(fail_msg);
864    }
865
866    for (auto obj : compiled_objs) {
867        delete obj;
868    }
869
870    /* now create a stream graph */
871    if (!m_graph_obj) {
872        generate_streams_graph();
873    }
874
875    return m_graph_obj;
876}
877
878
879
880void
881TrexStatelessPort::get_port_effective_rate(double &pps,
882                                           double &bps_L1,
883                                           double &bps_L2,
884                                           double &percentage) {
885
886    if (get_stream_count() == 0) {
887        return;
888    }
889
890    if (!m_graph_obj) {
891        generate_streams_graph();
892    }
893
894    pps        = m_graph_obj->get_max_pps(m_factor);
895    bps_L1     = m_graph_obj->get_max_bps_l1(m_factor);
896    bps_L2     = m_graph_obj->get_max_bps_l2(m_factor);
897    percentage = (bps_L1 / get_port_speed_bps()) * 100.0;
898
899}
900
901void
902TrexStatelessPort::get_pci_info(std::string &pci_addr, int &numa_node) {
903    pci_addr  = m_api_info.pci_addr;
904    numa_node = m_api_info.numa_node;
905}
906
907void
908TrexStatelessPort::add_stream(TrexStream *stream) {
909
910    verify_state(PORT_STATE_IDLE | PORT_STATE_STREAMS, "add_stream");
911
912    if (m_stream_table.size() >= MAX_STREAMS) {
913        throw TrexException("Reached limit of " + std::to_string(MAX_STREAMS) + " streams at the port.");
914    }
915    get_stateless_obj()->m_rx_flow_stat.add_stream(stream);
916
917    m_stream_table.add_stream(stream);
918    delete_streams_graph();
919
920    change_state(PORT_STATE_STREAMS);
921}
922
923void
924TrexStatelessPort::remove_stream(TrexStream *stream) {
925
926    verify_state(PORT_STATE_STREAMS, "remove_stream");
927
928    get_stateless_obj()->m_rx_flow_stat.del_stream(stream);
929
930    m_stream_table.remove_stream(stream);
931    delete_streams_graph();
932
933    if (m_stream_table.size() == 0) {
934        change_state(PORT_STATE_IDLE);
935    }
936}
937
938void
939TrexStatelessPort::remove_and_delete_all_streams() {
940    verify_state(PORT_STATE_IDLE | PORT_STATE_STREAMS, "remove_and_delete_all_streams");
941
942    vector<TrexStream *> streams;
943    get_object_list(streams);
944
945    for (auto stream : streams) {
946        remove_stream(stream);
947        delete stream;
948    }
949}
950
951void
952TrexStatelessPort::start_rx_capture(const std::string &pcap_filename, uint64_t limit) {
953    static MsgReply<bool> reply;
954
955    reply.reset();
956
957    TrexStatelessRxStartCapture *msg = new TrexStatelessRxStartCapture(m_port_id, pcap_filename, limit, reply);
958    send_message_to_rx((TrexStatelessCpToRxMsgBase *)msg);
959
960    /* as below, must wait for ACK from RX core before returning ACK */
961    reply.wait_for_reply();
962}
963
964void
965TrexStatelessPort::stop_rx_capture() {
966    TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStopCapture(m_port_id);
967    send_message_to_rx(msg);
968}
969
970void
971TrexStatelessPort::start_rx_queue(uint64_t size) {
972    static MsgReply<bool> reply;
973
974    reply.reset();
975
976    TrexStatelessRxStartQueue *msg = new TrexStatelessRxStartQueue(m_port_id, size, reply);
977    send_message_to_rx( (TrexStatelessCpToRxMsgBase *)msg );
978
979    /* we cannot return ACK to the user until the RX core has approved
980       this might cause the user to lose some packets from the queue
981     */
982    reply.wait_for_reply();
983}
984
985void
986TrexStatelessPort::stop_rx_queue() {
987    TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStopQueue(m_port_id);
988    send_message_to_rx(msg);
989}
990
991
992const RXPacketBuffer *
993TrexStatelessPort::get_rx_queue_pkts() {
994    static MsgReply<const RXPacketBuffer *> reply;
995
996    reply.reset();
997
998    TrexStatelessRxQueueGetPkts *msg = new TrexStatelessRxQueueGetPkts(m_port_id, reply);
999    send_message_to_rx( (TrexStatelessCpToRxMsgBase *)msg );
1000
1001    return reply.wait_for_reply();
1002}
1003
1004
1005/**
1006 * configures port in L2 mode
1007 *
1008 */
1009void
1010TrexStatelessPort::set_l2_mode(const uint8_t *dest_mac) {
1011
1012    /* not valid under traffic */
1013    verify_state(PORT_STATE_IDLE | PORT_STATE_STREAMS, "set_l2_mode");
1014
1015    /* configure port attributes for L2 */
1016    getPortAttrObj()->set_l2_mode(dest_mac);
1017
1018    /* update RX core */
1019    TrexStatelessRxSetL2Mode *msg = new TrexStatelessRxSetL2Mode(m_port_id);
1020    send_message_to_rx( (TrexStatelessCpToRxMsgBase *)msg );
1021}
1022
1023/**
1024 * configures port in L3 mode - unresolved
1025 */
1026void
1027TrexStatelessPort::set_l3_mode(uint32_t src_ipv4, uint32_t dest_ipv4) {
1028
1029    /* not valid under traffic */
1030    verify_state(PORT_STATE_IDLE | PORT_STATE_STREAMS, "set_l3_mode");
1031
1032    /* configure port attributes with L3 */
1033    getPortAttrObj()->set_l3_mode(src_ipv4, dest_ipv4);
1034
1035    /* send RX core the relevant info */
1036    CManyIPInfo ip_info;
1037    ip_info.insert(COneIPv4Info(src_ipv4, 0, getPortAttrObj()->get_layer_cfg().get_ether().get_src()));
1038
1039    TrexStatelessRxSetL3Mode *msg = new TrexStatelessRxSetL3Mode(m_port_id, ip_info, false);
1040    send_message_to_rx( (TrexStatelessCpToRxMsgBase *)msg );
1041}
1042
1043/**
1044 * configures port in L3 mode - resolved
1045 *
1046 */
1047void
1048TrexStatelessPort::set_l3_mode(uint32_t src_ipv4, uint32_t dest_ipv4, const uint8_t *resolved_mac) {
1049
1050    verify_state(PORT_STATE_IDLE | PORT_STATE_STREAMS, "set_l3_mode");
1051
1052    /* configure port attributes with L3 */
1053    getPortAttrObj()->set_l3_mode(src_ipv4, dest_ipv4, resolved_mac);
1054
1055    /* send RX core the relevant info */
1056    CManyIPInfo ip_info;
1057    ip_info.insert(COneIPv4Info(src_ipv4, 0, getPortAttrObj()->get_layer_cfg().get_ether().get_src()));
1058
1059    bool is_grat_arp_needed = !getPortAttrObj()->is_loopback();
1060
1061    TrexStatelessRxSetL3Mode *msg = new TrexStatelessRxSetL3Mode(m_port_id, ip_info, is_grat_arp_needed);
1062    send_message_to_rx( (TrexStatelessCpToRxMsgBase *)msg );
1063}
1064
1065
1066Json::Value
1067TrexStatelessPort::rx_features_to_json() {
1068    static MsgReply<Json::Value> reply;
1069
1070    reply.reset();
1071
1072    TrexStatelessRxFeaturesToJson *msg = new TrexStatelessRxFeaturesToJson(m_port_id, reply);
1073    send_message_to_rx( (TrexStatelessCpToRxMsgBase *)msg );
1074
1075    return reply.wait_for_reply();
1076}
1077
1078/************* Trex Port Owner **************/
1079
1080TrexPortOwner::TrexPortOwner() {
1081    m_is_free = true;
1082    m_session_id = 0;
1083
1084    /* for handlers random generation */
1085    m_seed = time(NULL);
1086}
1087
1088const std::string TrexPortOwner::g_unowned_name = "<FREE>";
1089const std::string TrexPortOwner::g_unowned_handler = "";
1090