trex_stateless_dp_core.cpp revision c2912dfc
1/*
2 Itay Marom
3 Hanoch Haim
4 Cisco Systems, Inc.
5*/
6
7/*
8Copyright (c) 2015-2016 Cisco Systems, Inc.
9
10Licensed under the Apache License, Version 2.0 (the "License");
11you may not use this file except in compliance with the License.
12You may obtain a copy of the License at
13
14    http://www.apache.org/licenses/LICENSE-2.0
15
16Unless required by applicable law or agreed to in writing, software
17distributed under the License is distributed on an "AS IS" BASIS,
18WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19See the License for the specific language governing permissions and
20limitations under the License.
21*/
22#include "bp_sim.h"
23#include "trex_stateless_dp_core.h"
24#include "trex_stateless_messaging.h"
25#include "trex_stream.h"
26#include "trex_stream_node.h"
27#include "trex_streams_compiler.h"
28
29
30
31
32void CGenNodeStateless::cache_mbuf_array_init(){
33    m_cache_size=0;
34    m_cache_array_cnt=0;
35}
36
37
38
39rte_mbuf_t ** CGenNodeStateless::cache_mbuf_array_alloc(uint16_t size){
40
41    uint32_t buf_size = CGenNodeCacheMbuf::get_object_size(size);
42    /* TBD  replace with align, zero API */
43    m_cache_mbuf = (void *)malloc(buf_size);
44    assert(m_cache_mbuf);
45    memset(m_cache_mbuf,0,buf_size);
46
47    m_flags |= SL_NODE_CONST_MBUF_CACHE_ARRAY;
48    m_cache_size=size;
49    m_cache_array_cnt=0;
50    return ((rte_mbuf_t **)m_cache_mbuf);
51}
52
53void CGenNodeStateless::cache_mbuf_array_free(){
54
55    assert(m_cache_mbuf);
56    int i;
57    for (i=0; i<(int)m_cache_size; i++) {
58        rte_mbuf_t * m=cache_mbuf_array_get((uint16_t)i);
59        assert(m);
60        rte_pktmbuf_free(m);
61    }
62
63    /* free the const */
64    rte_mbuf_t * m=cache_mbuf_array_get_const_mbuf() ;
65    if (m) {
66        rte_pktmbuf_free(m);
67    }
68
69    free(m_cache_mbuf);
70    m_cache_mbuf=0;
71}
72
73
74rte_mbuf_t * CGenNodeStateless::cache_mbuf_array_get(uint16_t index){
75
76    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
77    return (p->m_array[index]);
78}
79
80void CGenNodeStateless::cache_mbuf_array_set_const_mbuf(rte_mbuf_t * m){
81    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
82    p->m_mbuf_const=m;
83}
84
85rte_mbuf_t * CGenNodeStateless::cache_mbuf_array_get_const_mbuf(){
86    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
87    return (p->m_mbuf_const);
88}
89
90
91void CGenNodeStateless::cache_mbuf_array_set(uint16_t index,
92                                             rte_mbuf_t * m){
93    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
94    p->m_array[index]=m;
95}
96
97
98void CDpOneStream::Delete(CFlowGenListPerThread   * core){
99    assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE);
100    core->free_node((CGenNode *)m_node);
101    delete m_dp_stream;
102    m_node=0;
103    m_dp_stream=0;
104}
105
106void CDpOneStream::DeleteOnlyStream(){
107    assert(m_dp_stream);
108    delete m_dp_stream;
109    m_dp_stream=0;
110}
111
112int CGenNodeStateless::get_stream_id(){
113    if (m_state ==CGenNodeStateless::ss_FREE_RESUSE) {
114        return (-1); // not valid
115    }
116    assert(m_ref_stream_info);
117    return ((int)m_ref_stream_info->m_stream_id);
118}
119
120
121void CGenNodeStateless::DumpHeader(FILE *fd){
122    fprintf(fd," pkt_id, time, port , action , state, stream_id , stype , m-burst# , burst# \n");
123
124}
125void CGenNodeStateless::Dump(FILE *fd){
126    fprintf(fd," %2.4f, %3lu, %s,%s, %3d, %s, %3lu, %3lu  \n",
127            m_time,
128            (ulong)m_port_id,
129            "s-pkt", //action
130            get_stream_state_str(m_state ).c_str(),
131            get_stream_id(),   //stream_id
132            TrexStream::get_stream_type_str(m_stream_type).c_str(), //stype
133            (ulong)m_multi_bursts,
134            (ulong)m_single_burst
135            );
136}
137
138
139
140void CGenNodeStateless::refresh_vm_bss(){
141    if ( m_vm_flow_var ) {
142        StreamVmDp  * vm_s=m_ref_stream_info->m_vm_dp;
143        assert(vm_s);
144        memcpy(m_vm_flow_var,vm_s->get_bss(),vm_s->get_bss_size());
145
146        if ( vm_s->is_random_seed() ){
147            /* if we have random seed for this program */
148            if (m_ref_stream_info->m_random_seed) {
149                set_random_seed(m_ref_stream_info->m_random_seed);
150            }
151        }
152    }
153}
154
155
156/**
157 * this function called when stream restart after it was inactive
158 */
159void CGenNodeStateless::refresh(){
160
161    /* refill the stream info */
162    m_single_burst    = m_single_burst_refill;
163    m_multi_bursts    = m_ref_stream_info->m_num_bursts;
164    m_state           = CGenNodeStateless::ss_ACTIVE;
165
166    /* refresh init value */
167#if 0
168    /* TBD should add a JSON varible for that */
169    refresh_vm_bss();
170#endif
171}
172
173
174void CGenNodeCommand::free_command(){
175
176    assert(m_cmd);
177    m_cmd->on_node_remove();
178    delete m_cmd;
179}
180
181
182std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state){
183    std::string res;
184
185    switch (stream_state) {
186    case CGenNodeStateless::ss_FREE_RESUSE :
187         res="FREE    ";
188        break;
189    case CGenNodeStateless::ss_INACTIVE :
190        res="INACTIVE ";
191        break;
192    case CGenNodeStateless::ss_ACTIVE :
193        res="ACTIVE   ";
194        break;
195    default:
196        res="Unknow   ";
197    };
198    return(res);
199}
200
201
202rte_mbuf_t   * CGenNodeStateless::alloc_node_with_vm(){
203
204    rte_mbuf_t        * m;
205    /* alloc small packet buffer*/
206    uint16_t prefix_size = prefix_header_size();
207    m = CGlobalInfo::pktmbuf_alloc( get_socket_id(), prefix_size );
208    if (m==0) {
209        return (m);
210    }
211    /* TBD remove this, should handle cases of error */
212    assert(m);
213    char *p=rte_pktmbuf_append(m, prefix_size);
214    memcpy( p ,m_original_packet_data_prefix, prefix_size);
215
216
217    /* run the VM program */
218    StreamDPVmInstructionsRunner runner;
219
220    runner.run( (uint32_t*)m_vm_flow_var,
221                m_vm_program_size,
222                m_vm_program,
223                m_vm_flow_var,
224                (uint8_t*)p);
225
226    uint16_t pkt_new_size=runner.get_new_pkt_size();
227    if ( likely( pkt_new_size == 0) ) {
228        /* no packet size change */
229        rte_mbuf_t * m_const = get_const_mbuf();
230        if (  m_const != NULL) {
231            utl_rte_pktmbuf_add_after(m,m_const);
232        }
233        return (m);
234    }
235
236    /* packet size change there are a few changes */
237    rte_mbuf_t * m_const = get_const_mbuf();
238    if ( (m_const == 0 ) || (pkt_new_size<=prefix_size) ) {
239        /* one mbuf , just trim it */
240        m->data_len = pkt_new_size;
241        m->pkt_len  = pkt_new_size;
242        return (m);
243    }
244
245    rte_mbuf_t * mi= CGlobalInfo::pktmbuf_alloc_small(get_socket_id());
246    assert(mi);
247    rte_pktmbuf_attach(mi,m_const);
248    utl_rte_pktmbuf_add_after2(m,mi);
249
250    if ( pkt_new_size < m->pkt_len) {
251        /* need to trim it */
252        mi->data_len = (pkt_new_size - prefix_size);
253        m->pkt_len   = pkt_new_size;
254    }
255    return (m);
256}
257
258
259void CGenNodeStateless::free_stl_node(){
260
261    if ( is_cache_mbuf_array() ){
262        /* do we have cache of mbuf pre allocated */
263        cache_mbuf_array_free();
264    }else{
265        /* if we have cache mbuf free it */
266        rte_mbuf_t * m=get_cache_mbuf();
267        if (m) {
268                rte_pktmbuf_free(m);
269                m_cache_mbuf=0;
270        }else{
271            /* non cache - must have an header */
272             m=get_const_mbuf();
273             if (m) {
274                 rte_pktmbuf_free(m); /* reduce the ref counter */
275             }
276             free_prefix_header();
277        }
278    }
279    if (m_vm_flow_var) {
280        /* free flow var */
281        free(m_vm_flow_var);
282        m_vm_flow_var=0;
283    }
284}
285
286
287bool TrexStatelessDpPerPort::update_number_of_active_streams(uint32_t d){
288    m_active_streams-=d; /* reduce the number of streams */
289    if (m_active_streams == 0) {
290        return (true);
291    }
292    return (false);
293}
294
295bool TrexStatelessDpPerPort::resume_traffic(uint8_t port_id){
296
297    /* we are working with continues streams so we must be in transmit mode */
298    assert(m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE);
299
300    for (auto dp_stream : m_active_nodes) {
301        CGenNodeStateless * node =dp_stream.m_node;
302        assert(node->get_port_id() == port_id);
303        assert(node->is_pause() == true);
304        node->set_pause(false);
305    }
306    m_state = TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
307    return (true);
308}
309
310bool TrexStatelessDpPerPort::update_traffic(uint8_t port_id, double factor) {
311
312    assert( (m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING ||
313            (m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE)) );
314
315    for (auto dp_stream : m_active_nodes) {
316        CGenNodeStateless * node = dp_stream.m_node;
317        assert(node->get_port_id() == port_id);
318
319        node->update_rate(factor);
320    }
321
322    return (true);
323}
324
325bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){
326
327    /* we are working with continues streams so we must be in transmit mode */
328    assert(m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING);
329
330    for (auto dp_stream : m_active_nodes) {
331        CGenNodeStateless * node =dp_stream.m_node;
332        assert(node->get_port_id() == port_id);
333        assert(node->is_pause() == false);
334        node->set_pause(true);
335    }
336    m_state = TrexStatelessDpPerPort::ppSTATE_PAUSE;
337    return (true);
338}
339
340
341bool TrexStatelessDpPerPort::stop_traffic(uint8_t  port_id,
342                                          bool     stop_on_id,
343                                          int      event_id){
344
345
346    if (m_state == TrexStatelessDpPerPort::ppSTATE_IDLE) {
347        assert(m_active_streams==0);
348        return false;
349    }
350
351    /* there could be race of stop after stop */
352    if ( stop_on_id ) {
353        if (event_id != m_event_id){
354            /* we can't stop it is an old message */
355            return false;
356        }
357    }
358
359    for (auto dp_stream : m_active_nodes) {
360        CGenNodeStateless * node =dp_stream.m_node;
361        assert(node->get_port_id() == port_id);
362        if ( node->get_state() == CGenNodeStateless::ss_ACTIVE) {
363            node->mark_for_free();
364            m_active_streams--;
365            dp_stream.DeleteOnlyStream();
366
367        }else{
368            dp_stream.Delete(m_core);
369        }
370    }
371
372    /* active stream should be zero */
373    assert(m_active_streams==0);
374    m_active_nodes.clear();
375    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
376    return (true);
377}
378
379
380void TrexStatelessDpPerPort::create(CFlowGenListPerThread   *  core){
381    m_core=core;
382    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
383    m_port_id=0;
384    m_active_streams=0;
385    m_active_nodes.clear();
386}
387
388
389
390void
391TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) {
392    m_thread_id = thread_id;
393    m_core = core;
394    m_local_port_offset = 2*core->getDualPortId();
395
396    CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();
397
398    m_ring_from_cp = cp_dp->getRingCpToDp(thread_id);
399    m_ring_to_cp   = cp_dp->getRingDpToCp(thread_id);
400
401    m_state = STATE_IDLE;
402
403    int i;
404    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
405        m_ports[i].create(core);
406    }
407}
408
409
410/* move to the next stream, old stream move to INACTIVE */
411bool TrexStatelessDpCore::set_stateless_next_node(CGenNodeStateless * cur_node,
412                                                  CGenNodeStateless * next_node){
413
414    assert(cur_node);
415    TrexStatelessDpPerPort * lp_port = get_port_db(cur_node->m_port_id);
416    bool schedule =false;
417
418    bool to_stop_port=false;
419
420    if (next_node == NULL) {
421        /* there is no next stream , reduce the number of active streams*/
422        to_stop_port = lp_port->update_number_of_active_streams(1);
423
424    }else{
425        uint8_t state=next_node->get_state();
426
427        /* can't be FREE_RESUSE */
428        assert(state != CGenNodeStateless::ss_FREE_RESUSE);
429        if (state == CGenNodeStateless::ss_INACTIVE ) {
430
431            if (cur_node->m_action_counter > 0) {
432                cur_node->m_action_counter--;
433                if (cur_node->m_action_counter==0) {
434                    to_stop_port = lp_port->update_number_of_active_streams(1);
435                }else{
436                    /* refill start info and scedule, no update in active streams  */
437                    next_node->refresh();
438                    schedule = true;
439                }
440            }else{
441                /* refill start info and scedule, no update in active streams  */
442                next_node->refresh();
443                schedule = true;
444            }
445
446        }else{
447            to_stop_port = lp_port->update_number_of_active_streams(1);
448        }
449    }
450
451    if ( to_stop_port ) {
452        /* call stop port explictly to move the state */
453        stop_traffic(cur_node->m_port_id,false,0);
454    }
455
456    return ( schedule );
457}
458
459
460
461/**
462 * in idle state loop, the processor most of the time sleeps
463 * and periodically checks for messages
464 *
465 * @author imarom (01-Nov-15)
466 */
467void
468TrexStatelessDpCore::idle_state_loop() {
469
470    const int SHORT_DELAY_MS    = 2;
471    const int LONG_DELAY_MS     = 50;
472    const int DEEP_SLEEP_LIMIT  = 2000;
473
474    int counter = 0;
475
476    while (m_state == STATE_IDLE) {
477        m_core->m_node_gen.m_v_if->flush_dp_rx_queue();
478        bool had_msg = periodic_check_for_cp_messages();
479        if (had_msg) {
480            counter = 0;
481            continue;
482        }
483
484        /* enter deep sleep only if enough time had passed */
485        if (counter < DEEP_SLEEP_LIMIT) {
486            delay(SHORT_DELAY_MS);
487            counter++;
488        } else {
489            delay(LONG_DELAY_MS);
490        }
491
492    }
493}
494
495
496
497void TrexStatelessDpCore::quit_main_loop(){
498    m_core->set_terminate_mode(true); /* mark it as terminated */
499    m_state = STATE_TERMINATE;
500    add_global_duration(0.0001);
501}
502
503
504/**
505 * scehduler runs when traffic exists
506 * it will return when no more transmitting is done on this
507 * core
508 *
509 * @author imarom (01-Nov-15)
510 */
511void
512TrexStatelessDpCore::start_scheduler() {
513    /* creates a maintenace job using the scheduler */
514    CGenNode * node_sync = m_core->create_node() ;
515    node_sync->m_type = CGenNode::FLOW_SYNC;
516    node_sync->m_time = m_core->m_cur_time_sec + SYNC_TIME_OUT;
517    m_core->m_node_gen.add_node(node_sync);
518
519    double old_offset = 0.0;
520    m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset);
521    /* bail out in case of terminate */
522    if (m_state != TrexStatelessDpCore::STATE_TERMINATE) {
523        m_core->m_node_gen.close_file(m_core);
524        m_state = STATE_IDLE; /* we exit from all ports and we have nothing to do, we move to IDLE state */
525    }
526}
527
528
529void
530TrexStatelessDpCore::run_once(){
531
532    idle_state_loop();
533
534    if ( m_state == STATE_TERMINATE ){
535        return;
536    }
537
538    start_scheduler();
539}
540
541
542
543
544void
545TrexStatelessDpCore::start() {
546
547    while (true) {
548        run_once();
549
550        if ( m_core->is_terminated_by_master() ) {
551            break;
552        }
553    }
554}
555
556/* only if both port are idle we can exit */
557void
558TrexStatelessDpCore::schedule_exit(){
559
560    CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
561
562    node->m_type = CGenNode::COMMAND;
563
564    node->m_cmd = new TrexStatelessDpCanQuit();
565
566    /* make sure it will be scheduled after the current node */
567    node->m_time = m_core->m_cur_time_sec ;
568
569    m_core->m_node_gen.add_node((CGenNode *)node);
570}
571
572
573void
574TrexStatelessDpCore::add_global_duration(double duration){
575    if (duration > 0.0) {
576        CGenNode *node = m_core->create_node() ;
577
578        node->m_type = CGenNode::EXIT_SCHED;
579
580        /* make sure it will be scheduled after the current node */
581        node->m_time = m_core->m_cur_time_sec + duration ;
582
583        m_core->m_node_gen.add_node(node);
584    }
585}
586
587/* add per port exit */
588void
589TrexStatelessDpCore::add_port_duration(double duration,
590                                       uint8_t port_id,
591                                       int event_id){
592    if (duration > 0.0) {
593        CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
594
595        node->m_type = CGenNode::COMMAND;
596
597        /* make sure it will be scheduled after the current node */
598        node->m_time = m_core->m_cur_time_sec + duration ;
599
600        TrexStatelessDpStop * cmd=new TrexStatelessDpStop(port_id);
601
602
603        /* test this */
604        m_core->m_non_active_nodes++;
605        cmd->set_core_ptr(m_core);
606        cmd->set_event_id(event_id);
607        cmd->set_wait_for_event_id(true);
608
609        node->m_cmd = cmd;
610
611        m_core->m_node_gen.add_node((CGenNode *)node);
612    }
613}
614
615
616void TrexStatelessDpCore::update_mac_addr(TrexStream * stream,
617                                          CGenNodeStateless *node,
618                                          pkt_dir_t dir,
619                                          char *raw_pkt){
620    bool              ov_src = stream->get_override_src_mac_by_pkt_data();
621    TrexStream::stream_dst_mac_t  ov_dst = stream->get_override_dst_mac_mode();
622
623
624    if ( (ov_src == true) && (ov_dst == TrexStream::stPKT) ) {
625        /* nothing to do, take from the packet both */
626        return;
627    }
628
629        /* take from cfg_file */
630    if ( (ov_src == false) &&
631         (ov_dst == TrexStream::stCFG_FILE) ){
632
633          m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*)raw_pkt);
634          return;
635    }
636
637    /* save the pkt*/
638    char tmp_pkt[12];
639    memcpy(tmp_pkt,raw_pkt,12);
640
641    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*)raw_pkt);
642
643    if ((ov_src == true) && (ov_dst == TrexStream::stCFG_FILE)) {
644        memcpy(raw_pkt+6,tmp_pkt+6,6);
645    }
646
647    if ((ov_src == false) && (ov_dst == TrexStream::stPKT)) {
648        memcpy(raw_pkt,tmp_pkt,6);
649    }
650}
651
652
653void
654TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port,
655                                TrexStream * stream,
656                                TrexStreamsCompiledObj *comp) {
657    CGenNodeStateless *node = m_core->create_node_sl();
658
659    /* add periodic */
660    node->m_cache_mbuf=0;
661    node->m_type = CGenNode::STATELESS_PKT;
662
663    node->m_action_counter = stream->m_action_count;
664
665    /* clone the stream from control plane memory to DP memory */
666    node->m_ref_stream_info = stream->clone();
667    /* no need for this memory anymore on the control plane memory */
668    stream->release_dp_object();
669
670    node->m_next_stream=0; /* will be fixed later */
671
672    if ( stream->m_self_start ){
673        /* if self start it is in active mode */
674        node->m_state =CGenNodeStateless::ss_ACTIVE;
675        lp_port->m_active_streams++;
676    }else{
677        node->m_state =CGenNodeStateless::ss_INACTIVE;
678    }
679
680    node->m_time = m_core->m_cur_time_sec + stream->get_start_delay_sec();
681
682    pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(stream->m_port_id);
683    node->m_flags = 0;
684    node->m_src_port =0;
685    node->m_original_packet_data_prefix = 0;
686
687    if (stream->m_rx_check.m_enabled) {
688        node->set_stat_needed();
689        uint8_t hw_id = stream->m_rx_check.m_hw_id;
690        assert (hw_id < MAX_FLOW_STATS);
691        node->set_stat_hw_id(hw_id);
692    }
693
694    /* set socket id */
695    node->set_socket_id(m_core->m_node_gen.m_socket_id);
696
697    /* build a mbuf from a packet */
698
699    uint16_t pkt_size = stream->m_pkt.len;
700    const uint8_t *stream_pkt = stream->m_pkt.binary;
701
702    node->m_pause =0;
703    node->m_stream_type = stream->m_type;
704    node->m_next_time_offset = 1.0 / stream->get_pps();
705    node->m_null_stream = (stream->m_null_stream ? 1 : 0);
706
707    /* stateless specific fields */
708    switch ( stream->m_type ) {
709
710    case TrexStream::stCONTINUOUS :
711        node->m_single_burst=0;
712        node->m_single_burst_refill=0;
713        node->m_multi_bursts=0;
714        break;
715
716    case TrexStream::stSINGLE_BURST :
717        node->m_stream_type             = TrexStream::stMULTI_BURST;
718        node->m_single_burst            = stream->m_burst_total_pkts;
719        node->m_single_burst_refill     = stream->m_burst_total_pkts;
720        node->m_multi_bursts            = 1;  /* single burst in multi burst of 1 */
721        break;
722
723    case TrexStream::stMULTI_BURST :
724        node->m_single_burst        = stream->m_burst_total_pkts;
725        node->m_single_burst_refill = stream->m_burst_total_pkts;
726        node->m_multi_bursts        = stream->m_num_bursts;
727        break;
728    default:
729
730        assert(0);
731    };
732
733    node->m_port_id = stream->m_port_id;
734
735    /* set dir 0 or 1 client or server */
736    node->set_mbuf_cache_dir(dir);
737
738
739    if (node->m_ref_stream_info->getDpVm() == NULL) {
740        /* no VM */
741
742        node->m_vm_flow_var =  NULL;
743        node->m_vm_program  =  NULL;
744        node->m_vm_program_size =0;
745
746                /* allocate const mbuf */
747        rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size);
748        assert(m);
749
750        char *p = rte_pktmbuf_append(m, pkt_size);
751        assert(p);
752        /* copy the packet */
753        memcpy(p,stream_pkt,pkt_size);
754
755        update_mac_addr(stream,node,dir,p);
756
757        /* set the packet as a readonly */
758        node->set_cache_mbuf(m);
759
760        node->m_original_packet_data_prefix =0;
761    }else{
762
763        /* set the program */
764        TrexStream * local_mem_stream = node->m_ref_stream_info;
765
766        StreamVmDp  * lpDpVm = local_mem_stream->getDpVm();
767
768        node->m_vm_flow_var      = lpDpVm->clone_bss(); /* clone the flow var */
769        node->m_vm_program       = lpDpVm->get_program(); /* same ref to the program */
770        node->m_vm_program_size  = lpDpVm->get_program_size();
771
772
773        /* set the random seed if was set */
774        if ( lpDpVm->is_random_seed() ){
775            /* if we have random seed for this program */
776            if (stream->m_random_seed) {
777                node->set_random_seed(stream->m_random_seed);
778            }
779        }
780
781        /* we need to copy the object */
782        if ( pkt_size > lpDpVm->get_prefix_size() ) {
783            /* we need const packet */
784            uint16_t const_pkt_size  = pkt_size - lpDpVm->get_prefix_size() ;
785            rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), const_pkt_size );
786            assert(m);
787
788            char *p = rte_pktmbuf_append(m, const_pkt_size);
789            assert(p);
790
791            /* copy packet data */
792            memcpy(p,(stream_pkt + lpDpVm->get_prefix_size()),const_pkt_size);
793
794            node->set_const_mbuf(m);
795        }
796
797
798        if ( lpDpVm->is_pkt_size_var() ) {
799            // mark the node as varible size
800            node->set_var_pkt_size();
801        }
802
803
804        if (lpDpVm->get_prefix_size() > pkt_size ) {
805            lpDpVm->set_prefix_size(pkt_size);
806        }
807
808        /* copy the headr */
809        uint16_t header_size = lpDpVm->get_prefix_size();
810        assert(header_size);
811        node->alloc_prefix_header(header_size);
812        uint8_t *p=node->m_original_packet_data_prefix;
813        assert(p);
814
815        memcpy(p,stream_pkt , header_size);
816
817        update_mac_addr(stream,node,dir,(char *)p);
818    }
819
820
821    CDpOneStream one_stream;
822
823    one_stream.m_dp_stream = node->m_ref_stream_info;
824    one_stream.m_node =node;
825
826    lp_port->m_active_nodes.push_back(one_stream);
827
828    /* schedule only if active */
829    if (node->m_state == CGenNodeStateless::ss_ACTIVE) {
830        m_core->m_node_gen.add_node((CGenNode *)node);
831    }
832}
833
834void
835TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj,
836                                   double duration,
837                                   int event_id) {
838
839
840    TrexStatelessDpPerPort * lp_port=get_port_db(obj->get_port_id());
841    lp_port->m_active_streams = 0;
842    lp_port->set_event_id(event_id);
843
844    /* no nodes in the list */
845    assert(lp_port->m_active_nodes.size()==0);
846
847    for (auto single_stream : obj->get_objects()) {
848        /* all commands should be for the same port */
849        assert(obj->get_port_id() == single_stream.m_stream->m_port_id);
850        add_stream(lp_port,single_stream.m_stream,obj);
851    }
852
853    uint32_t nodes = lp_port->m_active_nodes.size();
854    /* find next stream */
855    assert(nodes == obj->get_objects().size());
856
857    int cnt=0;
858
859    /* set the next_stream pointer  */
860    for (auto single_stream : obj->get_objects()) {
861
862        if (single_stream.m_stream->is_dp_next_stream() ) {
863            int stream_id = single_stream.m_stream->m_next_stream_id;
864            assert(stream_id<nodes);
865            /* point to the next stream , stream_id is fixed */
866            lp_port->m_active_nodes[cnt].m_node->m_next_stream = lp_port->m_active_nodes[stream_id].m_node ;
867        }
868        cnt++;
869    }
870
871    lp_port->m_state =TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
872    m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
873
874
875    if ( duration > 0.0 ){
876        add_port_duration( duration ,obj->get_port_id(),event_id );
877    }
878
879}
880
881
882bool TrexStatelessDpCore::are_all_ports_idle(){
883
884    bool res=true;
885    int i;
886    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
887        if ( m_ports[i].m_state != TrexStatelessDpPerPort::ppSTATE_IDLE ){
888            res=false;
889        }
890    }
891    return (res);
892}
893
894
895void
896TrexStatelessDpCore::resume_traffic(uint8_t port_id){
897
898    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
899
900    lp_port->resume_traffic(port_id);
901}
902
903
904void
905TrexStatelessDpCore::pause_traffic(uint8_t port_id){
906
907    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
908
909    lp_port->pause_traffic(port_id);
910}
911
912void
913TrexStatelessDpCore::update_traffic(uint8_t port_id, double factor) {
914
915    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
916
917    lp_port->update_traffic(port_id, factor);
918}
919
920
921void
922TrexStatelessDpCore::stop_traffic(uint8_t  port_id,
923                                  bool     stop_on_id,
924                                  int      event_id) {
925    /* we cannot remove nodes not from the top of the queue so
926       for every active node - make sure next time
927       the scheduler invokes it, it will be free */
928
929    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
930
931    if ( lp_port->stop_traffic(port_id,stop_on_id,event_id) == false){
932        /* nothing to do ! already stopped */
933        //printf(" skip .. %f\n",m_core->m_cur_time_sec);
934        return;
935    }
936
937    /* inform the control plane we stopped - this might be a async stop
938       (streams ended)
939    */
940    #if 0
941    if ( are_all_ports_idle() ) {
942        /* just a place holder if we will need to do somthing in that case */
943    }
944    #endif
945
946    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
947    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
948                                                                   port_id,
949                                                                   lp_port->get_event_id());
950    ring->Enqueue((CGenNode *)event_msg);
951
952}
953
954/**
955 * handle a message from CP to DP
956 *
957 */
958void
959TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) {
960    msg->handle(this);
961    delete msg;
962}
963
964void
965TrexStatelessDpCore::barrier(uint8_t port_id, int event_id) {
966
967    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
968    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
969                                                                   port_id,
970                                                                   event_id);
971    ring->Enqueue((CGenNode *)event_msg);
972}
973