trex_stateless_dp_core.cpp revision 59a3b58d
1/*
2 Itay Marom
3 Hanoch Haim
4 Cisco Systems, Inc.
5*/
6
7/*
8Copyright (c) 2015-2015 Cisco Systems, Inc.
9
10Licensed under the Apache License, Version 2.0 (the "License");
11you may not use this file except in compliance with the License.
12You may obtain a copy of the License at
13
14    http://www.apache.org/licenses/LICENSE-2.0
15
16Unless required by applicable law or agreed to in writing, software
17distributed under the License is distributed on an "AS IS" BASIS,
18WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19See the License for the specific language governing permissions and
20limitations under the License.
21*/
22#include <trex_stateless_dp_core.h>
23#include <trex_stateless_messaging.h>
24#include <trex_streams_compiler.h>
25#include <trex_stream_node.h>
26#include <trex_stream.h>
27
28#include <bp_sim.h>
29
30
31void CDpOneStream::Delete(CFlowGenListPerThread   * core){
32    assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE);
33    core->free_node((CGenNode *)m_node);
34    delete m_dp_stream;
35    m_node=0;
36    m_dp_stream=0;
37}
38
39void CDpOneStream::DeleteOnlyStream(){
40    assert(m_dp_stream);
41    delete m_dp_stream;
42    m_dp_stream=0;
43}
44
45int CGenNodeStateless::get_stream_id(){
46    if (m_state ==CGenNodeStateless::ss_FREE_RESUSE) {
47        return (-1); // not valid
48    }
49    assert(m_ref_stream_info);
50    return ((int)m_ref_stream_info->m_stream_id);
51}
52
53
54void CGenNodeStateless::DumpHeader(FILE *fd){
55    fprintf(fd," pkt_id, time, port , action , state, stream_id , stype , m-burst# , burst# \n");
56
57}
58void CGenNodeStateless::Dump(FILE *fd){
59    fprintf(fd," %2.4f, %3lu, %s,%s, %3d, %s, %3lu, %3lu  \n",
60            m_time,
61            (ulong)m_port_id,
62            "s-pkt", //action
63            get_stream_state_str(m_state ).c_str(),
64            get_stream_id(),   //stream_id
65            TrexStream::get_stream_type_str(m_stream_type).c_str(), //stype
66            (ulong)m_multi_bursts,
67            (ulong)m_single_burst
68            );
69}
70
71
72
73void CGenNodeStateless::refresh_vm_bss(){
74    if ( m_vm_flow_var ) {
75        StreamVmDp  * vm_s=m_ref_stream_info->m_vm_dp;
76        assert(vm_s);
77        memcpy(m_vm_flow_var,vm_s->get_bss(),vm_s->get_bss_size());
78
79        if ( vm_s->is_random_seed() ){
80            /* if we have random seed for this program */
81            if (m_ref_stream_info->m_random_seed) {
82                set_random_seed(m_ref_stream_info->m_random_seed);
83            }
84        }
85    }
86}
87
88
89/**
90 * this function called when stream restart after it was inactive
91 */
92void CGenNodeStateless::refresh(){
93
94    /* refill the stream info */
95    m_single_burst    = m_single_burst_refill;
96    m_multi_bursts    = m_ref_stream_info->m_num_bursts;
97    m_state           = CGenNodeStateless::ss_ACTIVE;
98
99    /* refresh init value */
100#if 0
101    /* TBD should add a JSON varible for that */
102    refresh_vm_bss();
103#endif
104}
105
106
107void CGenNodeCommand::free_command(){
108
109    assert(m_cmd);
110    m_cmd->on_node_remove();
111    delete m_cmd;
112}
113
114
115std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state){
116    std::string res;
117
118    switch (stream_state) {
119    case CGenNodeStateless::ss_FREE_RESUSE :
120         res="FREE    ";
121        break;
122    case CGenNodeStateless::ss_INACTIVE :
123        res="INACTIVE ";
124        break;
125    case CGenNodeStateless::ss_ACTIVE :
126        res="ACTIVE   ";
127        break;
128    default:
129        res="Unknow   ";
130    };
131    return(res);
132}
133
134
135rte_mbuf_t   * CGenNodeStateless::alloc_node_with_vm(){
136
137    rte_mbuf_t        * m;
138    /* alloc small packet buffer*/
139    uint16_t prefix_size = prefix_header_size();
140    m = CGlobalInfo::pktmbuf_alloc( get_socket_id(), prefix_size );
141    if (m==0) {
142        return (m);
143    }
144    /* TBD remove this, should handle cases of error */
145    assert(m);
146    char *p=rte_pktmbuf_append(m, prefix_size);
147    memcpy( p ,m_original_packet_data_prefix, prefix_size);
148
149
150    /* run the VM program */
151    StreamDPVmInstructionsRunner runner;
152
153    runner.run( (uint32_t*)m_vm_flow_var,
154                m_vm_program_size,
155                m_vm_program,
156                m_vm_flow_var,
157                (uint8_t*)p);
158
159    uint16_t pkt_new_size=runner.get_new_pkt_size();
160    if ( likely( pkt_new_size == 0) ) {
161        /* no packet size change */
162        rte_mbuf_t * m_const = get_const_mbuf();
163        if (  m_const != NULL) {
164            utl_rte_pktmbuf_add_after(m,m_const);
165        }
166        return (m);
167    }
168
169    /* packet size change there are a few changes */
170    rte_mbuf_t * m_const = get_const_mbuf();
171    if ( (m_const == 0 ) || (pkt_new_size<=prefix_size) ) {
172        /* one mbuf , just trim it */
173        m->data_len = pkt_new_size;
174        m->pkt_len  = pkt_new_size;
175        return (m);
176    }
177
178    rte_mbuf_t * mi= CGlobalInfo::pktmbuf_alloc_small(get_socket_id());
179    assert(mi);
180    rte_pktmbuf_attach(mi,m_const);
181    utl_rte_pktmbuf_add_after2(m,mi);
182
183    if ( pkt_new_size < m->pkt_len) {
184        /* need to trim it */
185        mi->data_len = (pkt_new_size - prefix_size);
186        m->pkt_len   = pkt_new_size;
187    }
188    return (m);
189}
190
191
192void CGenNodeStateless::free_stl_node(){
193    /* if we have cache mbuf free it */
194    rte_mbuf_t * m=get_cache_mbuf();
195    if (m) {
196        rte_pktmbuf_free(m);
197        m_cache_mbuf=0;
198    }else{
199        /* non cache - must have an header */
200         m=get_const_mbuf();
201         if (m) {
202             rte_pktmbuf_free(m); /* reduce the ref counter */
203         }
204         free_prefix_header();
205    }
206    if (m_vm_flow_var) {
207        /* free flow var */
208        free(m_vm_flow_var);
209        m_vm_flow_var=0;
210    }
211}
212
213
214bool TrexStatelessDpPerPort::update_number_of_active_streams(uint32_t d){
215    m_active_streams-=d; /* reduce the number of streams */
216    if (m_active_streams == 0) {
217        return (true);
218    }
219    return (false);
220}
221
222bool TrexStatelessDpPerPort::resume_traffic(uint8_t port_id){
223
224    /* we are working with continues streams so we must be in transmit mode */
225    assert(m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE);
226
227    for (auto dp_stream : m_active_nodes) {
228        CGenNodeStateless * node =dp_stream.m_node;
229        assert(node->get_port_id() == port_id);
230        assert(node->is_pause() == true);
231        node->set_pause(false);
232    }
233    m_state = TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
234    return (true);
235}
236
237bool TrexStatelessDpPerPort::update_traffic(uint8_t port_id, double factor) {
238
239    assert( (m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING ||
240            (m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE)) );
241
242    for (auto dp_stream : m_active_nodes) {
243        CGenNodeStateless * node = dp_stream.m_node;
244        assert(node->get_port_id() == port_id);
245
246        node->update_rate(factor);
247    }
248
249    return (true);
250}
251
252bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){
253
254    /* we are working with continues streams so we must be in transmit mode */
255    assert(m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING);
256
257    for (auto dp_stream : m_active_nodes) {
258        CGenNodeStateless * node =dp_stream.m_node;
259        assert(node->get_port_id() == port_id);
260        assert(node->is_pause() == false);
261        node->set_pause(true);
262    }
263    m_state = TrexStatelessDpPerPort::ppSTATE_PAUSE;
264    return (true);
265}
266
267
268bool TrexStatelessDpPerPort::stop_traffic(uint8_t  port_id,
269                                          bool     stop_on_id,
270                                          int      event_id){
271
272
273    if (m_state == TrexStatelessDpPerPort::ppSTATE_IDLE) {
274        assert(m_active_streams==0);
275        return false;
276    }
277
278    /* there could be race of stop after stop */
279    if ( stop_on_id ) {
280        if (event_id != m_event_id){
281            /* we can't stop it is an old message */
282            return false;
283        }
284    }
285
286    for (auto dp_stream : m_active_nodes) {
287        CGenNodeStateless * node =dp_stream.m_node;
288        assert(node->get_port_id() == port_id);
289        if ( node->get_state() == CGenNodeStateless::ss_ACTIVE) {
290            node->mark_for_free();
291            m_active_streams--;
292            dp_stream.DeleteOnlyStream();
293
294        }else{
295            dp_stream.Delete(m_core);
296        }
297    }
298
299    /* active stream should be zero */
300    assert(m_active_streams==0);
301    m_active_nodes.clear();
302    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
303    return (true);
304}
305
306
307void TrexStatelessDpPerPort::create(CFlowGenListPerThread   *  core){
308    m_core=core;
309    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
310    m_port_id=0;
311    m_active_streams=0;
312    m_active_nodes.clear();
313}
314
315
316
317void
318TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) {
319    m_thread_id = thread_id;
320    m_core = core;
321    m_local_port_offset = 2*core->getDualPortId();
322
323    CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();
324
325    m_ring_from_cp = cp_dp->getRingCpToDp(thread_id);
326    m_ring_to_cp   = cp_dp->getRingDpToCp(thread_id);
327
328    m_state = STATE_IDLE;
329
330    int i;
331    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
332        m_ports[i].create(core);
333    }
334}
335
336
337/* move to the next stream, old stream move to INACTIVE */
338bool TrexStatelessDpCore::set_stateless_next_node(CGenNodeStateless * cur_node,
339                                                  CGenNodeStateless * next_node){
340
341    assert(cur_node);
342    TrexStatelessDpPerPort * lp_port = get_port_db(cur_node->m_port_id);
343    bool schedule =false;
344
345    bool to_stop_port=false;
346
347    if (next_node == NULL) {
348        /* there is no next stream , reduce the number of active streams*/
349        to_stop_port = lp_port->update_number_of_active_streams(1);
350
351    }else{
352        uint8_t state=next_node->get_state();
353
354        /* can't be FREE_RESUSE */
355        assert(state != CGenNodeStateless::ss_FREE_RESUSE);
356        if (state == CGenNodeStateless::ss_INACTIVE ) {
357
358            if (cur_node->m_action_counter > 0) {
359                cur_node->m_action_counter--;
360                if (cur_node->m_action_counter==0) {
361                    to_stop_port = lp_port->update_number_of_active_streams(1);
362                }else{
363                    /* refill start info and scedule, no update in active streams  */
364                    next_node->refresh();
365                    schedule = true;
366                }
367            }else{
368                /* refill start info and scedule, no update in active streams  */
369                next_node->refresh();
370                schedule = true;
371            }
372
373        }else{
374            to_stop_port = lp_port->update_number_of_active_streams(1);
375        }
376    }
377
378    if ( to_stop_port ) {
379        /* call stop port explictly to move the state */
380        stop_traffic(cur_node->m_port_id,false,0);
381    }
382
383    return ( schedule );
384}
385
386
387
388/**
389 * in idle state loop, the processor most of the time sleeps
390 * and periodically checks for messages
391 *
392 * @author imarom (01-Nov-15)
393 */
394void
395TrexStatelessDpCore::idle_state_loop() {
396
397    while (m_state == STATE_IDLE) {
398        bool had_msg = periodic_check_for_cp_messages();
399        /* if no message - backoff for some time */
400        if (!had_msg) {
401            delay(200);
402        }
403    }
404}
405
406
407
408void TrexStatelessDpCore::quit_main_loop(){
409    m_core->set_terminate_mode(true); /* mark it as terminated */
410    m_state = STATE_TERMINATE;
411    add_global_duration(0.0001);
412}
413
414
415/**
416 * scehduler runs when traffic exists
417 * it will return when no more transmitting is done on this
418 * core
419 *
420 * @author imarom (01-Nov-15)
421 */
422void
423TrexStatelessDpCore::start_scheduler() {
424    /* creates a maintenace job using the scheduler */
425    CGenNode * node_sync = m_core->create_node() ;
426    node_sync->m_type = CGenNode::FLOW_SYNC;
427    node_sync->m_time = m_core->m_cur_time_sec + SYNC_TIME_OUT;
428    m_core->m_node_gen.add_node(node_sync);
429
430    double old_offset = 0.0;
431    m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset);
432    /* bail out in case of terminate */
433    if (m_state != TrexStatelessDpCore::STATE_TERMINATE) {
434        m_core->m_node_gen.close_file(m_core);
435        m_state = STATE_IDLE; /* we exit from all ports and we have nothing to do, we move to IDLE state */
436    }
437}
438
439
440void
441TrexStatelessDpCore::run_once(){
442
443    idle_state_loop();
444
445    if ( m_state == STATE_TERMINATE ){
446        return;
447    }
448
449    start_scheduler();
450}
451
452
453
454
455void
456TrexStatelessDpCore::start() {
457
458    while (true) {
459        run_once();
460
461        if ( m_core->is_terminated_by_master() ) {
462            break;
463        }
464    }
465}
466
467/* only if both port are idle we can exit */
468void
469TrexStatelessDpCore::schedule_exit(){
470
471    CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
472
473    node->m_type = CGenNode::COMMAND;
474
475    node->m_cmd = new TrexStatelessDpCanQuit();
476
477    /* make sure it will be scheduled after the current node */
478    node->m_time = m_core->m_cur_time_sec ;
479
480    m_core->m_node_gen.add_node((CGenNode *)node);
481}
482
483
484void
485TrexStatelessDpCore::add_global_duration(double duration){
486    if (duration > 0.0) {
487        CGenNode *node = m_core->create_node() ;
488
489        node->m_type = CGenNode::EXIT_SCHED;
490
491        /* make sure it will be scheduled after the current node */
492        node->m_time = m_core->m_cur_time_sec + duration ;
493
494        m_core->m_node_gen.add_node(node);
495    }
496}
497
498/* add per port exit */
499void
500TrexStatelessDpCore::add_port_duration(double duration,
501                                       uint8_t port_id,
502                                       int event_id){
503    if (duration > 0.0) {
504        CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
505
506        node->m_type = CGenNode::COMMAND;
507
508        /* make sure it will be scheduled after the current node */
509        node->m_time = m_core->m_cur_time_sec + duration ;
510
511        TrexStatelessDpStop * cmd=new TrexStatelessDpStop(port_id);
512
513
514        /* test this */
515        m_core->m_non_active_nodes++;
516        cmd->set_core_ptr(m_core);
517        cmd->set_event_id(event_id);
518        cmd->set_wait_for_event_id(true);
519
520        node->m_cmd = cmd;
521
522        m_core->m_node_gen.add_node((CGenNode *)node);
523    }
524}
525
526
527void TrexStatelessDpCore::update_mac_addr(TrexStream * stream,
528                                          CGenNodeStateless *node,
529                                          pkt_dir_t dir,
530                                          char *raw_pkt){
531    bool              ov_src = stream->get_override_src_mac_by_pkt_data();
532    TrexStream::stream_dst_mac_t  ov_dst = stream->get_override_dst_mac_mode();
533
534
535    if ( (ov_src == true) && (ov_dst == TrexStream::stPKT) ) {
536        /* nothing to do, take from the packet both */
537        return;
538    }
539
540        /* take from cfg_file */
541    if ( (ov_src == false) &&
542         (ov_dst == TrexStream::stCFG_FILE) ){
543
544          m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*)raw_pkt);
545          return;
546    }
547
548    /* save the pkt*/
549    char tmp_pkt[12];
550    memcpy(tmp_pkt,raw_pkt,12);
551
552    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*)raw_pkt);
553
554    if ((ov_src == true) && (ov_dst == TrexStream::stCFG_FILE)) {
555        memcpy(raw_pkt+6,tmp_pkt+6,6);
556    }
557
558    if ((ov_src == false) && (ov_dst == TrexStream::stPKT)) {
559        memcpy(raw_pkt,tmp_pkt,6);
560    }
561}
562
563
564void
565TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port,
566                                TrexStream * stream,
567                                TrexStreamsCompiledObj *comp) {
568    CGenNodeStateless *node = m_core->create_node_sl();
569
570    /* add periodic */
571    node->m_cache_mbuf=0;
572    node->m_type = CGenNode::STATELESS_PKT;
573
574    node->m_action_counter = stream->m_action_count;
575
576    /* clone the stream from control plane memory to DP memory */
577    node->m_ref_stream_info = stream->clone();
578    /* no need for this memory anymore on the control plane memory */
579    stream->release_dp_object();
580
581    node->m_next_stream=0; /* will be fixed later */
582
583    if ( stream->m_self_start ){
584        /* if self start it is in active mode */
585        node->m_state =CGenNodeStateless::ss_ACTIVE;
586        lp_port->m_active_streams++;
587    }else{
588        node->m_state =CGenNodeStateless::ss_INACTIVE;
589    }
590
591    node->m_time = m_core->m_cur_time_sec + usec_to_sec(stream->m_isg_usec);
592
593    pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(stream->m_port_id);
594    node->m_flags = 0;
595    node->m_src_port =0;
596    node->m_original_packet_data_prefix = 0;
597
598    if (stream->m_rx_check.m_enabled) {
599        node->set_stat_needed();
600        uint8_t hw_id = stream->m_rx_check.m_hw_id;
601        assert (hw_id < MAX_FLOW_STATS);
602        node->set_stat_hw_id(hw_id);
603    }
604
605    /* set socket id */
606    node->set_socket_id(m_core->m_node_gen.m_socket_id);
607
608    /* build a mbuf from a packet */
609
610    uint16_t pkt_size = stream->m_pkt.len;
611    const uint8_t *stream_pkt = stream->m_pkt.binary;
612
613    node->m_pause =0;
614    node->m_stream_type = stream->m_type;
615    node->m_next_time_offset = 1.0 / stream->get_pps();
616
617    /* stateless specific fields */
618    switch ( stream->m_type ) {
619
620    case TrexStream::stCONTINUOUS :
621        node->m_single_burst=0;
622        node->m_single_burst_refill=0;
623        node->m_multi_bursts=0;
624        break;
625
626    case TrexStream::stSINGLE_BURST :
627        node->m_stream_type             = TrexStream::stMULTI_BURST;
628        node->m_single_burst            = stream->m_burst_total_pkts;
629        node->m_single_burst_refill     = stream->m_burst_total_pkts;
630        node->m_multi_bursts            = 1;  /* single burst in multi burst of 1 */
631        break;
632
633    case TrexStream::stMULTI_BURST :
634        node->m_single_burst        = stream->m_burst_total_pkts;
635        node->m_single_burst_refill = stream->m_burst_total_pkts;
636        node->m_multi_bursts        = stream->m_num_bursts;
637        break;
638    default:
639
640        assert(0);
641    };
642
643    node->m_port_id = stream->m_port_id;
644
645    /* set dir 0 or 1 client or server */
646    node->set_mbuf_cache_dir(dir);
647
648
649    if (node->m_ref_stream_info->getDpVm() == NULL) {
650        /* no VM */
651
652        node->m_vm_flow_var =  NULL;
653        node->m_vm_program  =  NULL;
654        node->m_vm_program_size =0;
655
656                /* allocate const mbuf */
657        rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size);
658        assert(m);
659
660        char *p = rte_pktmbuf_append(m, pkt_size);
661        assert(p);
662        /* copy the packet */
663        memcpy(p,stream_pkt,pkt_size);
664
665        update_mac_addr(stream,node,dir,p);
666
667        /* set the packet as a readonly */
668        node->set_cache_mbuf(m);
669
670        node->m_original_packet_data_prefix =0;
671    }else{
672
673        /* set the program */
674        TrexStream * local_mem_stream = node->m_ref_stream_info;
675
676        StreamVmDp  * lpDpVm = local_mem_stream->getDpVm();
677
678        node->m_vm_flow_var      = lpDpVm->clone_bss(); /* clone the flow var */
679        node->m_vm_program       = lpDpVm->get_program(); /* same ref to the program */
680        node->m_vm_program_size  = lpDpVm->get_program_size();
681
682
683        /* set the random seed if was set */
684        if ( lpDpVm->is_random_seed() ){
685            /* if we have random seed for this program */
686            if (stream->m_random_seed) {
687                node->set_random_seed(stream->m_random_seed);
688            }
689        }
690
691        /* we need to copy the object */
692        if ( pkt_size > lpDpVm->get_prefix_size() ) {
693            /* we need const packet */
694            uint16_t const_pkt_size  = pkt_size - lpDpVm->get_prefix_size() ;
695            rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), const_pkt_size );
696            assert(m);
697
698            char *p = rte_pktmbuf_append(m, const_pkt_size);
699            assert(p);
700
701            /* copy packet data */
702            memcpy(p,(stream_pkt + lpDpVm->get_prefix_size()),const_pkt_size);
703
704            node->set_const_mbuf(m);
705        }
706
707
708        if ( lpDpVm->is_pkt_size_var() ) {
709            // mark the node as varible size
710            node->set_var_pkt_size();
711        }
712
713
714        if (lpDpVm->get_prefix_size() > pkt_size ) {
715            lpDpVm->set_prefix_size(pkt_size);
716        }
717
718        /* copy the headr */
719        uint16_t header_size = lpDpVm->get_prefix_size();
720        assert(header_size);
721        node->alloc_prefix_header(header_size);
722        uint8_t *p=node->m_original_packet_data_prefix;
723        assert(p);
724
725        memcpy(p,stream_pkt , header_size);
726
727        update_mac_addr(stream,node,dir,(char *)p);
728    }
729
730
731    CDpOneStream one_stream;
732
733    one_stream.m_dp_stream = node->m_ref_stream_info;
734    one_stream.m_node =node;
735
736    lp_port->m_active_nodes.push_back(one_stream);
737
738    /* schedule only if active */
739    if (node->m_state == CGenNodeStateless::ss_ACTIVE) {
740        m_core->m_node_gen.add_node((CGenNode *)node);
741    }
742}
743
744void
745TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj,
746                                   double duration,
747                                   int event_id) {
748
749
750    TrexStatelessDpPerPort * lp_port=get_port_db(obj->get_port_id());
751    lp_port->m_active_streams = 0;
752    lp_port->set_event_id(event_id);
753
754    /* no nodes in the list */
755    assert(lp_port->m_active_nodes.size()==0);
756
757    for (auto single_stream : obj->get_objects()) {
758        /* all commands should be for the same port */
759        assert(obj->get_port_id() == single_stream.m_stream->m_port_id);
760        add_stream(lp_port,single_stream.m_stream,obj);
761    }
762
763    uint32_t nodes = lp_port->m_active_nodes.size();
764    /* find next stream */
765    assert(nodes == obj->get_objects().size());
766
767    int cnt=0;
768
769    /* set the next_stream pointer  */
770    for (auto single_stream : obj->get_objects()) {
771
772        if (single_stream.m_stream->is_dp_next_stream() ) {
773            int stream_id = single_stream.m_stream->m_next_stream_id;
774            assert(stream_id<nodes);
775            /* point to the next stream , stream_id is fixed */
776            lp_port->m_active_nodes[cnt].m_node->m_next_stream = lp_port->m_active_nodes[stream_id].m_node ;
777        }
778        cnt++;
779    }
780
781    lp_port->m_state =TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
782    m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
783
784
785    if ( duration > 0.0 ){
786        add_port_duration( duration ,obj->get_port_id(),event_id );
787    }
788
789}
790
791
792bool TrexStatelessDpCore::are_all_ports_idle(){
793
794    bool res=true;
795    int i;
796    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
797        if ( m_ports[i].m_state != TrexStatelessDpPerPort::ppSTATE_IDLE ){
798            res=false;
799        }
800    }
801    return (res);
802}
803
804
805void
806TrexStatelessDpCore::resume_traffic(uint8_t port_id){
807
808    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
809
810    lp_port->resume_traffic(port_id);
811}
812
813
814void
815TrexStatelessDpCore::pause_traffic(uint8_t port_id){
816
817    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
818
819    lp_port->pause_traffic(port_id);
820}
821
822void
823TrexStatelessDpCore::update_traffic(uint8_t port_id, double factor) {
824
825    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
826
827    lp_port->update_traffic(port_id, factor);
828}
829
830
831void
832TrexStatelessDpCore::stop_traffic(uint8_t  port_id,
833                                  bool     stop_on_id,
834                                  int      event_id) {
835    /* we cannot remove nodes not from the top of the queue so
836       for every active node - make sure next time
837       the scheduler invokes it, it will be free */
838
839    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
840
841    if ( lp_port->stop_traffic(port_id,stop_on_id,event_id) == false){
842        /* nothing to do ! already stopped */
843        //printf(" skip .. %f\n",m_core->m_cur_time_sec);
844        return;
845    }
846
847    /* inform the control plane we stopped - this might be a async stop
848       (streams ended)
849    */
850    #if 0
851    if ( are_all_ports_idle() ) {
852        /* just a place holder if we will need to do somthing in that case */
853    }
854    #endif
855
856    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
857    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
858                                                                   port_id,
859                                                                   lp_port->get_event_id());
860    ring->Enqueue((CGenNode *)event_msg);
861
862}
863
864/**
865 * handle a message from CP to DP
866 *
867 */
868void
869TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) {
870    msg->handle(this);
871    delete msg;
872}
873
874void
875TrexStatelessDpCore::barrier(uint8_t port_id, int event_id) {
876
877    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
878    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
879                                                                   port_id,
880                                                                   event_id);
881    ring->Enqueue((CGenNode *)event_msg);
882}
883