trex_stateless_dp_core.cpp revision c70b71af
1/*
2 Itay Marom
3 Hanoch Haim
4 Cisco Systems, Inc.
5*/
6
7/*
8Copyright (c) 2015-2016 Cisco Systems, Inc.
9
10Licensed under the Apache License, Version 2.0 (the "License");
11you may not use this file except in compliance with the License.
12You may obtain a copy of the License at
13
14    http://www.apache.org/licenses/LICENSE-2.0
15
16Unless required by applicable law or agreed to in writing, software
17distributed under the License is distributed on an "AS IS" BASIS,
18WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19See the License for the specific language governing permissions and
20limitations under the License.
21*/
22#include "bp_sim.h"
23#include "trex_stateless_dp_core.h"
24#include "trex_stateless_messaging.h"
25#include "trex_stream.h"
26#include "trex_stream_node.h"
27#include "trex_streams_compiler.h"
28
29void CDpOneStream::Delete(CFlowGenListPerThread   * core){
30    assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE);
31    core->free_node((CGenNode *)m_node);
32    delete m_dp_stream;
33    m_node=0;
34    m_dp_stream=0;
35}
36
37void CDpOneStream::DeleteOnlyStream(){
38    assert(m_dp_stream);
39    delete m_dp_stream;
40    m_dp_stream=0;
41}
42
43int CGenNodeStateless::get_stream_id(){
44    if (m_state ==CGenNodeStateless::ss_FREE_RESUSE) {
45        return (-1); // not valid
46    }
47    assert(m_ref_stream_info);
48    return ((int)m_ref_stream_info->m_stream_id);
49}
50
51
52void CGenNodeStateless::DumpHeader(FILE *fd){
53    fprintf(fd," pkt_id, time, port , action , state, stream_id , stype , m-burst# , burst# \n");
54
55}
56void CGenNodeStateless::Dump(FILE *fd){
57    fprintf(fd," %2.4f, %3lu, %s,%s, %3d, %s, %3lu, %3lu  \n",
58            m_time,
59            (ulong)m_port_id,
60            "s-pkt", //action
61            get_stream_state_str(m_state ).c_str(),
62            get_stream_id(),   //stream_id
63            TrexStream::get_stream_type_str(m_stream_type).c_str(), //stype
64            (ulong)m_multi_bursts,
65            (ulong)m_single_burst
66            );
67}
68
69
70
71void CGenNodeStateless::refresh_vm_bss(){
72    if ( m_vm_flow_var ) {
73        StreamVmDp  * vm_s=m_ref_stream_info->m_vm_dp;
74        assert(vm_s);
75        memcpy(m_vm_flow_var,vm_s->get_bss(),vm_s->get_bss_size());
76
77        if ( vm_s->is_random_seed() ){
78            /* if we have random seed for this program */
79            if (m_ref_stream_info->m_random_seed) {
80                set_random_seed(m_ref_stream_info->m_random_seed);
81            }
82        }
83    }
84}
85
86
87/**
88 * this function called when stream restart after it was inactive
89 */
90void CGenNodeStateless::refresh(){
91
92    /* refill the stream info */
93    m_single_burst    = m_single_burst_refill;
94    m_multi_bursts    = m_ref_stream_info->m_num_bursts;
95    m_state           = CGenNodeStateless::ss_ACTIVE;
96
97    /* refresh init value */
98#if 0
99    /* TBD should add a JSON varible for that */
100    refresh_vm_bss();
101#endif
102}
103
104
105void CGenNodeCommand::free_command(){
106
107    assert(m_cmd);
108    m_cmd->on_node_remove();
109    delete m_cmd;
110}
111
112
113std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state){
114    std::string res;
115
116    switch (stream_state) {
117    case CGenNodeStateless::ss_FREE_RESUSE :
118         res="FREE    ";
119        break;
120    case CGenNodeStateless::ss_INACTIVE :
121        res="INACTIVE ";
122        break;
123    case CGenNodeStateless::ss_ACTIVE :
124        res="ACTIVE   ";
125        break;
126    default:
127        res="Unknow   ";
128    };
129    return(res);
130}
131
132
133rte_mbuf_t   * CGenNodeStateless::alloc_node_with_vm(){
134
135    rte_mbuf_t        * m;
136    /* alloc small packet buffer*/
137    uint16_t prefix_size = prefix_header_size();
138    m = CGlobalInfo::pktmbuf_alloc( get_socket_id(), prefix_size );
139    if (m==0) {
140        return (m);
141    }
142    /* TBD remove this, should handle cases of error */
143    assert(m);
144    char *p=rte_pktmbuf_append(m, prefix_size);
145    memcpy( p ,m_original_packet_data_prefix, prefix_size);
146
147
148    /* run the VM program */
149    StreamDPVmInstructionsRunner runner;
150
151    runner.run( (uint32_t*)m_vm_flow_var,
152                m_vm_program_size,
153                m_vm_program,
154                m_vm_flow_var,
155                (uint8_t*)p);
156
157    uint16_t pkt_new_size=runner.get_new_pkt_size();
158    if ( likely( pkt_new_size == 0) ) {
159        /* no packet size change */
160        rte_mbuf_t * m_const = get_const_mbuf();
161        if (  m_const != NULL) {
162            utl_rte_pktmbuf_add_after(m,m_const);
163        }
164        return (m);
165    }
166
167    /* packet size change there are a few changes */
168    rte_mbuf_t * m_const = get_const_mbuf();
169    if ( (m_const == 0 ) || (pkt_new_size<=prefix_size) ) {
170        /* one mbuf , just trim it */
171        m->data_len = pkt_new_size;
172        m->pkt_len  = pkt_new_size;
173        return (m);
174    }
175
176    rte_mbuf_t * mi= CGlobalInfo::pktmbuf_alloc_small(get_socket_id());
177    assert(mi);
178    rte_pktmbuf_attach(mi,m_const);
179    utl_rte_pktmbuf_add_after2(m,mi);
180
181    if ( pkt_new_size < m->pkt_len) {
182        /* need to trim it */
183        mi->data_len = (pkt_new_size - prefix_size);
184        m->pkt_len   = pkt_new_size;
185    }
186    return (m);
187}
188
189
190void CGenNodeStateless::free_stl_node(){
191    /* if we have cache mbuf free it */
192    rte_mbuf_t * m=get_cache_mbuf();
193    if (m) {
194        rte_pktmbuf_free(m);
195        m_cache_mbuf=0;
196    }else{
197        /* non cache - must have an header */
198         m=get_const_mbuf();
199         if (m) {
200             rte_pktmbuf_free(m); /* reduce the ref counter */
201         }
202         free_prefix_header();
203    }
204    if (m_vm_flow_var) {
205        /* free flow var */
206        free(m_vm_flow_var);
207        m_vm_flow_var=0;
208    }
209}
210
211
212bool TrexStatelessDpPerPort::update_number_of_active_streams(uint32_t d){
213    m_active_streams-=d; /* reduce the number of streams */
214    if (m_active_streams == 0) {
215        return (true);
216    }
217    return (false);
218}
219
220bool TrexStatelessDpPerPort::resume_traffic(uint8_t port_id){
221
222    /* we are working with continues streams so we must be in transmit mode */
223    assert(m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE);
224
225    for (auto dp_stream : m_active_nodes) {
226        CGenNodeStateless * node =dp_stream.m_node;
227        assert(node->get_port_id() == port_id);
228        assert(node->is_pause() == true);
229        node->set_pause(false);
230    }
231    m_state = TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
232    return (true);
233}
234
235bool TrexStatelessDpPerPort::update_traffic(uint8_t port_id, double factor) {
236
237    assert( (m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING ||
238            (m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE)) );
239
240    for (auto dp_stream : m_active_nodes) {
241        CGenNodeStateless * node = dp_stream.m_node;
242        assert(node->get_port_id() == port_id);
243
244        node->update_rate(factor);
245    }
246
247    return (true);
248}
249
250bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){
251
252    /* we are working with continues streams so we must be in transmit mode */
253    assert(m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING);
254
255    for (auto dp_stream : m_active_nodes) {
256        CGenNodeStateless * node =dp_stream.m_node;
257        assert(node->get_port_id() == port_id);
258        assert(node->is_pause() == false);
259        node->set_pause(true);
260    }
261    m_state = TrexStatelessDpPerPort::ppSTATE_PAUSE;
262    return (true);
263}
264
265
266bool TrexStatelessDpPerPort::stop_traffic(uint8_t  port_id,
267                                          bool     stop_on_id,
268                                          int      event_id){
269
270
271    if (m_state == TrexStatelessDpPerPort::ppSTATE_IDLE) {
272        assert(m_active_streams==0);
273        return false;
274    }
275
276    /* there could be race of stop after stop */
277    if ( stop_on_id ) {
278        if (event_id != m_event_id){
279            /* we can't stop it is an old message */
280            return false;
281        }
282    }
283
284    for (auto dp_stream : m_active_nodes) {
285        CGenNodeStateless * node =dp_stream.m_node;
286        assert(node->get_port_id() == port_id);
287        if ( node->get_state() == CGenNodeStateless::ss_ACTIVE) {
288            node->mark_for_free();
289            m_active_streams--;
290            dp_stream.DeleteOnlyStream();
291
292        }else{
293            dp_stream.Delete(m_core);
294        }
295    }
296
297    /* active stream should be zero */
298    assert(m_active_streams==0);
299    m_active_nodes.clear();
300    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
301    return (true);
302}
303
304
305void TrexStatelessDpPerPort::create(CFlowGenListPerThread   *  core){
306    m_core=core;
307    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
308    m_port_id=0;
309    m_active_streams=0;
310    m_active_nodes.clear();
311}
312
313
314
315void
316TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) {
317    m_thread_id = thread_id;
318    m_core = core;
319    m_local_port_offset = 2*core->getDualPortId();
320
321    CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();
322
323    m_ring_from_cp = cp_dp->getRingCpToDp(thread_id);
324    m_ring_to_cp   = cp_dp->getRingDpToCp(thread_id);
325
326    m_state = STATE_IDLE;
327
328    int i;
329    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
330        m_ports[i].create(core);
331    }
332}
333
334
335/* move to the next stream, old stream move to INACTIVE */
336bool TrexStatelessDpCore::set_stateless_next_node(CGenNodeStateless * cur_node,
337                                                  CGenNodeStateless * next_node){
338
339    assert(cur_node);
340    TrexStatelessDpPerPort * lp_port = get_port_db(cur_node->m_port_id);
341    bool schedule =false;
342
343    bool to_stop_port=false;
344
345    if (next_node == NULL) {
346        /* there is no next stream , reduce the number of active streams*/
347        to_stop_port = lp_port->update_number_of_active_streams(1);
348
349    }else{
350        uint8_t state=next_node->get_state();
351
352        /* can't be FREE_RESUSE */
353        assert(state != CGenNodeStateless::ss_FREE_RESUSE);
354        if (state == CGenNodeStateless::ss_INACTIVE ) {
355
356            if (cur_node->m_action_counter > 0) {
357                cur_node->m_action_counter--;
358                if (cur_node->m_action_counter==0) {
359                    to_stop_port = lp_port->update_number_of_active_streams(1);
360                }else{
361                    /* refill start info and scedule, no update in active streams  */
362                    next_node->refresh();
363                    schedule = true;
364                }
365            }else{
366                /* refill start info and scedule, no update in active streams  */
367                next_node->refresh();
368                schedule = true;
369            }
370
371        }else{
372            to_stop_port = lp_port->update_number_of_active_streams(1);
373        }
374    }
375
376    if ( to_stop_port ) {
377        /* call stop port explictly to move the state */
378        stop_traffic(cur_node->m_port_id,false,0);
379    }
380
381    return ( schedule );
382}
383
384
385
386/**
387 * in idle state loop, the processor most of the time sleeps
388 * and periodically checks for messages
389 *
390 * @author imarom (01-Nov-15)
391 */
392void
393TrexStatelessDpCore::idle_state_loop() {
394
395    const int SHORT_DELAY_MS    = 2;
396    const int LONG_DELAY_MS     = 50;
397    const int DEEP_SLEEP_LIMIT  = 2000;
398
399    int counter = 0;
400
401    while (m_state == STATE_IDLE) {
402        m_core->m_node_gen.m_v_if->flush_dp_rx_queue();
403        bool had_msg = periodic_check_for_cp_messages();
404        if (had_msg) {
405            counter = 0;
406            continue;
407        }
408
409        /* enter deep sleep only if enough time had passed */
410        if (counter < DEEP_SLEEP_LIMIT) {
411            delay(SHORT_DELAY_MS);
412            counter++;
413        } else {
414            delay(LONG_DELAY_MS);
415        }
416
417    }
418}
419
420
421
422void TrexStatelessDpCore::quit_main_loop(){
423    m_core->set_terminate_mode(true); /* mark it as terminated */
424    m_state = STATE_TERMINATE;
425    add_global_duration(0.0001);
426}
427
428
429/**
430 * scehduler runs when traffic exists
431 * it will return when no more transmitting is done on this
432 * core
433 *
434 * @author imarom (01-Nov-15)
435 */
436void
437TrexStatelessDpCore::start_scheduler() {
438    /* creates a maintenace job using the scheduler */
439    CGenNode * node_sync = m_core->create_node() ;
440    node_sync->m_type = CGenNode::FLOW_SYNC;
441    node_sync->m_time = m_core->m_cur_time_sec + SYNC_TIME_OUT;
442    m_core->m_node_gen.add_node(node_sync);
443
444    double old_offset = 0.0;
445    m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset);
446    /* bail out in case of terminate */
447    if (m_state != TrexStatelessDpCore::STATE_TERMINATE) {
448        m_core->m_node_gen.close_file(m_core);
449        m_state = STATE_IDLE; /* we exit from all ports and we have nothing to do, we move to IDLE state */
450    }
451}
452
453
454void
455TrexStatelessDpCore::run_once(){
456
457    idle_state_loop();
458
459    if ( m_state == STATE_TERMINATE ){
460        return;
461    }
462
463    start_scheduler();
464}
465
466
467
468
469void
470TrexStatelessDpCore::start() {
471
472    while (true) {
473        run_once();
474
475        if ( m_core->is_terminated_by_master() ) {
476            break;
477        }
478    }
479}
480
481/* only if both port are idle we can exit */
482void
483TrexStatelessDpCore::schedule_exit(){
484
485    CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
486
487    node->m_type = CGenNode::COMMAND;
488
489    node->m_cmd = new TrexStatelessDpCanQuit();
490
491    /* make sure it will be scheduled after the current node */
492    node->m_time = m_core->m_cur_time_sec ;
493
494    m_core->m_node_gen.add_node((CGenNode *)node);
495}
496
497
498void
499TrexStatelessDpCore::add_global_duration(double duration){
500    if (duration > 0.0) {
501        CGenNode *node = m_core->create_node() ;
502
503        node->m_type = CGenNode::EXIT_SCHED;
504
505        /* make sure it will be scheduled after the current node */
506        node->m_time = m_core->m_cur_time_sec + duration ;
507
508        m_core->m_node_gen.add_node(node);
509    }
510}
511
512/* add per port exit */
513void
514TrexStatelessDpCore::add_port_duration(double duration,
515                                       uint8_t port_id,
516                                       int event_id){
517    if (duration > 0.0) {
518        CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
519
520        node->m_type = CGenNode::COMMAND;
521
522        /* make sure it will be scheduled after the current node */
523        node->m_time = m_core->m_cur_time_sec + duration ;
524
525        TrexStatelessDpStop * cmd=new TrexStatelessDpStop(port_id);
526
527
528        /* test this */
529        m_core->m_non_active_nodes++;
530        cmd->set_core_ptr(m_core);
531        cmd->set_event_id(event_id);
532        cmd->set_wait_for_event_id(true);
533
534        node->m_cmd = cmd;
535
536        m_core->m_node_gen.add_node((CGenNode *)node);
537    }
538}
539
540
541void TrexStatelessDpCore::update_mac_addr(TrexStream * stream,
542                                          CGenNodeStateless *node,
543                                          pkt_dir_t dir,
544                                          char *raw_pkt){
545    bool              ov_src = stream->get_override_src_mac_by_pkt_data();
546    TrexStream::stream_dst_mac_t  ov_dst = stream->get_override_dst_mac_mode();
547
548
549    if ( (ov_src == true) && (ov_dst == TrexStream::stPKT) ) {
550        /* nothing to do, take from the packet both */
551        return;
552    }
553
554        /* take from cfg_file */
555    if ( (ov_src == false) &&
556         (ov_dst == TrexStream::stCFG_FILE) ){
557
558          m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*)raw_pkt);
559          return;
560    }
561
562    /* save the pkt*/
563    char tmp_pkt[12];
564    memcpy(tmp_pkt,raw_pkt,12);
565
566    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*)raw_pkt);
567
568    if ((ov_src == true) && (ov_dst == TrexStream::stCFG_FILE)) {
569        memcpy(raw_pkt+6,tmp_pkt+6,6);
570    }
571
572    if ((ov_src == false) && (ov_dst == TrexStream::stPKT)) {
573        memcpy(raw_pkt,tmp_pkt,6);
574    }
575}
576
577
/**
 * Create a stateless node for one stream, register it on the port and,
 * when the stream is self-starting, hand it to the scheduler.
 *
 * The node gets its own clone of the stream, its burst counters, its
 * timing, and its packet data: a single ready mbuf when the stream has
 * no DP VM, or a writable prefix header plus an optional const mbuf
 * when it has one.
 *
 * @param lp_port  port object the node is registered on
 * @param stream   control-plane stream to build the node from
 * @param comp     compiled object the stream came from; not referenced
 *                 in this function
 */
void
TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port,
                                TrexStream * stream,
                                TrexStreamsCompiledObj *comp) {
    CGenNodeStateless *node = m_core->create_node_sl();

    /* basic node setup */
    node->m_cache_mbuf=0;
    node->m_type = CGenNode::STATELESS_PKT;

    node->m_action_counter = stream->m_action_count;

    /* clone the stream from control plane memory to DP memory */
    node->m_ref_stream_info = stream->clone();
    /* no need for this memory anymore on the control plane memory */
    stream->release_dp_object();

    node->m_next_stream=0; /* will be fixed later */

    if ( stream->m_self_start ){
        /* if self start it is in active mode (and counted on the port) */
        node->m_state =CGenNodeStateless::ss_ACTIVE;
        lp_port->m_active_streams++;
    }else{
        node->m_state =CGenNodeStateless::ss_INACTIVE;
    }

    /* first transmit time: now + the stream's inter-stream gap */
    node->m_time = m_core->m_cur_time_sec + usec_to_sec(stream->m_isg_usec);

    pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(stream->m_port_id);
    node->m_flags = 0;
    node->m_src_port =0;
    node->m_original_packet_data_prefix = 0;

    /* per-stream statistics requested: record the hw id on the node */
    if (stream->m_rx_check.m_enabled) {
        node->set_stat_needed();
        uint8_t hw_id = stream->m_rx_check.m_hw_id;
        assert (hw_id < MAX_FLOW_STATS);
        node->set_stat_hw_id(hw_id);
    }

    /* set socket id */
    node->set_socket_id(m_core->m_node_gen.m_socket_id);

    /* build a mbuf from a packet */

    uint16_t pkt_size = stream->m_pkt.len;
    const uint8_t *stream_pkt = stream->m_pkt.binary;

    node->m_pause =0;
    node->m_stream_type = stream->m_type;
    /* time between two packets is the inverse of the packet rate */
    node->m_next_time_offset = 1.0 / stream->get_pps();

    /* stateless specific fields */
    switch ( stream->m_type ) {

    case TrexStream::stCONTINUOUS :
        node->m_single_burst=0;
        node->m_single_burst_refill=0;
        node->m_multi_bursts=0;
        break;

    case TrexStream::stSINGLE_BURST :
        /* a single burst is stored as a multi burst with one burst */
        node->m_stream_type             = TrexStream::stMULTI_BURST;
        node->m_single_burst            = stream->m_burst_total_pkts;
        node->m_single_burst_refill     = stream->m_burst_total_pkts;
        node->m_multi_bursts            = 1;  /* single burst in multi burst of 1 */
        break;

    case TrexStream::stMULTI_BURST :
        node->m_single_burst        = stream->m_burst_total_pkts;
        node->m_single_burst_refill = stream->m_burst_total_pkts;
        node->m_multi_bursts        = stream->m_num_bursts;
        break;
    default:

        /* unknown stream type */
        assert(0);
    };

    node->m_port_id = stream->m_port_id;

    /* set dir 0 or 1 client or server */
    node->set_mbuf_cache_dir(dir);


    if (node->m_ref_stream_info->getDpVm() == NULL) {
        /* no VM: the whole packet is built once and stored on the node */

        node->m_vm_flow_var =  NULL;
        node->m_vm_program  =  NULL;
        node->m_vm_program_size =0;

        /* allocate an mbuf for the whole packet */
        rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size);
        assert(m);

        char *p = rte_pktmbuf_append(m, pkt_size);
        assert(p);
        /* copy the packet */
        memcpy(p,stream_pkt,pkt_size);

        update_mac_addr(stream,node,dir,p);

        /* store the finished packet on the node */
        node->set_cache_mbuf(m);

        node->m_original_packet_data_prefix =0;
    }else{

        /* VM present: take the program from the node's own stream clone */
        TrexStream * local_mem_stream = node->m_ref_stream_info;

        StreamVmDp  * lpDpVm = local_mem_stream->getDpVm();

        node->m_vm_flow_var      = lpDpVm->clone_bss(); /* clone the flow var */
        node->m_vm_program       = lpDpVm->get_program(); /* same ref to the program */
        node->m_vm_program_size  = lpDpVm->get_program_size();


        /* set the random seed if was set */
        if ( lpDpVm->is_random_seed() ){
            /* if we have random seed for this program */
            if (stream->m_random_seed) {
                node->set_random_seed(stream->m_random_seed);
            }
        }

        /* bytes after the VM prefix go into a separate const mbuf */
        if ( pkt_size > lpDpVm->get_prefix_size() ) {
            /* we need const packet */
            uint16_t const_pkt_size  = pkt_size - lpDpVm->get_prefix_size() ;
            rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), const_pkt_size );
            assert(m);

            char *p = rte_pktmbuf_append(m, const_pkt_size);
            assert(p);

            /* copy packet data */
            memcpy(p,(stream_pkt + lpDpVm->get_prefix_size()),const_pkt_size);

            node->set_const_mbuf(m);
        }


        if ( lpDpVm->is_pkt_size_var() ) {
            // mark the node as variable size
            node->set_var_pkt_size();
        }


        /* the prefix can never be longer than the packet itself */
        if (lpDpVm->get_prefix_size() > pkt_size ) {
            lpDpVm->set_prefix_size(pkt_size);
        }

        /* copy the header (the prefix part of the packet) */
        uint16_t header_size = lpDpVm->get_prefix_size();
        assert(header_size);
        node->alloc_prefix_header(header_size);
        uint8_t *p=node->m_original_packet_data_prefix;
        assert(p);

        memcpy(p,stream_pkt , header_size);

        update_mac_addr(stream,node,dir,(char *)p);
    }


    /* register the node together with its stream clone on the port */
    CDpOneStream one_stream;

    one_stream.m_dp_stream = node->m_ref_stream_info;
    one_stream.m_node =node;

    lp_port->m_active_nodes.push_back(one_stream);

    /* schedule only if active */
    if (node->m_state == CGenNodeStateless::ss_ACTIVE) {
        m_core->m_node_gen.add_node((CGenNode *)node);
    }
}
757
758void
759TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj,
760                                   double duration,
761                                   int event_id) {
762
763
764    TrexStatelessDpPerPort * lp_port=get_port_db(obj->get_port_id());
765    lp_port->m_active_streams = 0;
766    lp_port->set_event_id(event_id);
767
768    /* no nodes in the list */
769    assert(lp_port->m_active_nodes.size()==0);
770
771    for (auto single_stream : obj->get_objects()) {
772        /* all commands should be for the same port */
773        assert(obj->get_port_id() == single_stream.m_stream->m_port_id);
774        add_stream(lp_port,single_stream.m_stream,obj);
775    }
776
777    uint32_t nodes = lp_port->m_active_nodes.size();
778    /* find next stream */
779    assert(nodes == obj->get_objects().size());
780
781    int cnt=0;
782
783    /* set the next_stream pointer  */
784    for (auto single_stream : obj->get_objects()) {
785
786        if (single_stream.m_stream->is_dp_next_stream() ) {
787            int stream_id = single_stream.m_stream->m_next_stream_id;
788            assert(stream_id<nodes);
789            /* point to the next stream , stream_id is fixed */
790            lp_port->m_active_nodes[cnt].m_node->m_next_stream = lp_port->m_active_nodes[stream_id].m_node ;
791        }
792        cnt++;
793    }
794
795    lp_port->m_state =TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
796    m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
797
798
799    if ( duration > 0.0 ){
800        add_port_duration( duration ,obj->get_port_id(),event_id );
801    }
802
803}
804
805
806bool TrexStatelessDpCore::are_all_ports_idle(){
807
808    bool res=true;
809    int i;
810    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
811        if ( m_ports[i].m_state != TrexStatelessDpPerPort::ppSTATE_IDLE ){
812            res=false;
813        }
814    }
815    return (res);
816}
817
818
819void
820TrexStatelessDpCore::resume_traffic(uint8_t port_id){
821
822    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
823
824    lp_port->resume_traffic(port_id);
825}
826
827
828void
829TrexStatelessDpCore::pause_traffic(uint8_t port_id){
830
831    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
832
833    lp_port->pause_traffic(port_id);
834}
835
836void
837TrexStatelessDpCore::update_traffic(uint8_t port_id, double factor) {
838
839    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
840
841    lp_port->update_traffic(port_id, factor);
842}
843
844
845void
846TrexStatelessDpCore::stop_traffic(uint8_t  port_id,
847                                  bool     stop_on_id,
848                                  int      event_id) {
849    /* we cannot remove nodes not from the top of the queue so
850       for every active node - make sure next time
851       the scheduler invokes it, it will be free */
852
853    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
854
855    if ( lp_port->stop_traffic(port_id,stop_on_id,event_id) == false){
856        /* nothing to do ! already stopped */
857        //printf(" skip .. %f\n",m_core->m_cur_time_sec);
858        return;
859    }
860
861    /* inform the control plane we stopped - this might be a async stop
862       (streams ended)
863    */
864    #if 0
865    if ( are_all_ports_idle() ) {
866        /* just a place holder if we will need to do somthing in that case */
867    }
868    #endif
869
870    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
871    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
872                                                                   port_id,
873                                                                   lp_port->get_event_id());
874    ring->Enqueue((CGenNode *)event_msg);
875
876}
877
878/**
879 * handle a message from CP to DP
880 *
881 */
882void
883TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) {
884    msg->handle(this);
885    delete msg;
886}
887
888void
889TrexStatelessDpCore::barrier(uint8_t port_id, int event_id) {
890
891    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
892    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
893                                                                   port_id,
894                                                                   event_id);
895    ring->Enqueue((CGenNode *)event_msg);
896}
897