trex_stateless_dp_core.cpp revision 7a3be366
1/*
2 Itay Marom
3 Hanoch Haim
4 Cisco Systems, Inc.
5*/
6
7/*
8Copyright (c) 2015-2016 Cisco Systems, Inc.
9
10Licensed under the Apache License, Version 2.0 (the "License");
11you may not use this file except in compliance with the License.
12You may obtain a copy of the License at
13
14    http://www.apache.org/licenses/LICENSE-2.0
15
16Unless required by applicable law or agreed to in writing, software
17distributed under the License is distributed on an "AS IS" BASIS,
18WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19See the License for the specific language governing permissions and
20limitations under the License.
21*/
22#include "bp_sim.h"
23#include "trex_stateless_dp_core.h"
24#include "trex_stateless_messaging.h"
25#include "trex_stream.h"
26#include "trex_stream_node.h"
27#include "trex_streams_compiler.h"
28
29void CDpOneStream::Delete(CFlowGenListPerThread   * core){
30    assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE);
31    core->free_node((CGenNode *)m_node);
32    delete m_dp_stream;
33    m_node=0;
34    m_dp_stream=0;
35}
36
37void CDpOneStream::DeleteOnlyStream(){
38    assert(m_dp_stream);
39    delete m_dp_stream;
40    m_dp_stream=0;
41}
42
43int CGenNodeStateless::get_stream_id(){
44    if (m_state ==CGenNodeStateless::ss_FREE_RESUSE) {
45        return (-1); // not valid
46    }
47    assert(m_ref_stream_info);
48    return ((int)m_ref_stream_info->m_stream_id);
49}
50
51
52void CGenNodeStateless::DumpHeader(FILE *fd){
53    fprintf(fd," pkt_id, time, port , action , state, stream_id , stype , m-burst# , burst# \n");
54
55}
56void CGenNodeStateless::Dump(FILE *fd){
57    fprintf(fd," %2.4f, %3lu, %s,%s, %3d, %s, %3lu, %3lu  \n",
58            m_time,
59            (ulong)m_port_id,
60            "s-pkt", //action
61            get_stream_state_str(m_state ).c_str(),
62            get_stream_id(),   //stream_id
63            TrexStream::get_stream_type_str(m_stream_type).c_str(), //stype
64            (ulong)m_multi_bursts,
65            (ulong)m_single_burst
66            );
67}
68
69
70
71void CGenNodeStateless::refresh_vm_bss(){
72    if ( m_vm_flow_var ) {
73        StreamVmDp  * vm_s=m_ref_stream_info->m_vm_dp;
74        assert(vm_s);
75        memcpy(m_vm_flow_var,vm_s->get_bss(),vm_s->get_bss_size());
76
77        if ( vm_s->is_random_seed() ){
78            /* if we have random seed for this program */
79            if (m_ref_stream_info->m_random_seed) {
80                set_random_seed(m_ref_stream_info->m_random_seed);
81            }
82        }
83    }
84}
85
86
87/**
88 * this function called when stream restart after it was inactive
89 */
90void CGenNodeStateless::refresh(){
91
92    /* refill the stream info */
93    m_single_burst    = m_single_burst_refill;
94    m_multi_bursts    = m_ref_stream_info->m_num_bursts;
95    m_state           = CGenNodeStateless::ss_ACTIVE;
96
97    /* refresh init value */
98#if 0
99    /* TBD should add a JSON varible for that */
100    refresh_vm_bss();
101#endif
102}
103
104
105void CGenNodeCommand::free_command(){
106
107    assert(m_cmd);
108    m_cmd->on_node_remove();
109    delete m_cmd;
110}
111
112
113std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state){
114    std::string res;
115
116    switch (stream_state) {
117    case CGenNodeStateless::ss_FREE_RESUSE :
118         res="FREE    ";
119        break;
120    case CGenNodeStateless::ss_INACTIVE :
121        res="INACTIVE ";
122        break;
123    case CGenNodeStateless::ss_ACTIVE :
124        res="ACTIVE   ";
125        break;
126    default:
127        res="Unknow   ";
128    };
129    return(res);
130}
131
132
133rte_mbuf_t   * CGenNodeStateless::alloc_node_with_vm(){
134
135    rte_mbuf_t        * m;
136    /* alloc small packet buffer*/
137    uint16_t prefix_size = prefix_header_size();
138    m = CGlobalInfo::pktmbuf_alloc( get_socket_id(), prefix_size );
139    if (m==0) {
140        return (m);
141    }
142    /* TBD remove this, should handle cases of error */
143    assert(m);
144    char *p=rte_pktmbuf_append(m, prefix_size);
145    memcpy( p ,m_original_packet_data_prefix, prefix_size);
146
147
148    /* run the VM program */
149    StreamDPVmInstructionsRunner runner;
150
151    runner.run( (uint32_t*)m_vm_flow_var,
152                m_vm_program_size,
153                m_vm_program,
154                m_vm_flow_var,
155                (uint8_t*)p);
156
157    uint16_t pkt_new_size=runner.get_new_pkt_size();
158    if ( likely( pkt_new_size == 0) ) {
159        /* no packet size change */
160        rte_mbuf_t * m_const = get_const_mbuf();
161        if (  m_const != NULL) {
162            utl_rte_pktmbuf_add_after(m,m_const);
163        }
164        return (m);
165    }
166
167    /* packet size change there are a few changes */
168    rte_mbuf_t * m_const = get_const_mbuf();
169    if ( (m_const == 0 ) || (pkt_new_size<=prefix_size) ) {
170        /* one mbuf , just trim it */
171        m->data_len = pkt_new_size;
172        m->pkt_len  = pkt_new_size;
173        return (m);
174    }
175
176    rte_mbuf_t * mi= CGlobalInfo::pktmbuf_alloc_small(get_socket_id());
177    assert(mi);
178    rte_pktmbuf_attach(mi,m_const);
179    utl_rte_pktmbuf_add_after2(m,mi);
180
181    if ( pkt_new_size < m->pkt_len) {
182        /* need to trim it */
183        mi->data_len = (pkt_new_size - prefix_size);
184        m->pkt_len   = pkt_new_size;
185    }
186    return (m);
187}
188
189
190void CGenNodeStateless::free_stl_node(){
191    /* if we have cache mbuf free it */
192    rte_mbuf_t * m=get_cache_mbuf();
193    if (m) {
194        rte_pktmbuf_free(m);
195        m_cache_mbuf=0;
196    }else{
197        /* non cache - must have an header */
198         m=get_const_mbuf();
199         if (m) {
200             rte_pktmbuf_free(m); /* reduce the ref counter */
201         }
202         free_prefix_header();
203    }
204    if (m_vm_flow_var) {
205        /* free flow var */
206        free(m_vm_flow_var);
207        m_vm_flow_var=0;
208    }
209}
210
211
212bool TrexStatelessDpPerPort::update_number_of_active_streams(uint32_t d){
213    m_active_streams-=d; /* reduce the number of streams */
214    if (m_active_streams == 0) {
215        return (true);
216    }
217    return (false);
218}
219
220bool TrexStatelessDpPerPort::resume_traffic(uint8_t port_id){
221
222    /* we are working with continues streams so we must be in transmit mode */
223    assert(m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE);
224
225    for (auto dp_stream : m_active_nodes) {
226        CGenNodeStateless * node =dp_stream.m_node;
227        assert(node->get_port_id() == port_id);
228        assert(node->is_pause() == true);
229        node->set_pause(false);
230    }
231    m_state = TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
232    return (true);
233}
234
235bool TrexStatelessDpPerPort::update_traffic(uint8_t port_id, double factor) {
236
237    assert( (m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING ||
238            (m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE)) );
239
240    for (auto dp_stream : m_active_nodes) {
241        CGenNodeStateless * node = dp_stream.m_node;
242        assert(node->get_port_id() == port_id);
243
244        node->update_rate(factor);
245    }
246
247    return (true);
248}
249
250bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){
251
252    /* we are working with continues streams so we must be in transmit mode */
253    assert(m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING);
254
255    for (auto dp_stream : m_active_nodes) {
256        CGenNodeStateless * node =dp_stream.m_node;
257        assert(node->get_port_id() == port_id);
258        assert(node->is_pause() == false);
259        node->set_pause(true);
260    }
261    m_state = TrexStatelessDpPerPort::ppSTATE_PAUSE;
262    return (true);
263}
264
265
266bool TrexStatelessDpPerPort::stop_traffic(uint8_t  port_id,
267                                          bool     stop_on_id,
268                                          int      event_id){
269
270
271    if (m_state == TrexStatelessDpPerPort::ppSTATE_IDLE) {
272        assert(m_active_streams==0);
273        return false;
274    }
275
276    /* there could be race of stop after stop */
277    if ( stop_on_id ) {
278        if (event_id != m_event_id){
279            /* we can't stop it is an old message */
280            return false;
281        }
282    }
283
284    for (auto dp_stream : m_active_nodes) {
285        CGenNodeStateless * node =dp_stream.m_node;
286        assert(node->get_port_id() == port_id);
287        if ( node->get_state() == CGenNodeStateless::ss_ACTIVE) {
288            node->mark_for_free();
289            m_active_streams--;
290            dp_stream.DeleteOnlyStream();
291
292        }else{
293            dp_stream.Delete(m_core);
294        }
295    }
296
297    /* active stream should be zero */
298    assert(m_active_streams==0);
299    m_active_nodes.clear();
300    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
301    return (true);
302}
303
304
305void TrexStatelessDpPerPort::create(CFlowGenListPerThread   *  core){
306    m_core=core;
307    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
308    m_port_id=0;
309    m_active_streams=0;
310    m_active_nodes.clear();
311}
312
313
314
315void
316TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) {
317    m_thread_id = thread_id;
318    m_core = core;
319    m_local_port_offset = 2*core->getDualPortId();
320
321    CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();
322
323    m_ring_from_cp = cp_dp->getRingCpToDp(thread_id);
324    m_ring_to_cp   = cp_dp->getRingDpToCp(thread_id);
325
326    m_state = STATE_IDLE;
327
328    int i;
329    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
330        m_ports[i].create(core);
331    }
332}
333
334
335/* move to the next stream, old stream move to INACTIVE */
336bool TrexStatelessDpCore::set_stateless_next_node(CGenNodeStateless * cur_node,
337                                                  CGenNodeStateless * next_node){
338
339    assert(cur_node);
340    TrexStatelessDpPerPort * lp_port = get_port_db(cur_node->m_port_id);
341    bool schedule =false;
342
343    bool to_stop_port=false;
344
345    if (next_node == NULL) {
346        /* there is no next stream , reduce the number of active streams*/
347        to_stop_port = lp_port->update_number_of_active_streams(1);
348
349    }else{
350        uint8_t state=next_node->get_state();
351
352        /* can't be FREE_RESUSE */
353        assert(state != CGenNodeStateless::ss_FREE_RESUSE);
354        if (state == CGenNodeStateless::ss_INACTIVE ) {
355
356            if (cur_node->m_action_counter > 0) {
357                cur_node->m_action_counter--;
358                if (cur_node->m_action_counter==0) {
359                    to_stop_port = lp_port->update_number_of_active_streams(1);
360                }else{
361                    /* refill start info and scedule, no update in active streams  */
362                    next_node->refresh();
363                    schedule = true;
364                }
365            }else{
366                /* refill start info and scedule, no update in active streams  */
367                next_node->refresh();
368                schedule = true;
369            }
370
371        }else{
372            to_stop_port = lp_port->update_number_of_active_streams(1);
373        }
374    }
375
376    if ( to_stop_port ) {
377        /* call stop port explictly to move the state */
378        stop_traffic(cur_node->m_port_id,false,0);
379    }
380
381    return ( schedule );
382}
383
384
385
386/**
387 * in idle state loop, the processor most of the time sleeps
388 * and periodically checks for messages
389 *
390 * @author imarom (01-Nov-15)
391 */
392void
393TrexStatelessDpCore::idle_state_loop() {
394
395    const int SHORT_DELAY_MS    = 2;
396    const int LONG_DELAY_MS     = 50;
397    const int DEEP_SLEEP_LIMIT  = 2000;
398
399    int counter = 0;
400
401    while (m_state == STATE_IDLE) {
402        m_core->m_node_gen.m_v_if->flush_dp_rx_queue();
403        bool had_msg = periodic_check_for_cp_messages();
404        if (had_msg) {
405            counter = 0;
406            continue;
407        }
408
409        /* enter deep sleep only if enough time had passed */
410        if (counter < DEEP_SLEEP_LIMIT) {
411            delay(SHORT_DELAY_MS);
412            counter++;
413        } else {
414            delay(LONG_DELAY_MS);
415        }
416
417    }
418}
419
420
421
422void TrexStatelessDpCore::quit_main_loop(){
423    m_core->set_terminate_mode(true); /* mark it as terminated */
424    m_state = STATE_TERMINATE;
425    add_global_duration(0.0001);
426}
427
428
429/**
430 * scehduler runs when traffic exists
431 * it will return when no more transmitting is done on this
432 * core
433 *
434 * @author imarom (01-Nov-15)
435 */
436void
437TrexStatelessDpCore::start_scheduler() {
438    /* creates a maintenace job using the scheduler */
439    CGenNode * node_sync = m_core->create_node() ;
440    node_sync->m_type = CGenNode::FLOW_SYNC;
441    node_sync->m_time = m_core->m_cur_time_sec + SYNC_TIME_OUT;
442    m_core->m_node_gen.add_node(node_sync);
443
444    double old_offset = 0.0;
445    m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset);
446    /* bail out in case of terminate */
447    if (m_state != TrexStatelessDpCore::STATE_TERMINATE) {
448        m_core->m_node_gen.close_file(m_core);
449        m_state = STATE_IDLE; /* we exit from all ports and we have nothing to do, we move to IDLE state */
450    }
451}
452
453
454void
455TrexStatelessDpCore::run_once(){
456
457    idle_state_loop();
458
459    if ( m_state == STATE_TERMINATE ){
460        return;
461    }
462
463    start_scheduler();
464}
465
466
467
468
469void
470TrexStatelessDpCore::start() {
471
472    while (true) {
473        run_once();
474
475        if ( m_core->is_terminated_by_master() ) {
476            break;
477        }
478    }
479}
480
481/* only if both port are idle we can exit */
482void
483TrexStatelessDpCore::schedule_exit(){
484
485    CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
486
487    node->m_type = CGenNode::COMMAND;
488
489    node->m_cmd = new TrexStatelessDpCanQuit();
490
491    /* make sure it will be scheduled after the current node */
492    node->m_time = m_core->m_cur_time_sec ;
493
494    m_core->m_node_gen.add_node((CGenNode *)node);
495}
496
497
498void
499TrexStatelessDpCore::add_global_duration(double duration){
500    if (duration > 0.0) {
501        CGenNode *node = m_core->create_node() ;
502
503        node->m_type = CGenNode::EXIT_SCHED;
504
505        /* make sure it will be scheduled after the current node */
506        node->m_time = m_core->m_cur_time_sec + duration ;
507
508        m_core->m_node_gen.add_node(node);
509    }
510}
511
512/* add per port exit */
513void
514TrexStatelessDpCore::add_port_duration(double duration,
515                                       uint8_t port_id,
516                                       int event_id){
517    if (duration > 0.0) {
518        CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
519
520        node->m_type = CGenNode::COMMAND;
521
522        /* make sure it will be scheduled after the current node */
523        node->m_time = m_core->m_cur_time_sec + duration ;
524
525        TrexStatelessDpStop * cmd=new TrexStatelessDpStop(port_id);
526
527
528        /* test this */
529        m_core->m_non_active_nodes++;
530        cmd->set_core_ptr(m_core);
531        cmd->set_event_id(event_id);
532        cmd->set_wait_for_event_id(true);
533
534        node->m_cmd = cmd;
535
536        m_core->m_node_gen.add_node((CGenNode *)node);
537    }
538}
539
540
541void TrexStatelessDpCore::update_mac_addr(TrexStream * stream,
542                                          CGenNodeStateless *node,
543                                          pkt_dir_t dir,
544                                          char *raw_pkt){
545    bool              ov_src = stream->get_override_src_mac_by_pkt_data();
546    TrexStream::stream_dst_mac_t  ov_dst = stream->get_override_dst_mac_mode();
547
548
549    if ( (ov_src == true) && (ov_dst == TrexStream::stPKT) ) {
550        /* nothing to do, take from the packet both */
551        return;
552    }
553
554        /* take from cfg_file */
555    if ( (ov_src == false) &&
556         (ov_dst == TrexStream::stCFG_FILE) ){
557
558          m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*)raw_pkt);
559          return;
560    }
561
562    /* save the pkt*/
563    char tmp_pkt[12];
564    memcpy(tmp_pkt,raw_pkt,12);
565
566    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*)raw_pkt);
567
568    if ((ov_src == true) && (ov_dst == TrexStream::stCFG_FILE)) {
569        memcpy(raw_pkt+6,tmp_pkt+6,6);
570    }
571
572    if ((ov_src == false) && (ov_dst == TrexStream::stPKT)) {
573        memcpy(raw_pkt,tmp_pkt,6);
574    }
575}
576
577
578void
579TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port,
580                                TrexStream * stream,
581                                TrexStreamsCompiledObj *comp) {
582    CGenNodeStateless *node = m_core->create_node_sl();
583
584    /* add periodic */
585    node->m_cache_mbuf=0;
586    node->m_type = CGenNode::STATELESS_PKT;
587
588    node->m_action_counter = stream->m_action_count;
589
590    /* clone the stream from control plane memory to DP memory */
591    node->m_ref_stream_info = stream->clone();
592    /* no need for this memory anymore on the control plane memory */
593    stream->release_dp_object();
594
595    node->m_next_stream=0; /* will be fixed later */
596
597    if ( stream->m_self_start ){
598        /* if self start it is in active mode */
599        node->m_state =CGenNodeStateless::ss_ACTIVE;
600        lp_port->m_active_streams++;
601    }else{
602        node->m_state =CGenNodeStateless::ss_INACTIVE;
603    }
604
605    node->m_time = m_core->m_cur_time_sec + stream->get_start_delay_sec();
606
607    pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(stream->m_port_id);
608    node->m_flags = 0;
609    node->m_src_port =0;
610    node->m_original_packet_data_prefix = 0;
611
612    if (stream->m_rx_check.m_enabled) {
613        node->set_stat_needed();
614        uint8_t hw_id = stream->m_rx_check.m_hw_id;
615        assert (hw_id < MAX_FLOW_STATS);
616        node->set_stat_hw_id(hw_id);
617    }
618
619    /* set socket id */
620    node->set_socket_id(m_core->m_node_gen.m_socket_id);
621
622    /* build a mbuf from a packet */
623
624    uint16_t pkt_size = stream->m_pkt.len;
625    const uint8_t *stream_pkt = stream->m_pkt.binary;
626
627    node->m_pause =0;
628    node->m_stream_type = stream->m_type;
629    node->m_next_time_offset = 1.0 / stream->get_pps();
630    node->m_null_stream = (stream->m_null_stream ? 1 : 0);
631
632    /* stateless specific fields */
633    switch ( stream->m_type ) {
634
635    case TrexStream::stCONTINUOUS :
636        node->m_single_burst=0;
637        node->m_single_burst_refill=0;
638        node->m_multi_bursts=0;
639        break;
640
641    case TrexStream::stSINGLE_BURST :
642        node->m_stream_type             = TrexStream::stMULTI_BURST;
643        node->m_single_burst            = stream->m_burst_total_pkts;
644        node->m_single_burst_refill     = stream->m_burst_total_pkts;
645        node->m_multi_bursts            = 1;  /* single burst in multi burst of 1 */
646        break;
647
648    case TrexStream::stMULTI_BURST :
649        node->m_single_burst        = stream->m_burst_total_pkts;
650        node->m_single_burst_refill = stream->m_burst_total_pkts;
651        node->m_multi_bursts        = stream->m_num_bursts;
652        break;
653    default:
654
655        assert(0);
656    };
657
658    node->m_port_id = stream->m_port_id;
659
660    /* set dir 0 or 1 client or server */
661    node->set_mbuf_cache_dir(dir);
662
663
664    if (node->m_ref_stream_info->getDpVm() == NULL) {
665        /* no VM */
666
667        node->m_vm_flow_var =  NULL;
668        node->m_vm_program  =  NULL;
669        node->m_vm_program_size =0;
670
671                /* allocate const mbuf */
672        rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size);
673        assert(m);
674
675        char *p = rte_pktmbuf_append(m, pkt_size);
676        assert(p);
677        /* copy the packet */
678        memcpy(p,stream_pkt,pkt_size);
679
680        update_mac_addr(stream,node,dir,p);
681
682        /* set the packet as a readonly */
683        node->set_cache_mbuf(m);
684
685        node->m_original_packet_data_prefix =0;
686    }else{
687
688        /* set the program */
689        TrexStream * local_mem_stream = node->m_ref_stream_info;
690
691        StreamVmDp  * lpDpVm = local_mem_stream->getDpVm();
692
693        node->m_vm_flow_var      = lpDpVm->clone_bss(); /* clone the flow var */
694        node->m_vm_program       = lpDpVm->get_program(); /* same ref to the program */
695        node->m_vm_program_size  = lpDpVm->get_program_size();
696
697
698        /* set the random seed if was set */
699        if ( lpDpVm->is_random_seed() ){
700            /* if we have random seed for this program */
701            if (stream->m_random_seed) {
702                node->set_random_seed(stream->m_random_seed);
703            }
704        }
705
706        /* we need to copy the object */
707        if ( pkt_size > lpDpVm->get_prefix_size() ) {
708            /* we need const packet */
709            uint16_t const_pkt_size  = pkt_size - lpDpVm->get_prefix_size() ;
710            rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), const_pkt_size );
711            assert(m);
712
713            char *p = rte_pktmbuf_append(m, const_pkt_size);
714            assert(p);
715
716            /* copy packet data */
717            memcpy(p,(stream_pkt + lpDpVm->get_prefix_size()),const_pkt_size);
718
719            node->set_const_mbuf(m);
720        }
721
722
723        if ( lpDpVm->is_pkt_size_var() ) {
724            // mark the node as varible size
725            node->set_var_pkt_size();
726        }
727
728
729        if (lpDpVm->get_prefix_size() > pkt_size ) {
730            lpDpVm->set_prefix_size(pkt_size);
731        }
732
733        /* copy the headr */
734        uint16_t header_size = lpDpVm->get_prefix_size();
735        assert(header_size);
736        node->alloc_prefix_header(header_size);
737        uint8_t *p=node->m_original_packet_data_prefix;
738        assert(p);
739
740        memcpy(p,stream_pkt , header_size);
741
742        update_mac_addr(stream,node,dir,(char *)p);
743    }
744
745
746    CDpOneStream one_stream;
747
748    one_stream.m_dp_stream = node->m_ref_stream_info;
749    one_stream.m_node =node;
750
751    lp_port->m_active_nodes.push_back(one_stream);
752
753    /* schedule only if active */
754    if (node->m_state == CGenNodeStateless::ss_ACTIVE) {
755        m_core->m_node_gen.add_node((CGenNode *)node);
756    }
757}
758
759void
760TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj,
761                                   double duration,
762                                   int event_id) {
763
764
765    TrexStatelessDpPerPort * lp_port=get_port_db(obj->get_port_id());
766    lp_port->m_active_streams = 0;
767    lp_port->set_event_id(event_id);
768
769    /* no nodes in the list */
770    assert(lp_port->m_active_nodes.size()==0);
771
772    for (auto single_stream : obj->get_objects()) {
773        /* all commands should be for the same port */
774        assert(obj->get_port_id() == single_stream.m_stream->m_port_id);
775        add_stream(lp_port,single_stream.m_stream,obj);
776    }
777
778    uint32_t nodes = lp_port->m_active_nodes.size();
779    /* find next stream */
780    assert(nodes == obj->get_objects().size());
781
782    int cnt=0;
783
784    /* set the next_stream pointer  */
785    for (auto single_stream : obj->get_objects()) {
786
787        if (single_stream.m_stream->is_dp_next_stream() ) {
788            int stream_id = single_stream.m_stream->m_next_stream_id;
789            assert(stream_id<nodes);
790            /* point to the next stream , stream_id is fixed */
791            lp_port->m_active_nodes[cnt].m_node->m_next_stream = lp_port->m_active_nodes[stream_id].m_node ;
792        }
793        cnt++;
794    }
795
796    lp_port->m_state =TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
797    m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
798
799
800    if ( duration > 0.0 ){
801        add_port_duration( duration ,obj->get_port_id(),event_id );
802    }
803
804}
805
806
807bool TrexStatelessDpCore::are_all_ports_idle(){
808
809    bool res=true;
810    int i;
811    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
812        if ( m_ports[i].m_state != TrexStatelessDpPerPort::ppSTATE_IDLE ){
813            res=false;
814        }
815    }
816    return (res);
817}
818
819
820void
821TrexStatelessDpCore::resume_traffic(uint8_t port_id){
822
823    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
824
825    lp_port->resume_traffic(port_id);
826}
827
828
829void
830TrexStatelessDpCore::pause_traffic(uint8_t port_id){
831
832    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
833
834    lp_port->pause_traffic(port_id);
835}
836
837void
838TrexStatelessDpCore::update_traffic(uint8_t port_id, double factor) {
839
840    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
841
842    lp_port->update_traffic(port_id, factor);
843}
844
845
846void
847TrexStatelessDpCore::stop_traffic(uint8_t  port_id,
848                                  bool     stop_on_id,
849                                  int      event_id) {
850    /* we cannot remove nodes not from the top of the queue so
851       for every active node - make sure next time
852       the scheduler invokes it, it will be free */
853
854    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
855
856    if ( lp_port->stop_traffic(port_id,stop_on_id,event_id) == false){
857        /* nothing to do ! already stopped */
858        //printf(" skip .. %f\n",m_core->m_cur_time_sec);
859        return;
860    }
861
862    /* inform the control plane we stopped - this might be a async stop
863       (streams ended)
864    */
865    #if 0
866    if ( are_all_ports_idle() ) {
867        /* just a place holder if we will need to do somthing in that case */
868    }
869    #endif
870
871    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
872    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
873                                                                   port_id,
874                                                                   lp_port->get_event_id());
875    ring->Enqueue((CGenNode *)event_msg);
876
877}
878
879/**
880 * handle a message from CP to DP
881 *
882 */
883void
884TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) {
885    msg->handle(this);
886    delete msg;
887}
888
889void
890TrexStatelessDpCore::barrier(uint8_t port_id, int event_id) {
891
892    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
893    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
894                                                                   port_id,
895                                                                   event_id);
896    ring->Enqueue((CGenNode *)event_msg);
897}
898