1/*
2 Itay Marom
3 Cisco Systems, Inc.
4*/
5
6/*
7Copyright (c) 2015-2015 Cisco Systems, Inc.
8
9Licensed under the Apache License, Version 2.0 (the "License");
10you may not use this file except in compliance with the License.
11You may obtain a copy of the License at
12
13    http://www.apache.org/licenses/LICENSE-2.0
14
15Unless required by applicable law or agreed to in writing, software
16distributed under the License is distributed on an "AS IS" BASIS,
17WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18See the License for the specific language governing permissions and
19limitations under the License.
20*/
21
22#include <trex_stateless.h>
23#include <trex_stateless_port.h>
24#include <trex_stateless_messaging.h>
25#include <trex_streams_compiler.h>
26#include <common/basic_utils.h>
27#include <common/captureFile.h>
28#include "trex_stateless_rx_defs.h"
29
30#include <string>
31
32#ifndef TREX_RPC_MOCK_SERVER
33
34// DPDK c++ issue
35#ifndef UINT8_MAX
36    #define UINT8_MAX 255
37#endif
38
39#ifndef UINT16_MAX
40    #define UINT16_MAX 0xFFFF
41#endif
42
43// DPDK c++ issue
44#endif
45
46#include <os_time.h>
47
48void
49port_id_to_cores(uint8_t port_id, std::vector<std::pair<uint8_t, uint8_t>> &cores_id_list);
50
51using namespace std;
52
53
54
55/***************************
56 * trex DP events handlers
57 *
58 **************************/
59class AsyncStopEvent : public TrexDpPortEvent {
60
61protected:
62    /**
63     * when an async event occurs (all cores have reported in)
64     *
65     * @author imarom (29-Feb-16)
66     */
67    virtual void on_event() {
68        get_port()->change_state(TrexStatelessPort::PORT_STATE_STREAMS);
69
70        get_port()->common_port_stop_actions(true);
71
72        assert(get_port()->m_pending_async_stop_event != TrexDpPortEvents::INVALID_ID);
73        get_port()->m_pending_async_stop_event = TrexDpPortEvents::INVALID_ID;
74    }
75
76    /**
77     * when a DP core encountered an error
78     *
79     * @author imarom (20-Apr-16)
80     */
81    virtual void on_error(int thread_id) {
82        Json::Value data;
83
84        data["port_id"]   = get_port()->get_port_id();
85        data["thread_id"] = thread_id;
86
87        get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_ERROR, data);
88    }
89};
90
91/*************************************
92 * Streams Feeder
93 * A class that holds a temporary
94 * clone of streams that can be
95 * manipulated
96 *
97 * this is a RAII object meant for
98 * graceful cleanup
99 ************************************/
100class StreamsFeeder {
101public:
102
103    StreamsFeeder(TrexStatelessPort *port) {
104        /* start pesimistic */
105        m_success = false;
106
107        m_port = port;
108    }
109
110    void feed() {
111
112        /* fetch the original streams */
113        m_port->get_object_list(m_in_streams);
114
115        for (const TrexStream *in_stream : m_in_streams) {
116            TrexStream *out_stream = in_stream->clone(true);
117
118            m_out_streams.push_back(out_stream);
119
120            get_stateless_obj()->m_rx_flow_stat.start_stream(out_stream);
121
122        }
123    }
124
125    void set_status(bool status) {
126        m_success = status;
127    }
128
129    vector<TrexStream *> &get_streams() {
130        return m_out_streams;
131    }
132
133    /**
134     * RAII
135     */
136    ~StreamsFeeder() {
137        for (int i = 0; i < m_out_streams.size(); i++) {
138            TrexStream *out_stream = m_out_streams[i];
139            TrexStream *in_stream  = m_in_streams[i];
140
141            if (m_success) {
142                /* success path */
143                get_stateless_obj()->m_rx_flow_stat.copy_state(out_stream, in_stream);
144            } else {
145                /* fail path */
146                get_stateless_obj()->m_rx_flow_stat.stop_stream(out_stream);
147            }
148            delete out_stream;
149        }
150    }
151
152private:
153    vector<TrexStream *>  m_in_streams;
154    vector<TrexStream *>  m_out_streams;
155    bool                  m_success;
156
157    TrexStatelessPort    *m_port;
158};
159
160
161/***************************
162 * trex stateless port
163 *
164 **************************/
165TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api) : m_dp_events(this) {
166    std::vector<std::pair<uint8_t, uint8_t>> core_pair_list;
167
168    m_port_id             = port_id;
169    m_port_state          = PORT_STATE_IDLE;
170    m_platform_api        = api;
171    m_is_service_mode_on  = false;
172
173    /* get the platform specific data */
174    api->get_interface_info(port_id, m_api_info);
175
176    /* get RX caps */
177    uint16_t ip_id_base;
178    api->get_interface_stat_info(port_id, m_rx_count_num, m_rx_caps, ip_id_base);
179
180    /* get the DP cores belonging to this port */
181    api->port_id_to_cores(m_port_id, core_pair_list);
182
183    for (auto core_pair : core_pair_list) {
184
185        /* send the core id */
186        m_cores_id_list.push_back(core_pair.first);
187    }
188
189    m_graph_obj = NULL;
190
191    m_pending_async_stop_event = TrexDpPortEvents::INVALID_ID;
192}
193
194TrexStatelessPort::~TrexStatelessPort() {
195
196    stop_traffic();
197    remove_and_delete_all_streams();
198}
199
200/**
201 * acquire the port
202 *
203 * @author imarom (09-Nov-15)
204 *
205 * @param user
206 * @param force
207 */
208void
209TrexStatelessPort::acquire(const std::string &user, uint32_t session_id, bool force) {
210
211    bool used_force = !get_owner().is_free() && force;
212
213    if (get_owner().is_free() || force) {
214        get_owner().own(user, session_id);
215
216    } else {
217        /* not same user or session id and not force - report error */
218        if (get_owner().get_name() == user) {
219            throw TrexException("port is already owned by another session of '" + user + "'");
220        } else {
221            throw TrexException("port is already taken by '" + get_owner().get_name() + "'");
222        }
223    }
224
225    Json::Value data;
226
227    data["port_id"]    = m_port_id;
228    data["who"]        = user;
229    data["session_id"] = session_id;
230    data["force"]      = used_force;
231
232    get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_ACQUIRED, data);
233
234}
235
236void
237TrexStatelessPort::release(void) {
238
239
240    Json::Value data;
241
242    data["port_id"]    = m_port_id;
243    data["who"]        = get_owner().get_name();
244    data["session_id"] = get_owner().get_session_id();
245
246    get_owner().release();
247
248    get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_RELEASED, data);
249}
250
251/**
252 * starts the traffic on the port
253 *
254 */
255void
256TrexStatelessPort::start_traffic(const TrexPortMultiplier &mul, double duration, bool force, uint64_t core_mask) {
257
258    /* command allowed only on state stream */
259    verify_state(PORT_STATE_STREAMS, "start");
260
261    /* just making sure no leftovers... */
262    delete_streams_graph();
263
264    /* on start - we can only provide absolute values */
265    assert(mul.m_op == TrexPortMultiplier::OP_ABS);
266
267    /* check link state */
268    if ( !m_platform_api->getPortAttrObj(m_port_id)->is_link_up() && !force ) {
269        throw TrexException("Link state is DOWN.");
270    }
271
272    /* caclulate the effective factor for DP */
273    double factor = calculate_effective_factor(mul, force);
274
275    StreamsFeeder feeder(this);
276    feeder.feed();
277
278    /* compiler it */
279    std::vector<TrexStreamsCompiledObj *> compiled_objs;
280    std::string fail_msg;
281
282    TrexStreamsCompiler compiler;
283    TrexDPCoreMask mask(get_dp_core_count(), core_mask);
284
285    bool rc = compiler.compile(m_port_id,
286                               feeder.get_streams(),
287                               compiled_objs,
288                               mask,
289                               factor,
290                               &fail_msg);
291
292    if (!rc) {
293        feeder.set_status(false);
294        throw TrexException(fail_msg);
295    }
296
297    feeder.set_status(true);
298
299    /* generate a message to all the relevant DP cores to stop transmitting */
300    assert(m_pending_async_stop_event == TrexDpPortEvents::INVALID_ID);
301    m_pending_async_stop_event = m_dp_events.create_event(new AsyncStopEvent());
302
303    /* update object status */
304    m_factor = factor;
305    m_last_all_streams_continues = compiled_objs[mask.get_active_cores()[0]]->get_all_streams_continues();
306    m_last_duration = duration;
307
308    change_state(PORT_STATE_TX);
309
310    /* update the DP - messages will be freed by the DP */
311    int index = 0;
312    for (auto core_id : m_cores_id_list) {
313
314        /* was the core assigned a compiled object ? */
315        if (compiled_objs[index]) {
316            TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(m_port_id,
317                                                                             m_pending_async_stop_event,
318                                                                             compiled_objs[index],
319                                                                             duration);
320            send_message_to_dp(core_id, start_msg);
321        } else {
322
323            /* mimic an end event */
324            m_dp_events.on_core_reporting_in(m_pending_async_stop_event, core_id);
325        }
326
327        index++;
328    }
329
330    /* for debug - this can be turn on */
331    //m_dp_events.barrier();
332
333    /* update subscribers */
334    Json::Value data;
335    data["port_id"] = m_port_id;
336    get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_STARTED, data);
337
338}
339
340
341bool TrexStatelessPort::is_active() const {
342    return   (  (m_port_state == PORT_STATE_TX)
343             || (m_port_state == PORT_STATE_PAUSE)
344             || (m_port_state == PORT_STATE_PCAP_TX)
345             );
346}
347
348/**
349 * stop traffic on port
350 *
351 * @author imarom (09-Nov-15)
352 *
353 * @return TrexStatelessPort::rc_e
354 */
355void
356TrexStatelessPort::stop_traffic(void) {
357    if (!is_active()) {
358        return;
359    }
360
361    /* delete any previous graphs */
362    delete_streams_graph();
363
364    /* to avoid race, first destroy any previous stop/pause events */
365    if (m_pending_async_stop_event != TrexDpPortEvents::INVALID_ID) {
366        m_dp_events.destroy_event(m_pending_async_stop_event);
367        m_pending_async_stop_event = TrexDpPortEvents::INVALID_ID;
368
369    }
370
371    /* generate a message to all the relevant DP cores to start transmitting */
372    TrexStatelessCpToDpMsgBase *stop_msg = new TrexStatelessDpStop(m_port_id);
373    send_message_to_all_dp(stop_msg);
374
375    /* a barrier - make sure all the DP cores stopped */
376    m_dp_events.barrier();
377
378    change_state(PORT_STATE_STREAMS);
379
380    common_port_stop_actions(false);
381}
382
383/**
384 * remove all RX filters from port
385 *
386 * @author imarom (28-Mar-16)
387 */
388void
389TrexStatelessPort::remove_rx_filters(void) {
390    /* only valid when IDLE or with streams and not TXing */
391    verify_state(PORT_STATE_STREAMS, "remove_rx_filters");
392
393    for (auto entry : m_stream_table) {
394        get_stateless_obj()->m_rx_flow_stat.stop_stream(entry.second);
395    }
396
397}
398
399/**
400 * when a port stops, perform various actions
401 *
402 */
403void
404TrexStatelessPort::common_port_stop_actions(bool async) {
405
406    Json::Value data;
407    data["port_id"] = m_port_id;
408
409    if (async) {
410        get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_FINISHED_TX, data);
411    } else {
412        get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_STOPPED, data);
413    }
414
415}
416
417/**
418 * core is considered active if it has a pending for async stop
419 *
420 */
421bool
422TrexStatelessPort::is_core_active(int core_id) {
423    return ( (m_pending_async_stop_event != TrexDpPortEvents::INVALID_ID) &&
424             (m_dp_events.is_core_pending_on_event(m_pending_async_stop_event, core_id))
425           );
426}
427
428void
429TrexStatelessPort::pause_traffic(void) {
430
431    verify_state(PORT_STATE_TX, "pause");
432
433    if (m_last_all_streams_continues == false) {
434        throw TrexException(" pause is supported when all streams are in continues mode ");
435    }
436
437    if ( m_last_duration>0.0 ) {
438        throw TrexException(" pause is supported when duration is not enable is start command ");
439    }
440
441    /* send a pause message */
442    TrexStatelessCpToDpMsgBase *pause_msg = new TrexStatelessDpPause(m_port_id);
443
444    /* send message to all cores */
445    send_message_to_all_dp(pause_msg, true);
446
447    /* make sure all DP cores paused */
448    m_dp_events.barrier();
449
450    /* change state */
451    change_state(PORT_STATE_PAUSE);
452
453    Json::Value data;
454    data["port_id"] = m_port_id;
455    get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_PAUSED, data);
456}
457
458
459void
460TrexStatelessPort::resume_traffic(void) {
461
462    verify_state(PORT_STATE_PAUSE, "resume");
463
464    /* generate a message to all the relevant DP cores to start transmitting */
465    TrexStatelessCpToDpMsgBase *resume_msg = new TrexStatelessDpResume(m_port_id);
466
467    send_message_to_all_dp(resume_msg, true);
468    change_state(PORT_STATE_TX);
469
470    Json::Value data;
471    data["port_id"] = m_port_id;
472    get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_RESUMED, data);
473}
474
475void
476TrexStatelessPort::update_traffic(const TrexPortMultiplier &mul, bool force) {
477
478    double factor;
479
480    verify_state(PORT_STATE_TX | PORT_STATE_PAUSE, "update");
481
482    /* generate a message to all the relevant DP cores to start transmitting */
483    double new_factor = calculate_effective_factor(mul, force);
484
485    switch (mul.m_op) {
486    case TrexPortMultiplier::OP_ABS:
487        factor = new_factor / m_factor;
488        break;
489
490    case TrexPortMultiplier::OP_ADD:
491        factor = (m_factor + new_factor) / m_factor;
492        break;
493
494    case TrexPortMultiplier::OP_SUB:
495        factor = (m_factor - new_factor) / m_factor;
496        if (factor <= 0) {
497            throw TrexException("Update request will lower traffic to less than zero");
498        }
499        break;
500
501    default:
502        assert(0);
503        break;
504    }
505
506    TrexStatelessCpToDpMsgBase *update_msg = new TrexStatelessDpUpdate(m_port_id, factor);
507    send_message_to_all_dp(update_msg, true);
508
509    m_factor *= factor;
510
511}
512
513void
514TrexStatelessPort::push_remote(const std::string &pcap_filename,
515                               double ipg_usec,
516                               double min_ipg_sec,
517                               double speedup,
518                               uint32_t count,
519                               double duration,
520                               bool is_dual) {
521
522    /* command allowed only on state stream */
523    verify_state(PORT_STATE_IDLE | PORT_STATE_STREAMS, "push_remote");
524
525    /* check that file exists */
526    std::stringstream ss;
527    CCapReaderBase *reader = CCapReaderFactory::CreateReader((char *)pcap_filename.c_str(), 0, ss);
528    if (!reader) {
529        throw TrexException(ss.str());
530    }
531
532    if ( (is_dual) && (reader->get_type() != ERF) ) {
533        throw TrexException("dual mode is only supported on ERF format");
534    }
535    delete reader;
536
537    /* only one core gets to play */
538    int tx_core = m_cores_id_list[0];
539
540    /* create async event */
541    assert(m_pending_async_stop_event == TrexDpPortEvents::INVALID_ID);
542    m_pending_async_stop_event = m_dp_events.create_event(new AsyncStopEvent());
543
544    /* mark all other cores as done */
545    for (int index = 1; index < m_cores_id_list.size(); index++) {
546        /* mimic an end event */
547        m_dp_events.on_core_reporting_in(m_pending_async_stop_event, m_cores_id_list[index]);
548    }
549
550    /* send a message to core */
551    change_state(PORT_STATE_PCAP_TX);
552    TrexStatelessCpToDpMsgBase *push_msg = new TrexStatelessDpPushPCAP(m_port_id,
553                                                                       m_pending_async_stop_event,
554                                                                       pcap_filename,
555                                                                       ipg_usec,
556                                                                       min_ipg_sec,
557                                                                       speedup,
558                                                                       count,
559                                                                       duration,
560                                                                       is_dual);
561    send_message_to_dp(tx_core, push_msg);
562
563    /* update subscribers */
564    Json::Value data;
565    data["port_id"] = m_port_id;
566    get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_STARTED, data);
567}
568
569std::string
570TrexStatelessPort::get_state_as_string() const {
571
572    switch (get_state()) {
573    case PORT_STATE_DOWN:
574        return "DOWN";
575
576    case PORT_STATE_IDLE:
577        return  "IDLE";
578
579    case PORT_STATE_STREAMS:
580        return "STREAMS";
581
582    case PORT_STATE_TX:
583        return "TX";
584
585    case PORT_STATE_PAUSE:
586        return "PAUSE";
587
588    case PORT_STATE_PCAP_TX:
589        return "PCAP_TX";
590    }
591
592    return "UNKNOWN";
593}
594
595int
596TrexStatelessPort::get_max_stream_id() const {
597    return m_stream_table.get_max_stream_id();
598}
599
600void
601TrexStatelessPort::get_properties(std::string &driver) {
602
603    driver = m_api_info.driver_name;
604}
605
606bool
607TrexStatelessPort::verify_state(int state, const char *cmd_name, bool should_throw) const {
608    if ( (state & m_port_state) == 0 ) {
609        if (should_throw) {
610            std::stringstream ss;
611            ss << "command '" << cmd_name << "' cannot be executed on current state: '" << get_state_as_string() << "'";
612            throw TrexException(ss.str());
613        } else {
614            return false;
615        }
616    }
617
618    return true;
619}
620
621void
622TrexStatelessPort::change_state(port_state_e new_state) {
623
624    m_port_state = new_state;
625}
626
627
628void
629TrexStatelessPort::encode_stats(Json::Value &port) {
630
631    TrexPlatformInterfaceStats stats;
632    m_platform_api->get_interface_stats(m_port_id, stats);
633
634    port["tx_bps"]          = stats.m_stats.m_tx_bps;
635    port["rx_bps"]          = stats.m_stats.m_rx_bps;
636
637    port["tx_pps"]          = stats.m_stats.m_tx_pps;
638    port["rx_pps"]          = stats.m_stats.m_rx_pps;
639
640    port["total_tx_pkts"]   = Json::Value::UInt64(stats.m_stats.m_total_tx_pkts);
641    port["total_rx_pkts"]   = Json::Value::UInt64(stats.m_stats.m_total_rx_pkts);
642
643    port["total_tx_bytes"]  = Json::Value::UInt64(stats.m_stats.m_total_tx_bytes);
644    port["total_rx_bytes"]  = Json::Value::UInt64(stats.m_stats.m_total_rx_bytes);
645
646    port["tx_rx_errors"]    = Json::Value::UInt64(stats.m_stats.m_tx_rx_errors);
647}
648
649void
650TrexStatelessPort::send_message_to_all_dp(TrexStatelessCpToDpMsgBase *msg, bool send_to_active_only) {
651
652    for (auto core_id : m_cores_id_list) {
653
654        /* skip non active cores if requested */
655        if ( (send_to_active_only) && (!is_core_active(core_id)) ) {
656            continue;
657        }
658
659        send_message_to_dp(core_id, msg->clone());
660    }
661
662    /* original was not sent - delete it */
663    delete msg;
664}
665
666void
667TrexStatelessPort::send_message_to_dp(uint8_t core_id, TrexStatelessCpToDpMsgBase *msg) {
668
669    /* send the message to the core */
670    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingCpToDp(core_id);
671    ring->Enqueue((CGenNode *)msg);
672}
673
674void
675TrexStatelessPort::send_message_to_rx(TrexStatelessCpToRxMsgBase *msg) {
676
677    /* send the message to the core */
678    CNodeRing *ring = CMsgIns::Ins()->getCpRx()->getRingCpToDp(0);
679    ring->Enqueue((CGenNode *)msg);
680}
681
682uint64_t
683TrexStatelessPort::get_port_speed_bps() const {
684    return (uint64_t) m_platform_api->getPortAttrObj(m_port_id)->get_link_speed() * 1000 * 1000;
685}
686
/**
 * converts a rate in bits per second to gigabits per second
 * (decimal giga - 10^9)
 */
static inline double
bps_to_gbps(double bps) {
    const double bits_per_gigabit = 1000.0 * 1000 * 1000;
    return bps / bits_per_gigabit;
}
691
692double
693TrexStatelessPort::calculate_effective_factor(const TrexPortMultiplier &mul, bool force) {
694
695    double factor = calculate_effective_factor_internal(mul);
696
697    /* did we exceeded the max L1 line rate ? */
698    double expected_l1_rate = m_graph_obj->get_max_bps_l1(factor);
699
700
701    /* L1 BW must be positive */
702    if (expected_l1_rate <= 0){
703        stringstream ss;
704        ss << "Effective bandwidth must be positive, got: " << expected_l1_rate;
705        throw TrexException(ss.str());
706    }
707
708    /* factor must be positive */
709    if (factor <= 0) {
710        stringstream ss;
711        ss << "Factor must be positive, got: " << factor;
712        throw TrexException(ss.str());
713    }
714
715    /* if force simply return the value */
716    if (force) {
717        return factor;
718    } else {
719
720        /* due to float calculations we allow 0.1% roundup */
721        if ( (expected_l1_rate / get_port_speed_bps()) > 1.0001 )  {
722            stringstream ss;
723            ss << "Expected L1 B/W: '" << bps_to_gbps(expected_l1_rate) << " Gbps' exceeds port line rate: '" << bps_to_gbps(get_port_speed_bps()) << " Gbps'";
724            throw TrexException(ss.str());
725        }
726
727        /* in any case, without force, do not return any value higher than the max factor */
728        double max_factor = m_graph_obj->get_factor_bps_l1(get_port_speed_bps());
729        return std::min(max_factor, factor);
730    }
731
732}
733
734double
735TrexStatelessPort::calculate_effective_factor_internal(const TrexPortMultiplier &mul) {
736
737    /* we now need the graph - generate it if we don't have it (happens once) */
738    if (!m_graph_obj) {
739        generate_streams_graph();
740    }
741
742    switch (mul.m_type) {
743
744    case TrexPortMultiplier::MUL_FACTOR:
745        return (mul.m_value);
746
747    case TrexPortMultiplier::MUL_BPS:
748        return m_graph_obj->get_factor_bps_l2(mul.m_value);
749
750    case TrexPortMultiplier::MUL_BPSL1:
751        return m_graph_obj->get_factor_bps_l1(mul.m_value);
752
753    case TrexPortMultiplier::MUL_PPS:
754        return m_graph_obj->get_factor_pps(mul.m_value);
755
756    case TrexPortMultiplier::MUL_PERCENTAGE:
757        /* if abs percentage is from the line speed - otherwise its from the current speed */
758
759        if (mul.m_op == TrexPortMultiplier::OP_ABS) {
760            double required = (mul.m_value / 100.0) * get_port_speed_bps();
761            return m_graph_obj->get_factor_bps_l1(required);
762        } else {
763            return (m_factor * (mul.m_value / 100.0));
764        }
765
766    default:
767        assert(0);
768    }
769
770}
771
772
773void
774TrexStatelessPort::generate_streams_graph() {
775
776    /* dispose of the old one */
777    if (m_graph_obj) {
778        delete_streams_graph();
779    }
780
781    /* fetch all the streams from the table */
782    vector<TrexStream *> streams;
783    get_object_list(streams);
784
785    TrexStreamsGraph graph;
786    m_graph_obj = graph.generate(streams);
787}
788
789void
790TrexStatelessPort::delete_streams_graph() {
791    if (m_graph_obj) {
792        delete m_graph_obj;
793        m_graph_obj = NULL;
794    }
795}
796
797
798
799/***************************
800 * port multiplier
801 *
802 **************************/
803const std::initializer_list<std::string> TrexPortMultiplier::g_types = {"raw", "bps", "bpsl1", "pps", "percentage"};
804const std::initializer_list<std::string> TrexPortMultiplier::g_ops   = {"abs", "add", "sub"};
805
806TrexPortMultiplier::
807TrexPortMultiplier(const std::string &type_str, const std::string &op_str, double value) {
808    mul_type_e type;
809    mul_op_e   op;
810
811    if (type_str == "raw") {
812        type = MUL_FACTOR;
813
814    } else if (type_str == "bps") {
815        type = MUL_BPS;
816
817    } else if (type_str == "bpsl1") {
818        type = MUL_BPSL1;
819
820    } else if (type_str == "pps") {
821        type = MUL_PPS;
822
823    } else if (type_str == "percentage") {
824        type = MUL_PERCENTAGE;
825    } else {
826        throw TrexException("bad type str: " + type_str);
827    }
828
829    if (op_str == "abs") {
830        op = OP_ABS;
831
832    } else if (op_str == "add") {
833        op = OP_ADD;
834
835    } else if (op_str == "sub") {
836        op = OP_SUB;
837
838    } else {
839        throw TrexException("bad op str: " + op_str);
840    }
841
842    m_type  = type;
843    m_op    = op;
844    m_value = value;
845
846}
847
848const TrexStreamsGraphObj *
849TrexStatelessPort::validate(void) {
850
851    /* first compile the graph */
852
853    vector<TrexStream *> streams;
854    get_object_list(streams);
855
856    if (streams.size() == 0) {
857        throw TrexException("no streams attached to port");
858    }
859
860    TrexStreamsCompiler compiler;
861
862    /* TODO: think of this mask...*/
863    TrexDPCoreMask core_mask(get_dp_core_count(), TrexDPCoreMask::MASK_ALL);
864
865    std::vector<TrexStreamsCompiledObj *> compiled_objs;
866
867    std::string fail_msg;
868    bool rc = compiler.compile(m_port_id,
869                               streams,
870                               compiled_objs,
871                               core_mask,
872                               1.0,
873                               &fail_msg);
874    if (!rc) {
875        throw TrexException(fail_msg);
876    }
877
878    for (auto obj : compiled_objs) {
879        delete obj;
880    }
881
882    /* now create a stream graph */
883    if (!m_graph_obj) {
884        generate_streams_graph();
885    }
886
887    return m_graph_obj;
888}
889
890
891
892void
893TrexStatelessPort::get_port_effective_rate(double &pps,
894                                           double &bps_L1,
895                                           double &bps_L2,
896                                           double &percentage) {
897
898    if (get_stream_count() == 0) {
899        return;
900    }
901
902    if (!m_graph_obj) {
903        generate_streams_graph();
904    }
905
906    pps        = m_graph_obj->get_max_pps(m_factor);
907    bps_L1     = m_graph_obj->get_max_bps_l1(m_factor);
908    bps_L2     = m_graph_obj->get_max_bps_l2(m_factor);
909    percentage = (bps_L1 / get_port_speed_bps()) * 100.0;
910
911}
912
913void
914TrexStatelessPort::get_pci_info(std::string &pci_addr, int &numa_node) {
915    pci_addr  = m_api_info.pci_addr;
916    numa_node = m_api_info.numa_node;
917}
918
919void
920TrexStatelessPort::get_hw_mac(std::string &hw_mac) {
921    utl_macaddr_to_str(m_api_info.hw_macaddr, hw_mac);
922}
923
924void
925TrexStatelessPort::add_stream(TrexStream *stream) {
926
927    verify_state(PORT_STATE_IDLE | PORT_STATE_STREAMS, "add_stream");
928
929    if (m_stream_table.size() >= MAX_STREAMS) {
930        throw TrexException("Reached limit of " + std::to_string(MAX_STREAMS) + " streams at the port.");
931    }
932    get_stateless_obj()->m_rx_flow_stat.add_stream(stream);
933
934    m_stream_table.add_stream(stream);
935    delete_streams_graph();
936
937    change_state(PORT_STATE_STREAMS);
938}
939
940void
941TrexStatelessPort::remove_stream(TrexStream *stream) {
942
943    verify_state(PORT_STATE_STREAMS, "remove_stream");
944
945    get_stateless_obj()->m_rx_flow_stat.del_stream(stream);
946
947    m_stream_table.remove_stream(stream);
948    delete_streams_graph();
949
950    if (m_stream_table.size() == 0) {
951        change_state(PORT_STATE_IDLE);
952    }
953}
954
955void
956TrexStatelessPort::remove_and_delete_all_streams() {
957    verify_state(PORT_STATE_IDLE | PORT_STATE_STREAMS, "remove_and_delete_all_streams");
958
959    vector<TrexStream *> streams;
960    get_object_list(streams);
961
962    for (auto stream : streams) {
963        remove_stream(stream);
964        delete stream;
965    }
966}
967
968/**
969 * enable/disable service mode
970 * sends a query to the RX core
971 *
972 */
973void
974TrexStatelessPort::set_service_mode(bool enabled) {
975    static MsgReply<TrexStatelessRxQuery::query_rc_e> reply;
976    reply.reset();
977
978    TrexStatelessRxQuery::query_type_e query_type = (enabled ? TrexStatelessRxQuery::SERVICE_MODE_ON : TrexStatelessRxQuery::SERVICE_MODE_OFF);
979
980    TrexStatelessRxQuery *msg = new TrexStatelessRxQuery(m_port_id, query_type, reply);
981    send_message_to_rx( (TrexStatelessCpToRxMsgBase *)msg );
982
983    TrexStatelessRxQuery::query_rc_e rc = reply.wait_for_reply();
984
985    switch (rc) {
986    case TrexStatelessRxQuery::RC_OK:
987        if (enabled) {
988            getPortAttrObj()->set_rx_filter_mode(RX_FILTER_MODE_ALL);
989        } else {
990            getPortAttrObj()->set_rx_filter_mode(RX_FILTER_MODE_HW);
991        }
992        m_is_service_mode_on = enabled;
993        break;
994
995    case TrexStatelessRxQuery::RC_FAIL_RX_QUEUE_ACTIVE:
996        throw TrexException("unable to disable service mode - please remove RX queue");
997
998    case TrexStatelessRxQuery::RC_FAIL_CAPTURE_ACTIVE:
999        throw TrexException("unable to disable service mode - an active capture on port " + std::to_string(m_port_id) + " exists");
1000
1001    default:
1002        assert(0);
1003    }
1004
1005    /* update the all the relevant dp cores to move to service mode */
1006    TrexStatelessDpServiceMode *dp_msg = new TrexStatelessDpServiceMode(m_port_id, enabled);
1007    send_message_to_all_dp(dp_msg);
1008}
1009
1010
1011void
1012TrexStatelessPort::start_rx_queue(uint64_t size) {
1013    static MsgReply<bool> reply;
1014
1015    reply.reset();
1016
1017    TrexStatelessRxStartQueue *msg = new TrexStatelessRxStartQueue(m_port_id, size, reply);
1018    send_message_to_rx( (TrexStatelessCpToRxMsgBase *)msg );
1019
1020    /* we cannot return ACK to the user until the RX core has approved
1021       this might cause the user to lose some packets from the queue
1022     */
1023    reply.wait_for_reply();
1024}
1025
1026void
1027TrexStatelessPort::stop_rx_queue() {
1028    TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStopQueue(m_port_id);
1029    send_message_to_rx(msg);
1030}
1031
1032
1033const TrexPktBuffer *
1034TrexStatelessPort::get_rx_queue_pkts() {
1035    static MsgReply<const TrexPktBuffer *> reply;
1036
1037    reply.reset();
1038
1039    TrexStatelessRxQueueGetPkts *msg = new TrexStatelessRxQueueGetPkts(m_port_id, reply);
1040    send_message_to_rx( (TrexStatelessCpToRxMsgBase *)msg );
1041
1042    return reply.wait_for_reply();
1043}
1044
1045
1046/**
1047 * configures port in L2 mode
1048 *
1049 */
1050void
1051TrexStatelessPort::set_l2_mode(const uint8_t *dest_mac) {
1052
1053    /* not valid under traffic */
1054    verify_state(PORT_STATE_IDLE | PORT_STATE_STREAMS, "set_l2_mode");
1055
1056    /* configure port attributes for L2 */
1057    getPortAttrObj()->set_l2_mode(dest_mac);
1058
1059    /* update RX core */
1060    TrexStatelessRxSetL2Mode *msg = new TrexStatelessRxSetL2Mode(m_port_id);
1061    send_message_to_rx( (TrexStatelessCpToRxMsgBase *)msg );
1062}
1063
1064/**
1065 * configures port in L3 mode - unresolved
1066 */
1067void
1068TrexStatelessPort::set_l3_mode(uint32_t src_ipv4, uint32_t dest_ipv4) {
1069
1070    /* not valid under traffic */
1071    verify_state(PORT_STATE_IDLE | PORT_STATE_STREAMS, "set_l3_mode");
1072
1073    /* configure port attributes with L3 */
1074    getPortAttrObj()->set_l3_mode(src_ipv4, dest_ipv4);
1075
1076    /* send RX core the relevant info */
1077    CManyIPInfo ip_info;
1078    ip_info.insert(COneIPv4Info(src_ipv4, 0, getPortAttrObj()->get_layer_cfg().get_ether().get_src()));
1079
1080    TrexStatelessRxSetL3Mode *msg = new TrexStatelessRxSetL3Mode(m_port_id, ip_info, false);
1081    send_message_to_rx( (TrexStatelessCpToRxMsgBase *)msg );
1082}
1083
1084/**
1085 * configures port in L3 mode - resolved
1086 *
1087 */
1088void
1089TrexStatelessPort::set_l3_mode(uint32_t src_ipv4, uint32_t dest_ipv4, const uint8_t *resolved_mac) {
1090
1091    verify_state(PORT_STATE_IDLE | PORT_STATE_STREAMS, "set_l3_mode");
1092
1093    /* configure port attributes with L3 */
1094    getPortAttrObj()->set_l3_mode(src_ipv4, dest_ipv4, resolved_mac);
1095
1096    /* send RX core the relevant info */
1097    CManyIPInfo ip_info;
1098    ip_info.insert(COneIPv4Info(src_ipv4, 0, getPortAttrObj()->get_layer_cfg().get_ether().get_src()));
1099
1100    bool is_grat_arp_needed = !getPortAttrObj()->is_loopback();
1101
1102    TrexStatelessRxSetL3Mode *msg = new TrexStatelessRxSetL3Mode(m_port_id, ip_info, is_grat_arp_needed);
1103    send_message_to_rx( (TrexStatelessCpToRxMsgBase *)msg );
1104}
1105
1106
1107Json::Value
1108TrexStatelessPort::rx_features_to_json() {
1109    static MsgReply<Json::Value> reply;
1110
1111    reply.reset();
1112
1113    TrexStatelessRxFeaturesToJson *msg = new TrexStatelessRxFeaturesToJson(m_port_id, reply);
1114    send_message_to_rx( (TrexStatelessCpToRxMsgBase *)msg );
1115
1116    return reply.wait_for_reply();
1117}
1118
1119/************* Trex Port Owner **************/
1120
1121TrexPortOwner::TrexPortOwner() {
1122    m_is_free = true;
1123    m_session_id = 0;
1124
1125    /* for handlers random generation */
1126    m_seed = time(NULL);
1127}
1128
1129const std::string TrexPortOwner::g_unowned_name = "<FREE>";
1130const std::string TrexPortOwner::g_unowned_handler = "";
1131