trex_stateless_dp_core.cpp revision 996f2451
1/*
2 Itay Marom
3 Hanoch Haim
4 Cisco Systems, Inc.
5*/
6
7/*
8Copyright (c) 2015-2016 Cisco Systems, Inc.
9
10Licensed under the Apache License, Version 2.0 (the "License");
11you may not use this file except in compliance with the License.
12You may obtain a copy of the License at
13
14    http://www.apache.org/licenses/LICENSE-2.0
15
16Unless required by applicable law or agreed to in writing, software
17distributed under the License is distributed on an "AS IS" BASIS,
18WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19See the License for the specific language governing permissions and
20limitations under the License.
21*/
22#include "bp_sim.h"
23#include "trex_stateless_dp_core.h"
24#include "trex_stateless_messaging.h"
25#include "trex_stream.h"
26#include "trex_stream_node.h"
27#include "trex_streams_compiler.h"
28#include "mbuf.h"
29
30
31
32
33void CGenNodeStateless::cache_mbuf_array_init(){
34    m_cache_size=0;
35    m_cache_array_cnt=0;
36}
37
38
39
40void CGenNodeStateless::cache_mbuf_array_copy(CGenNodeCacheMbuf *obj,
41                                              uint16_t size){
42
43    int i;
44    cache_mbuf_array_alloc(size);
45    for (i=0; i<size; i++) {
46        cache_mbuf_array_set(i,obj->m_array[i]);
47    }
48    cache_mbuf_array_set_const_mbuf(obj->m_mbuf_const);
49}
50
51
52rte_mbuf_t ** CGenNodeStateless::cache_mbuf_array_alloc(uint16_t size){
53
54    uint32_t buf_size = CGenNodeCacheMbuf::get_object_size(size);
55    /* TBD  replace with align, zero API */
56    m_cache_mbuf = (void *)malloc(buf_size);
57    assert(m_cache_mbuf);
58    memset(m_cache_mbuf,0,buf_size);
59
60    m_flags |= SL_NODE_CONST_MBUF_CACHE_ARRAY;
61    m_cache_size=size;
62    m_cache_array_cnt=0;
63    return ((rte_mbuf_t **)m_cache_mbuf);
64}
65
66void CGenNodeStateless::cache_mbuf_array_free(){
67
68    assert(m_cache_mbuf);
69    int i;
70    for (i=0; i<(int)m_cache_size; i++) {
71        rte_mbuf_t * m=cache_mbuf_array_get((uint16_t)i);
72        assert(m);
73        rte_pktmbuf_free(m);
74    }
75
76    /* free the const */
77    rte_mbuf_t * m=cache_mbuf_array_get_const_mbuf() ;
78    if (m) {
79        rte_pktmbuf_free(m);
80    }
81
82    free(m_cache_mbuf);
83    m_cache_mbuf=0;
84}
85
86
87rte_mbuf_t * CGenNodeStateless::cache_mbuf_array_get(uint16_t index){
88
89    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
90    return (p->m_array[index]);
91}
92
93void CGenNodeStateless::cache_mbuf_array_set_const_mbuf(rte_mbuf_t * m){
94    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
95    p->m_mbuf_const=m;
96}
97
98rte_mbuf_t * CGenNodeStateless::cache_mbuf_array_get_const_mbuf(){
99    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
100    return (p->m_mbuf_const);
101}
102
103
104void CGenNodeStateless::cache_mbuf_array_set(uint16_t index,
105                                             rte_mbuf_t * m){
106    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
107    p->m_array[index]=m;
108}
109
110
111void CDpOneStream::Delete(CFlowGenListPerThread   * core){
112    assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE);
113    core->free_node((CGenNode *)m_node);
114    delete m_dp_stream;
115    m_node=0;
116    m_dp_stream=0;
117}
118
119void CDpOneStream::DeleteOnlyStream(){
120    assert(m_dp_stream);
121    delete m_dp_stream;
122    m_dp_stream=0;
123}
124
125int CGenNodeStateless::get_stream_id(){
126    if (m_state ==CGenNodeStateless::ss_FREE_RESUSE) {
127        return (-1); // not valid
128    }
129    assert(m_ref_stream_info);
130    return ((int)m_ref_stream_info->m_stream_id);
131}
132
133
134void CGenNodeStateless::DumpHeader(FILE *fd){
135    fprintf(fd," pkt_id, time, port , action , state, stream_id , stype , m-burst# , burst# \n");
136
137}
138void CGenNodeStateless::Dump(FILE *fd){
139    fprintf(fd," %2.4f, %3lu, %s,%s, %3d, %s, %3lu, %3lu  \n",
140            m_time,
141            (ulong)m_port_id,
142            "s-pkt", //action
143            get_stream_state_str(m_state ).c_str(),
144            get_stream_id(),   //stream_id
145            TrexStream::get_stream_type_str(m_stream_type).c_str(), //stype
146            (ulong)m_multi_bursts,
147            (ulong)m_single_burst
148            );
149}
150
151
152
153void CGenNodeStateless::refresh_vm_bss(){
154    if ( m_vm_flow_var ) {
155        StreamVmDp  * vm_s=m_ref_stream_info->m_vm_dp;
156        assert(vm_s);
157        memcpy(m_vm_flow_var,vm_s->get_bss(),vm_s->get_bss_size());
158
159        if ( vm_s->is_random_seed() ){
160            /* if we have random seed for this program */
161            if (m_ref_stream_info->m_random_seed) {
162                set_random_seed(m_ref_stream_info->m_random_seed);
163            }
164        }
165    }
166}
167
168
169/**
170 * this function called when stream restart after it was inactive
171 */
172void CGenNodeStateless::refresh(){
173
174    /* refill the stream info */
175    m_single_burst    = m_single_burst_refill;
176    m_multi_bursts    = m_ref_stream_info->m_num_bursts;
177    m_state           = CGenNodeStateless::ss_ACTIVE;
178
179    /* refresh init value */
180#if 0
181    /* TBD should add a JSON varible for that */
182    refresh_vm_bss();
183#endif
184}
185
186
187void CGenNodeCommand::free_command(){
188
189    assert(m_cmd);
190    m_cmd->on_node_remove();
191    delete m_cmd;
192}
193
194
195std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state){
196    std::string res;
197
198    switch (stream_state) {
199    case CGenNodeStateless::ss_FREE_RESUSE :
200         res="FREE    ";
201        break;
202    case CGenNodeStateless::ss_INACTIVE :
203        res="INACTIVE ";
204        break;
205    case CGenNodeStateless::ss_ACTIVE :
206        res="ACTIVE   ";
207        break;
208    default:
209        res="Unknow   ";
210    };
211    return(res);
212}
213
214
215rte_mbuf_t   * CGenNodeStateless::alloc_node_with_vm(){
216
217    rte_mbuf_t        * m;
218    /* alloc small packet buffer*/
219    uint16_t prefix_size = prefix_header_size();
220    m = CGlobalInfo::pktmbuf_alloc( get_socket_id(), prefix_size );
221    if (m==0) {
222        return (m);
223    }
224    /* TBD remove this, should handle cases of error */
225    assert(m);
226    char *p=rte_pktmbuf_append(m, prefix_size);
227    memcpy( p ,m_original_packet_data_prefix, prefix_size);
228
229
230    /* run the VM program */
231    StreamDPVmInstructionsRunner runner;
232
233    runner.run( (uint32_t*)m_vm_flow_var,
234                m_vm_program_size,
235                m_vm_program,
236                m_vm_flow_var,
237                (uint8_t*)p);
238
239    uint16_t pkt_new_size=runner.get_new_pkt_size();
240    if ( likely( pkt_new_size == 0) ) {
241        /* no packet size change */
242        rte_mbuf_t * m_const = get_const_mbuf();
243        if (  m_const != NULL) {
244            utl_rte_pktmbuf_add_after(m,m_const);
245        }
246        return (m);
247    }
248
249    /* packet size change there are a few changes */
250    rte_mbuf_t * m_const = get_const_mbuf();
251    if ( (m_const == 0 ) || (pkt_new_size<=prefix_size) ) {
252        /* one mbuf , just trim it */
253        m->data_len = pkt_new_size;
254        m->pkt_len  = pkt_new_size;
255        return (m);
256    }
257
258    rte_mbuf_t * mi= CGlobalInfo::pktmbuf_alloc_small(get_socket_id());
259    assert(mi);
260    rte_pktmbuf_attach(mi,m_const);
261    utl_rte_pktmbuf_add_after2(m,mi);
262
263    if ( pkt_new_size < m->pkt_len) {
264        /* need to trim it */
265        mi->data_len = (pkt_new_size - prefix_size);
266        m->pkt_len   = pkt_new_size;
267    }
268    return (m);
269}
270
271void CGenNodeStateless::free_stl_vm_buf(){
272        rte_mbuf_t * m ;
273         m=get_const_mbuf();
274         if (m) {
275             rte_pktmbuf_free(m); /* reduce the ref counter */
276             /* clear the const marker */
277             clear_const_mbuf();
278         }
279
280         free_prefix_header();
281
282         if (m_vm_flow_var) {
283             /* free flow var */
284             free(m_vm_flow_var);
285             m_vm_flow_var=0;
286         }
287}
288
289
290
291void CGenNodeStateless::free_stl_node(){
292
293    if ( is_cache_mbuf_array() ){
294        /* do we have cache of mbuf pre allocated */
295        cache_mbuf_array_free();
296    }else{
297        /* if we have cache mbuf free it */
298        rte_mbuf_t * m=get_cache_mbuf();
299        if (m) {
300                rte_pktmbuf_free(m);
301                m_cache_mbuf=0;
302        }
303    }
304    free_stl_vm_buf();
305}
306
307
308bool TrexStatelessDpPerPort::update_number_of_active_streams(uint32_t d){
309    m_active_streams-=d; /* reduce the number of streams */
310    if (m_active_streams == 0) {
311        return (true);
312    }
313    return (false);
314}
315
316bool TrexStatelessDpPerPort::resume_traffic(uint8_t port_id){
317
318    /* we are working with continues streams so we must be in transmit mode */
319    assert(m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE);
320
321    for (auto dp_stream : m_active_nodes) {
322        CGenNodeStateless * node =dp_stream.m_node;
323        assert(node->get_port_id() == port_id);
324        assert(node->is_pause() == true);
325        node->set_pause(false);
326    }
327    m_state = TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
328    return (true);
329}
330
331bool TrexStatelessDpPerPort::update_traffic(uint8_t port_id, double factor) {
332
333    assert( (m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING ||
334            (m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE)) );
335
336    for (auto dp_stream : m_active_nodes) {
337        CGenNodeStateless * node = dp_stream.m_node;
338        assert(node->get_port_id() == port_id);
339
340        node->update_rate(factor);
341    }
342
343    return (true);
344}
345
346bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){
347
348    /* we are working with continues streams so we must be in transmit mode */
349    assert(m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING);
350
351    for (auto dp_stream : m_active_nodes) {
352        CGenNodeStateless * node =dp_stream.m_node;
353        assert(node->get_port_id() == port_id);
354        assert(node->is_pause() == false);
355        node->set_pause(true);
356    }
357    m_state = TrexStatelessDpPerPort::ppSTATE_PAUSE;
358    return (true);
359}
360
361
362bool TrexStatelessDpPerPort::stop_traffic(uint8_t  port_id,
363                                          bool     stop_on_id,
364                                          int      event_id){
365
366
367    if (m_state == TrexStatelessDpPerPort::ppSTATE_IDLE) {
368        assert(m_active_streams==0);
369        return false;
370    }
371
372    /* there could be race of stop after stop */
373    if ( stop_on_id ) {
374        if (event_id != m_event_id){
375            /* we can't stop it is an old message */
376            return false;
377        }
378    }
379
380    for (auto dp_stream : m_active_nodes) {
381        CGenNodeStateless * node =dp_stream.m_node;
382        assert(node->get_port_id() == port_id);
383        if ( node->get_state() == CGenNodeStateless::ss_ACTIVE) {
384            node->mark_for_free();
385            m_active_streams--;
386            dp_stream.DeleteOnlyStream();
387
388        }else{
389            dp_stream.Delete(m_core);
390        }
391    }
392
393    /* active stream should be zero */
394    assert(m_active_streams==0);
395    m_active_nodes.clear();
396    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
397    return (true);
398}
399
400
401void TrexStatelessDpPerPort::create(CFlowGenListPerThread   *  core){
402    m_core=core;
403    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
404    m_port_id=0;
405    m_active_streams=0;
406    m_active_nodes.clear();
407}
408
409
410
411void
412TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) {
413    m_thread_id = thread_id;
414    m_core = core;
415    m_local_port_offset = 2*core->getDualPortId();
416
417    CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();
418
419    m_ring_from_cp = cp_dp->getRingCpToDp(thread_id);
420    m_ring_to_cp   = cp_dp->getRingDpToCp(thread_id);
421
422    m_state = STATE_IDLE;
423
424    int i;
425    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
426        m_ports[i].create(core);
427    }
428}
429
430
431/* move to the next stream, old stream move to INACTIVE */
432bool TrexStatelessDpCore::set_stateless_next_node(CGenNodeStateless * cur_node,
433                                                  CGenNodeStateless * next_node){
434
435    assert(cur_node);
436    TrexStatelessDpPerPort * lp_port = get_port_db(cur_node->m_port_id);
437    bool schedule =false;
438
439    bool to_stop_port=false;
440
441    if (next_node == NULL) {
442        /* there is no next stream , reduce the number of active streams*/
443        to_stop_port = lp_port->update_number_of_active_streams(1);
444
445    }else{
446        uint8_t state=next_node->get_state();
447
448        /* can't be FREE_RESUSE */
449        assert(state != CGenNodeStateless::ss_FREE_RESUSE);
450        if (state == CGenNodeStateless::ss_INACTIVE ) {
451
452            if (cur_node->m_action_counter > 0) {
453                cur_node->m_action_counter--;
454                if (cur_node->m_action_counter==0) {
455                    to_stop_port = lp_port->update_number_of_active_streams(1);
456                }else{
457                    /* refill start info and scedule, no update in active streams  */
458                    next_node->refresh();
459                    schedule = true;
460                }
461            }else{
462                /* refill start info and scedule, no update in active streams  */
463                next_node->refresh();
464                schedule = true;
465            }
466
467        }else{
468            to_stop_port = lp_port->update_number_of_active_streams(1);
469        }
470    }
471
472    if ( to_stop_port ) {
473        /* call stop port explictly to move the state */
474        stop_traffic(cur_node->m_port_id,false,0);
475    }
476
477    return ( schedule );
478}
479
480
481
482/**
483 * in idle state loop, the processor most of the time sleeps
484 * and periodically checks for messages
485 *
486 * @author imarom (01-Nov-15)
487 */
488void
489TrexStatelessDpCore::idle_state_loop() {
490
491    const int SHORT_DELAY_MS    = 2;
492    const int LONG_DELAY_MS     = 50;
493    const int DEEP_SLEEP_LIMIT  = 2000;
494
495    int counter = 0;
496
497    while (m_state == STATE_IDLE) {
498        m_core->m_node_gen.m_v_if->flush_dp_rx_queue();
499        bool had_msg = periodic_check_for_cp_messages();
500        if (had_msg) {
501            counter = 0;
502            continue;
503        }
504
505        /* enter deep sleep only if enough time had passed */
506        if (counter < DEEP_SLEEP_LIMIT) {
507            delay(SHORT_DELAY_MS);
508            counter++;
509        } else {
510            delay(LONG_DELAY_MS);
511        }
512
513    }
514}
515
516
517
518void TrexStatelessDpCore::quit_main_loop(){
519    m_core->set_terminate_mode(true); /* mark it as terminated */
520    m_state = STATE_TERMINATE;
521    add_global_duration(0.0001);
522}
523
524
525/**
526 * scehduler runs when traffic exists
527 * it will return when no more transmitting is done on this
528 * core
529 *
530 * @author imarom (01-Nov-15)
531 */
532void
533TrexStatelessDpCore::start_scheduler() {
534    /* creates a maintenace job using the scheduler */
535    CGenNode * node_sync = m_core->create_node() ;
536    node_sync->m_type = CGenNode::FLOW_SYNC;
537    node_sync->m_time = m_core->m_cur_time_sec + SYNC_TIME_OUT;
538    m_core->m_node_gen.add_node(node_sync);
539
540    double old_offset = 0.0;
541    m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset);
542    /* bail out in case of terminate */
543    if (m_state != TrexStatelessDpCore::STATE_TERMINATE) {
544        m_core->m_node_gen.close_file(m_core);
545        m_state = STATE_IDLE; /* we exit from all ports and we have nothing to do, we move to IDLE state */
546    }
547}
548
549
550void
551TrexStatelessDpCore::run_once(){
552
553    idle_state_loop();
554
555    if ( m_state == STATE_TERMINATE ){
556        return;
557    }
558
559    start_scheduler();
560}
561
562
563
564
565void
566TrexStatelessDpCore::start() {
567
568    while (true) {
569        run_once();
570
571        if ( m_core->is_terminated_by_master() ) {
572            break;
573        }
574    }
575}
576
577/* only if both port are idle we can exit */
578void
579TrexStatelessDpCore::schedule_exit(){
580
581    CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
582
583    node->m_type = CGenNode::COMMAND;
584
585    node->m_cmd = new TrexStatelessDpCanQuit();
586
587    /* make sure it will be scheduled after the current node */
588    node->m_time = m_core->m_cur_time_sec ;
589
590    m_core->m_node_gen.add_node((CGenNode *)node);
591}
592
593
594void
595TrexStatelessDpCore::add_global_duration(double duration){
596    if (duration > 0.0) {
597        CGenNode *node = m_core->create_node() ;
598
599        node->m_type = CGenNode::EXIT_SCHED;
600
601        /* make sure it will be scheduled after the current node */
602        node->m_time = m_core->m_cur_time_sec + duration ;
603
604        m_core->m_node_gen.add_node(node);
605    }
606}
607
608/* add per port exit */
609void
610TrexStatelessDpCore::add_port_duration(double duration,
611                                       uint8_t port_id,
612                                       int event_id){
613    if (duration > 0.0) {
614        CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
615
616        node->m_type = CGenNode::COMMAND;
617
618        /* make sure it will be scheduled after the current node */
619        node->m_time = m_core->m_cur_time_sec + duration ;
620
621        TrexStatelessDpStop * cmd=new TrexStatelessDpStop(port_id);
622
623
624        /* test this */
625        m_core->m_non_active_nodes++;
626        cmd->set_core_ptr(m_core);
627        cmd->set_event_id(event_id);
628        cmd->set_wait_for_event_id(true);
629
630        node->m_cmd = cmd;
631
632        m_core->m_node_gen.add_node((CGenNode *)node);
633    }
634}
635
636
637void TrexStatelessDpCore::update_mac_addr(TrexStream * stream,
638                                          CGenNodeStateless *node,
639                                          pkt_dir_t dir,
640                                          char *raw_pkt){
641    bool              ov_src = stream->get_override_src_mac_by_pkt_data();
642    TrexStream::stream_dst_mac_t  ov_dst = stream->get_override_dst_mac_mode();
643
644
645    if ( (ov_src == true) && (ov_dst == TrexStream::stPKT) ) {
646        /* nothing to do, take from the packet both */
647        return;
648    }
649
650        /* take from cfg_file */
651    if ( (ov_src == false) &&
652         (ov_dst == TrexStream::stCFG_FILE) ){
653
654          m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*)raw_pkt);
655          return;
656    }
657
658    /* save the pkt*/
659    char tmp_pkt[12];
660    memcpy(tmp_pkt,raw_pkt,12);
661
662    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*)raw_pkt);
663
664    if ((ov_src == true) && (ov_dst == TrexStream::stCFG_FILE)) {
665        memcpy(raw_pkt+6,tmp_pkt+6,6);
666    }
667
668    if ((ov_src == false) && (ov_dst == TrexStream::stPKT)) {
669        memcpy(raw_pkt,tmp_pkt,6);
670    }
671}
672
673
674void TrexStatelessDpCore::replay_vm_into_cache(TrexStream * stream,
675                                               CGenNodeStateless *node){
676
677    uint16_t      cache_size = stream->m_cache_size;
678    assert(cache_size>0);
679    rte_mbuf_t * m=0;
680
681    uint32_t buf_size = CGenNodeCacheMbuf::get_object_size(cache_size);
682    CGenNodeCacheMbuf * p = (CGenNodeCacheMbuf *)malloc(buf_size);
683    assert(p);
684    memset(p,0,buf_size);
685
686    int i;
687    for (i=0; i<cache_size; i++) {
688        p->m_array[i] =  node->alloc_node_with_vm();
689    }
690    /* save const */
691    m=node->get_const_mbuf();
692    if (m) {
693        p->m_mbuf_const=m;
694        rte_pktmbuf_refcnt_update(m,1);
695    }
696
697    /* free all VM and const mbuf */
698    node->free_stl_vm_buf();
699
700    /* copy to local node meory */
701    node->cache_mbuf_array_copy(p,cache_size);
702
703    /* free the memory */
704    free(p);
705}
706
707
708void
709TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port,
710                                TrexStream * stream,
711                                TrexStreamsCompiledObj *comp) {
712    CGenNodeStateless *node = m_core->create_node_sl();
713
714    node->cache_mbuf_array_init();
715    node->m_batch_size=0;
716
717    /* add periodic */
718    node->m_cache_mbuf=0;
719    node->m_type = CGenNode::STATELESS_PKT;
720
721    node->m_action_counter = stream->m_action_count;
722
723    /* clone the stream from control plane memory to DP memory */
724    node->m_ref_stream_info = stream->clone();
725    /* no need for this memory anymore on the control plane memory */
726    stream->release_dp_object();
727
728    node->m_next_stream=0; /* will be fixed later */
729
730    if ( stream->m_self_start ){
731        /* if self start it is in active mode */
732        node->m_state =CGenNodeStateless::ss_ACTIVE;
733        lp_port->m_active_streams++;
734    }else{
735        node->m_state =CGenNodeStateless::ss_INACTIVE;
736    }
737
738    node->m_time = m_core->m_cur_time_sec + stream->get_start_delay_sec();
739
740    pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(stream->m_port_id);
741    node->m_flags = 0;
742    node->m_src_port =0;
743    node->m_original_packet_data_prefix = 0;
744
745    if (stream->m_rx_check.m_enabled) {
746        node->set_stat_needed();
747        uint8_t hw_id = stream->m_rx_check.m_hw_id;
748        assert (hw_id < MAX_FLOW_STATS);
749        node->set_stat_hw_id(hw_id);
750    }
751
752    /* set socket id */
753    node->set_socket_id(m_core->m_node_gen.m_socket_id);
754
755    /* build a mbuf from a packet */
756
757    uint16_t pkt_size = stream->m_pkt.len;
758    const uint8_t *stream_pkt = stream->m_pkt.binary;
759
760    node->m_pause =0;
761    node->m_stream_type = stream->m_type;
762    node->m_next_time_offset = 1.0 / stream->get_pps();
763    node->m_null_stream = (stream->m_null_stream ? 1 : 0);
764
765    /* stateless specific fields */
766    switch ( stream->m_type ) {
767
768    case TrexStream::stCONTINUOUS :
769        node->m_single_burst=0;
770        node->m_single_burst_refill=0;
771        node->m_multi_bursts=0;
772        break;
773
774    case TrexStream::stSINGLE_BURST :
775        node->m_stream_type             = TrexStream::stMULTI_BURST;
776        node->m_single_burst            = stream->m_burst_total_pkts;
777        node->m_single_burst_refill     = stream->m_burst_total_pkts;
778        node->m_multi_bursts            = 1;  /* single burst in multi burst of 1 */
779        break;
780
781    case TrexStream::stMULTI_BURST :
782        node->m_single_burst        = stream->m_burst_total_pkts;
783        node->m_single_burst_refill = stream->m_burst_total_pkts;
784        node->m_multi_bursts        = stream->m_num_bursts;
785        break;
786    default:
787
788        assert(0);
789    };
790
791    node->m_port_id = stream->m_port_id;
792
793    /* set dir 0 or 1 client or server */
794    node->set_mbuf_cache_dir(dir);
795
796
797    if (node->m_ref_stream_info->getDpVm() == NULL) {
798        /* no VM */
799
800        node->m_vm_flow_var =  NULL;
801        node->m_vm_program  =  NULL;
802        node->m_vm_program_size =0;
803
804                /* allocate const mbuf */
805        rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size);
806        assert(m);
807
808        char *p = rte_pktmbuf_append(m, pkt_size);
809        assert(p);
810        /* copy the packet */
811        memcpy(p,stream_pkt,pkt_size);
812
813        update_mac_addr(stream,node,dir,p);
814
815        /* set the packet as a readonly */
816        node->set_cache_mbuf(m);
817
818        node->m_original_packet_data_prefix =0;
819    }else{
820
821        /* set the program */
822        TrexStream * local_mem_stream = node->m_ref_stream_info;
823
824        StreamVmDp  * lpDpVm = local_mem_stream->getDpVm();
825
826        node->m_vm_flow_var      = lpDpVm->clone_bss(); /* clone the flow var */
827        node->m_vm_program       = lpDpVm->get_program(); /* same ref to the program */
828        node->m_vm_program_size  = lpDpVm->get_program_size();
829
830
831        /* set the random seed if was set */
832        if ( lpDpVm->is_random_seed() ){
833            /* if we have random seed for this program */
834            if (stream->m_random_seed) {
835                node->set_random_seed(stream->m_random_seed);
836            }
837        }
838
839        /* we need to copy the object */
840        if ( pkt_size > lpDpVm->get_prefix_size() ) {
841            /* we need const packet */
842            uint16_t const_pkt_size  = pkt_size - lpDpVm->get_prefix_size() ;
843            rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), const_pkt_size );
844            assert(m);
845
846            char *p = rte_pktmbuf_append(m, const_pkt_size);
847            assert(p);
848
849            /* copy packet data */
850            memcpy(p,(stream_pkt + lpDpVm->get_prefix_size()),const_pkt_size);
851
852            node->set_const_mbuf(m);
853        }
854
855
856        if ( lpDpVm->is_pkt_size_var() ) {
857            // mark the node as varible size
858            node->set_var_pkt_size();
859        }
860
861
862        if (lpDpVm->get_prefix_size() > pkt_size ) {
863            lpDpVm->set_prefix_size(pkt_size);
864        }
865
866        /* copy the headr */
867        uint16_t header_size = lpDpVm->get_prefix_size();
868        assert(header_size);
869        node->alloc_prefix_header(header_size);
870        uint8_t *p=node->m_original_packet_data_prefix;
871        assert(p);
872
873        memcpy(p,stream_pkt , header_size);
874
875        update_mac_addr(stream,node,dir,(char *)p);
876
877        if (stream->m_cache_size > 0 ) {
878            /* we need to create cache of objects */
879            replay_vm_into_cache(stream, node);
880        }
881    }
882
883
884    CDpOneStream one_stream;
885
886    one_stream.m_dp_stream = node->m_ref_stream_info;
887    one_stream.m_node =node;
888
889    lp_port->m_active_nodes.push_back(one_stream);
890
891    /* schedule only if active */
892    if (node->m_state == CGenNodeStateless::ss_ACTIVE) {
893        m_core->m_node_gen.add_node((CGenNode *)node);
894    }
895}
896
897void
898TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj,
899                                   double duration,
900                                   int event_id) {
901
902
903    TrexStatelessDpPerPort * lp_port=get_port_db(obj->get_port_id());
904    lp_port->m_active_streams = 0;
905    lp_port->set_event_id(event_id);
906
907    /* update cur time */
908    if ( CGlobalInfo::is_realtime()  ){
909        m_core->m_cur_time_sec = now_sec() + SCHD_OFFSET_DTIME ;
910    }
911
912    /* no nodes in the list */
913    assert(lp_port->m_active_nodes.size()==0);
914
915    for (auto single_stream : obj->get_objects()) {
916        /* all commands should be for the same port */
917        assert(obj->get_port_id() == single_stream.m_stream->m_port_id);
918        add_stream(lp_port,single_stream.m_stream,obj);
919    }
920
921    uint32_t nodes = lp_port->m_active_nodes.size();
922    /* find next stream */
923    assert(nodes == obj->get_objects().size());
924
925    int cnt=0;
926
927    /* set the next_stream pointer  */
928    for (auto single_stream : obj->get_objects()) {
929
930        if (single_stream.m_stream->is_dp_next_stream() ) {
931            int stream_id = single_stream.m_stream->m_next_stream_id;
932            assert(stream_id<nodes);
933            /* point to the next stream , stream_id is fixed */
934            lp_port->m_active_nodes[cnt].m_node->m_next_stream = lp_port->m_active_nodes[stream_id].m_node ;
935        }
936        cnt++;
937    }
938
939    lp_port->m_state =TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
940    m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
941
942
943    if ( duration > 0.0 ){
944        add_port_duration( duration ,obj->get_port_id(),event_id );
945    }
946
947}
948
949
950bool TrexStatelessDpCore::are_all_ports_idle(){
951
952    bool res=true;
953    int i;
954    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
955        if ( m_ports[i].m_state != TrexStatelessDpPerPort::ppSTATE_IDLE ){
956            res=false;
957        }
958    }
959    return (res);
960}
961
962
963void
964TrexStatelessDpCore::resume_traffic(uint8_t port_id){
965
966    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
967
968    lp_port->resume_traffic(port_id);
969}
970
971
972void
973TrexStatelessDpCore::pause_traffic(uint8_t port_id){
974
975    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
976
977    lp_port->pause_traffic(port_id);
978}
979
980void
981TrexStatelessDpCore::update_traffic(uint8_t port_id, double factor) {
982
983    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
984
985    lp_port->update_traffic(port_id, factor);
986}
987
988
989void
990TrexStatelessDpCore::stop_traffic(uint8_t  port_id,
991                                  bool     stop_on_id,
992                                  int      event_id) {
993    /* we cannot remove nodes not from the top of the queue so
994       for every active node - make sure next time
995       the scheduler invokes it, it will be free */
996
997    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
998
999    if ( lp_port->stop_traffic(port_id,stop_on_id,event_id) == false){
1000        /* nothing to do ! already stopped */
1001        //printf(" skip .. %f\n",m_core->m_cur_time_sec);
1002        return;
1003    }
1004
1005    /* inform the control plane we stopped - this might be a async stop
1006       (streams ended)
1007    */
1008    #if 0
1009    if ( are_all_ports_idle() ) {
1010        /* just a place holder if we will need to do somthing in that case */
1011    }
1012    #endif
1013
1014    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
1015    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
1016                                                                   port_id,
1017                                                                   lp_port->get_event_id());
1018    ring->Enqueue((CGenNode *)event_msg);
1019
1020}
1021
1022/**
1023 * handle a message from CP to DP
1024 *
1025 */
1026void
1027TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) {
1028    msg->handle(this);
1029    delete msg;
1030}
1031
1032void
1033TrexStatelessDpCore::barrier(uint8_t port_id, int event_id) {
1034
1035    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
1036    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
1037                                                                   port_id,
1038                                                                   event_id);
1039    ring->Enqueue((CGenNode *)event_msg);
1040}
1041