trex_stateless_dp_core.cpp revision a53f6be0
1/*
2 Itay Marom
3 Hanoch Haim
4 Cisco Systems, Inc.
5*/
6
7/*
8Copyright (c) 2015-2016 Cisco Systems, Inc.
9
10Licensed under the Apache License, Version 2.0 (the "License");
11you may not use this file except in compliance with the License.
12You may obtain a copy of the License at
13
14    http://www.apache.org/licenses/LICENSE-2.0
15
16Unless required by applicable law or agreed to in writing, software
17distributed under the License is distributed on an "AS IS" BASIS,
18WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19See the License for the specific language governing permissions and
20limitations under the License.
21*/
22#include "bp_sim.h"
23#include "trex_stateless_dp_core.h"
24#include "trex_stateless_messaging.h"
25#include "trex_stream.h"
26#include "trex_stream_node.h"
27#include "trex_streams_compiler.h"
28#include "mbuf.h"
29
30
31
32
33void CGenNodeStateless::cache_mbuf_array_init(){
34    m_cache_size=0;
35    m_cache_array_cnt=0;
36}
37
38
39
40void CGenNodeStateless::cache_mbuf_array_copy(CGenNodeCacheMbuf *obj,
41                                              uint16_t size){
42
43    int i;
44    cache_mbuf_array_alloc(size);
45    for (i=0; i<size; i++) {
46        cache_mbuf_array_set(i,obj->m_array[i]);
47    }
48    cache_mbuf_array_set_const_mbuf(obj->m_mbuf_const);
49}
50
51
52rte_mbuf_t ** CGenNodeStateless::cache_mbuf_array_alloc(uint16_t size){
53
54    uint32_t buf_size = CGenNodeCacheMbuf::get_object_size(size);
55    /* TBD  replace with align, zero API */
56    m_cache_mbuf = (void *)malloc(buf_size);
57    assert(m_cache_mbuf);
58    memset(m_cache_mbuf,0,buf_size);
59
60    m_flags |= SL_NODE_CONST_MBUF_CACHE_ARRAY;
61    m_cache_size=size;
62    m_cache_array_cnt=0;
63    return ((rte_mbuf_t **)m_cache_mbuf);
64}
65
66void CGenNodeStateless::cache_mbuf_array_free(){
67
68    assert(m_cache_mbuf);
69    int i;
70    for (i=0; i<(int)m_cache_size; i++) {
71        rte_mbuf_t * m=cache_mbuf_array_get((uint16_t)i);
72        assert(m);
73        rte_pktmbuf_free(m);
74    }
75
76    /* free the const */
77    rte_mbuf_t * m=cache_mbuf_array_get_const_mbuf() ;
78    if (m) {
79        rte_pktmbuf_free(m);
80    }
81
82    free(m_cache_mbuf);
83    m_cache_mbuf=0;
84}
85
86
87rte_mbuf_t * CGenNodeStateless::cache_mbuf_array_get(uint16_t index){
88
89    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
90    return (p->m_array[index]);
91}
92
93void CGenNodeStateless::cache_mbuf_array_set_const_mbuf(rte_mbuf_t * m){
94    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
95    p->m_mbuf_const=m;
96}
97
98rte_mbuf_t * CGenNodeStateless::cache_mbuf_array_get_const_mbuf(){
99    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
100    return (p->m_mbuf_const);
101}
102
103
104void CGenNodeStateless::cache_mbuf_array_set(uint16_t index,
105                                             rte_mbuf_t * m){
106    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
107    p->m_array[index]=m;
108}
109
110
111void CDpOneStream::Delete(CFlowGenListPerThread   * core){
112    assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE);
113    core->free_node((CGenNode *)m_node);
114    delete m_dp_stream;
115    m_node=0;
116    m_dp_stream=0;
117}
118
119void CDpOneStream::DeleteOnlyStream(){
120    assert(m_dp_stream);
121    delete m_dp_stream;
122    m_dp_stream=0;
123}
124
125int CGenNodeStateless::get_stream_id(){
126    if (m_state ==CGenNodeStateless::ss_FREE_RESUSE) {
127        return (-1); // not valid
128    }
129    assert(m_ref_stream_info);
130    return ((int)m_ref_stream_info->m_stream_id);
131}
132
133
134void CGenNodeStateless::DumpHeader(FILE *fd){
135    fprintf(fd," pkt_id, time, port , action , state, stream_id , stype , m-burst# , burst# \n");
136
137}
138void CGenNodeStateless::Dump(FILE *fd){
139    fprintf(fd," %2.4f, %3lu, %s,%s, %3d, %s, %3lu, %3lu  \n",
140            m_time,
141            (ulong)m_port_id,
142            "s-pkt", //action
143            get_stream_state_str(m_state ).c_str(),
144            get_stream_id(),   //stream_id
145            TrexStream::get_stream_type_str(m_stream_type).c_str(), //stype
146            (ulong)m_multi_bursts,
147            (ulong)m_single_burst
148            );
149}
150
151
152
153void CGenNodeStateless::refresh_vm_bss(){
154    if ( m_vm_flow_var ) {
155        StreamVmDp  * vm_s=m_ref_stream_info->m_vm_dp;
156        assert(vm_s);
157        memcpy(m_vm_flow_var,vm_s->get_bss(),vm_s->get_bss_size());
158
159        if ( vm_s->is_random_seed() ){
160            /* if we have random seed for this program */
161            if (m_ref_stream_info->m_random_seed) {
162                set_random_seed(m_ref_stream_info->m_random_seed);
163            }
164        }
165    }
166}
167
168
169/**
170 * this function called when stream restart after it was inactive
171 */
172void CGenNodeStateless::refresh(){
173
174    /* refill the stream info */
175    m_single_burst    = m_single_burst_refill;
176    m_multi_bursts    = m_ref_stream_info->m_num_bursts;
177    m_state           = CGenNodeStateless::ss_ACTIVE;
178
179    /* refresh init value */
180#if 0
181    /* TBD should add a JSON varible for that */
182    refresh_vm_bss();
183#endif
184}
185
186
187void CGenNodeCommand::free_command(){
188
189    assert(m_cmd);
190    m_cmd->on_node_remove();
191    delete m_cmd;
192}
193
194
195std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state){
196    std::string res;
197
198    switch (stream_state) {
199    case CGenNodeStateless::ss_FREE_RESUSE :
200         res="FREE    ";
201        break;
202    case CGenNodeStateless::ss_INACTIVE :
203        res="INACTIVE ";
204        break;
205    case CGenNodeStateless::ss_ACTIVE :
206        res="ACTIVE   ";
207        break;
208    default:
209        res="Unknow   ";
210    };
211    return(res);
212}
213
214rte_mbuf_t * CGenNodeStateless::alloc_flow_stat_mbuf(rte_mbuf_t *m) {
215    //?????????
216    // temp implementation. Just copy the entire mbuf
217    rte_mbuf_t *m_new = CGlobalInfo::pktmbuf_alloc( get_socket_id(), m->data_len );
218    /* TBD remove this, should handle cases of error */
219    assert(m_new);
220    char *p = rte_pktmbuf_mtod(m, char*);
221    char *p_new = rte_pktmbuf_append(m_new, m->data_len);
222    memcpy(p_new , p, m->data_len);
223
224    return m_new;
225}
226
227rte_mbuf_t   * CGenNodeStateless::alloc_node_with_vm(){
228
229    rte_mbuf_t        * m;
230    /* alloc small packet buffer*/
231    uint16_t prefix_size = prefix_header_size();
232    m = CGlobalInfo::pktmbuf_alloc( get_socket_id(), prefix_size );
233    if (m==0) {
234        return (m);
235    }
236    /* TBD remove this, should handle cases of error */
237    assert(m);
238    char *p=rte_pktmbuf_append(m, prefix_size);
239    memcpy( p ,m_original_packet_data_prefix, prefix_size);
240
241
242    /* run the VM program */
243    StreamDPVmInstructionsRunner runner;
244
245    runner.run( (uint32_t*)m_vm_flow_var,
246                m_vm_program_size,
247                m_vm_program,
248                m_vm_flow_var,
249                (uint8_t*)p);
250
251    uint16_t pkt_new_size=runner.get_new_pkt_size();
252    if ( likely( pkt_new_size == 0) ) {
253        /* no packet size change */
254        rte_mbuf_t * m_const = get_const_mbuf();
255        if (  m_const != NULL) {
256            utl_rte_pktmbuf_add_after(m,m_const);
257        }
258        return (m);
259    }
260
261    /* packet size change there are a few changes */
262    rte_mbuf_t * m_const = get_const_mbuf();
263    if ( (m_const == 0 ) || (pkt_new_size<=prefix_size) ) {
264        /* one mbuf , just trim it */
265        m->data_len = pkt_new_size;
266        m->pkt_len  = pkt_new_size;
267        return (m);
268    }
269
270    rte_mbuf_t * mi= CGlobalInfo::pktmbuf_alloc_small(get_socket_id());
271    assert(mi);
272    rte_pktmbuf_attach(mi,m_const);
273    utl_rte_pktmbuf_add_after2(m,mi);
274
275    if ( pkt_new_size < m->pkt_len) {
276        /* need to trim it */
277        mi->data_len = (pkt_new_size - prefix_size);
278        m->pkt_len   = pkt_new_size;
279    }
280    return (m);
281}
282
283void CGenNodeStateless::free_stl_vm_buf(){
284        rte_mbuf_t * m ;
285         m=get_const_mbuf();
286         if (m) {
287             rte_pktmbuf_free(m); /* reduce the ref counter */
288             /* clear the const marker */
289             clear_const_mbuf();
290         }
291
292         free_prefix_header();
293
294         if (m_vm_flow_var) {
295             /* free flow var */
296             free(m_vm_flow_var);
297             m_vm_flow_var=0;
298         }
299}
300
301
302
303void CGenNodeStateless::free_stl_node(){
304
305    if ( is_cache_mbuf_array() ){
306        /* do we have cache of mbuf pre allocated */
307        cache_mbuf_array_free();
308    }else{
309        /* if we have cache mbuf free it */
310        rte_mbuf_t * m=get_cache_mbuf();
311        if (m) {
312                rte_pktmbuf_free(m);
313                m_cache_mbuf=0;
314        }
315    }
316    free_stl_vm_buf();
317}
318
319
320bool TrexStatelessDpPerPort::update_number_of_active_streams(uint32_t d){
321    m_active_streams-=d; /* reduce the number of streams */
322    if (m_active_streams == 0) {
323        return (true);
324    }
325    return (false);
326}
327
328bool TrexStatelessDpPerPort::resume_traffic(uint8_t port_id){
329
330    /* we are working with continues streams so we must be in transmit mode */
331    assert(m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE);
332
333    for (auto dp_stream : m_active_nodes) {
334        CGenNodeStateless * node =dp_stream.m_node;
335        assert(node->get_port_id() == port_id);
336        assert(node->is_pause() == true);
337        node->set_pause(false);
338    }
339    m_state = TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
340    return (true);
341}
342
343bool TrexStatelessDpPerPort::update_traffic(uint8_t port_id, double factor) {
344
345    assert( (m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING ||
346            (m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE)) );
347
348    for (auto dp_stream : m_active_nodes) {
349        CGenNodeStateless * node = dp_stream.m_node;
350        assert(node->get_port_id() == port_id);
351
352        node->update_rate(factor);
353    }
354
355    return (true);
356}
357
358bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){
359
360    /* we are working with continues streams so we must be in transmit mode */
361    assert(m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING);
362
363    for (auto dp_stream : m_active_nodes) {
364        CGenNodeStateless * node =dp_stream.m_node;
365        assert(node->get_port_id() == port_id);
366        assert(node->is_pause() == false);
367        node->set_pause(true);
368    }
369    m_state = TrexStatelessDpPerPort::ppSTATE_PAUSE;
370    return (true);
371}
372
373bool TrexStatelessDpPerPort::push_pcap(uint8_t port_id,
374                                       const std::string &pcap_filename,
375                                       double ipg_usec,
376                                       double speedup,
377                                       uint32_t count) {
378
379    /* push pcap can only happen on an idle port from the core prespective */
380    assert(m_state == TrexStatelessDpPerPort::ppSTATE_IDLE);
381
382    CGenNodePCAP *pcap_node = m_core->allocate_pcap_node();
383    if (!pcap_node) {
384        return (false);
385    }
386
387    pkt_dir_t dir          = m_core->m_node_gen.m_v_if->port_id_to_dir(port_id);
388    socket_id_t socket_id  = m_core->m_node_gen.m_socket_id;
389
390    uint8_t mac_addr[12];
391    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, mac_addr);
392
393    bool rc = pcap_node->create(port_id,
394                                dir,
395                                socket_id,
396                                mac_addr,
397                                pcap_filename,
398                                ipg_usec,
399                                speedup,
400                                count);
401    if (!rc) {
402        m_core->free_node((CGenNode *)pcap_node);
403        return (false);
404    }
405
406    /* schedule the node for now */
407    pcap_node->m_time = m_core->m_cur_time_sec;
408    m_core->m_node_gen.add_node((CGenNode *)pcap_node);
409
410    /* hold a pointer to the node */
411    assert(m_active_pcap_node == NULL);
412    m_active_pcap_node = pcap_node;
413
414    m_state = TrexStatelessDpPerPort::ppSTATE_PCAP_TX;
415    return (true);
416}
417
418
419bool TrexStatelessDpPerPort::stop_traffic(uint8_t  port_id,
420                                          bool     stop_on_id,
421                                          int      event_id){
422
423
424    if (m_state == TrexStatelessDpPerPort::ppSTATE_IDLE) {
425        assert(m_active_streams==0);
426        return false;
427    }
428
429    /* there could be race of stop after stop */
430    if ( stop_on_id ) {
431        if (event_id != m_event_id){
432            /* we can't stop it is an old message */
433            return false;
434        }
435    }
436
437    for (auto dp_stream : m_active_nodes) {
438        CGenNodeStateless * node =dp_stream.m_node;
439        assert(node->get_port_id() == port_id);
440        if ( node->get_state() == CGenNodeStateless::ss_ACTIVE) {
441            node->mark_for_free();
442            m_active_streams--;
443            dp_stream.DeleteOnlyStream();
444
445        }else{
446            dp_stream.Delete(m_core);
447        }
448    }
449
450    /* check for active PCAP node */
451    if (m_active_pcap_node) {
452        /* when got async stop from outside or duration */
453        if (m_active_pcap_node->is_active()) {
454            m_active_pcap_node->mark_for_free();
455        } else {
456            /* graceful stop - node was put out by the scheduler */
457            m_core->free_node( (CGenNode *)m_active_pcap_node);
458        }
459
460        m_active_pcap_node = NULL;
461    }
462
463    /* active stream should be zero */
464    assert(m_active_streams==0);
465    m_active_nodes.clear();
466    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
467    return (true);
468}
469
470
471void TrexStatelessDpPerPort::create(CFlowGenListPerThread   *  core){
472    m_core=core;
473    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
474    m_active_streams=0;
475    m_active_nodes.clear();
476    m_active_pcap_node = NULL;
477}
478
479
480
481void
482TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) {
483    m_thread_id = thread_id;
484    m_core = core;
485    m_local_port_offset = 2*core->getDualPortId();
486
487    CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();
488
489    m_ring_from_cp = cp_dp->getRingCpToDp(thread_id);
490    m_ring_to_cp   = cp_dp->getRingDpToCp(thread_id);
491
492    m_state = STATE_IDLE;
493
494    int i;
495    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
496        m_ports[i].create(core);
497    }
498}
499
500
501/* move to the next stream, old stream move to INACTIVE */
502bool TrexStatelessDpCore::set_stateless_next_node(CGenNodeStateless * cur_node,
503                                                  CGenNodeStateless * next_node){
504
505    assert(cur_node);
506    TrexStatelessDpPerPort * lp_port = get_port_db(cur_node->m_port_id);
507    bool schedule =false;
508
509    bool to_stop_port=false;
510
511    if (next_node == NULL) {
512        /* there is no next stream , reduce the number of active streams*/
513        to_stop_port = lp_port->update_number_of_active_streams(1);
514
515    }else{
516        uint8_t state=next_node->get_state();
517
518        /* can't be FREE_RESUSE */
519        assert(state != CGenNodeStateless::ss_FREE_RESUSE);
520        if (state == CGenNodeStateless::ss_INACTIVE ) {
521
522            if (cur_node->m_action_counter > 0) {
523                cur_node->m_action_counter--;
524                if (cur_node->m_action_counter==0) {
525                    to_stop_port = lp_port->update_number_of_active_streams(1);
526                }else{
527                    /* refill start info and scedule, no update in active streams  */
528                    next_node->refresh();
529                    schedule = true;
530                }
531            }else{
532                /* refill start info and scedule, no update in active streams  */
533                next_node->refresh();
534                schedule = true;
535            }
536
537        }else{
538            to_stop_port = lp_port->update_number_of_active_streams(1);
539        }
540    }
541
542    if ( to_stop_port ) {
543        /* call stop port explictly to move the state */
544        stop_traffic(cur_node->m_port_id,false,0);
545    }
546
547    return ( schedule );
548}
549
550
551
552/**
553 * in idle state loop, the processor most of the time sleeps
554 * and periodically checks for messages
555 *
556 * @author imarom (01-Nov-15)
557 */
558void
559TrexStatelessDpCore::idle_state_loop() {
560
561    const int SHORT_DELAY_MS    = 2;
562    const int LONG_DELAY_MS     = 50;
563    const int DEEP_SLEEP_LIMIT  = 2000;
564
565    int counter = 0;
566
567    while (m_state == STATE_IDLE) {
568        m_core->m_node_gen.m_v_if->flush_dp_rx_queue();
569        bool had_msg = periodic_check_for_cp_messages();
570        if (had_msg) {
571            counter = 0;
572            continue;
573        }
574
575        /* enter deep sleep only if enough time had passed */
576        if (counter < DEEP_SLEEP_LIMIT) {
577            delay(SHORT_DELAY_MS);
578            counter++;
579        } else {
580            delay(LONG_DELAY_MS);
581        }
582
583    }
584}
585
586
587
588void TrexStatelessDpCore::quit_main_loop(){
589    m_core->set_terminate_mode(true); /* mark it as terminated */
590    m_state = STATE_TERMINATE;
591    add_global_duration(0.0001);
592}
593
594
595/**
596 * scehduler runs when traffic exists
597 * it will return when no more transmitting is done on this
598 * core
599 *
600 * @author imarom (01-Nov-15)
601 */
602void
603TrexStatelessDpCore::start_scheduler() {
604    /* creates a maintenace job using the scheduler */
605    CGenNode * node_sync = m_core->create_node() ;
606    node_sync->m_type = CGenNode::FLOW_SYNC;
607    node_sync->m_time = m_core->m_cur_time_sec + SYNC_TIME_OUT;
608    m_core->m_node_gen.add_node(node_sync);
609
610    double old_offset = 0.0;
611    m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset);
612    /* bail out in case of terminate */
613    if (m_state != TrexStatelessDpCore::STATE_TERMINATE) {
614        m_core->m_node_gen.close_file(m_core);
615        m_state = STATE_IDLE; /* we exit from all ports and we have nothing to do, we move to IDLE state */
616    }
617}
618
619
620void
621TrexStatelessDpCore::run_once(){
622
623    idle_state_loop();
624
625    if ( m_state == STATE_TERMINATE ){
626        return;
627    }
628
629    start_scheduler();
630}
631
632
633
634
635void
636TrexStatelessDpCore::start() {
637
638    while (true) {
639        run_once();
640
641        if ( m_core->is_terminated_by_master() ) {
642            break;
643        }
644    }
645}
646
647/* only if both port are idle we can exit */
648void
649TrexStatelessDpCore::schedule_exit(){
650
651    CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
652
653    node->m_type = CGenNode::COMMAND;
654
655    node->m_cmd = new TrexStatelessDpCanQuit();
656
657    /* make sure it will be scheduled after the current node */
658    node->m_time = m_core->m_cur_time_sec ;
659
660    m_core->m_node_gen.add_node((CGenNode *)node);
661}
662
663
664void
665TrexStatelessDpCore::add_global_duration(double duration){
666    if (duration > 0.0) {
667        CGenNode *node = m_core->create_node() ;
668
669        node->m_type = CGenNode::EXIT_SCHED;
670
671        /* make sure it will be scheduled after the current node */
672        node->m_time = m_core->m_cur_time_sec + duration ;
673
674        m_core->m_node_gen.add_node(node);
675    }
676}
677
678/* add per port exit */
679void
680TrexStatelessDpCore::add_port_duration(double duration,
681                                       uint8_t port_id,
682                                       int event_id){
683    if (duration > 0.0) {
684        CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
685
686        node->m_type = CGenNode::COMMAND;
687
688        /* make sure it will be scheduled after the current node */
689        node->m_time = m_core->m_cur_time_sec + duration ;
690
691        TrexStatelessDpStop * cmd=new TrexStatelessDpStop(port_id);
692
693
694        /* test this */
695        m_core->m_non_active_nodes++;
696        cmd->set_core_ptr(m_core);
697        cmd->set_event_id(event_id);
698        cmd->set_wait_for_event_id(true);
699
700        node->m_cmd = cmd;
701
702        m_core->m_node_gen.add_node((CGenNode *)node);
703    }
704}
705
706
707void TrexStatelessDpCore::update_mac_addr(TrexStream * stream,
708                                          CGenNodeStateless *node,
709                                          pkt_dir_t dir,
710                                          char *raw_pkt){
711    bool              ov_src = stream->get_override_src_mac_by_pkt_data();
712    TrexStream::stream_dst_mac_t  ov_dst = stream->get_override_dst_mac_mode();
713
714
715    if ( (ov_src == true) && (ov_dst == TrexStream::stPKT) ) {
716        /* nothing to do, take from the packet both */
717        return;
718    }
719
720        /* take from cfg_file */
721    if ( (ov_src == false) &&
722         (ov_dst == TrexStream::stCFG_FILE) ){
723
724          m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*)raw_pkt);
725          return;
726    }
727
728    /* save the pkt*/
729    char tmp_pkt[12];
730    memcpy(tmp_pkt,raw_pkt,12);
731
732    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*)raw_pkt);
733
734    if ((ov_src == true) && (ov_dst == TrexStream::stCFG_FILE)) {
735        memcpy(raw_pkt+6,tmp_pkt+6,6);
736    }
737
738    if ((ov_src == false) && (ov_dst == TrexStream::stPKT)) {
739        memcpy(raw_pkt,tmp_pkt,6);
740    }
741}
742
743
744void TrexStatelessDpCore::replay_vm_into_cache(TrexStream * stream,
745                                               CGenNodeStateless *node){
746
747    uint16_t      cache_size = stream->m_cache_size;
748    assert(cache_size>0);
749    rte_mbuf_t * m=0;
750
751    uint32_t buf_size = CGenNodeCacheMbuf::get_object_size(cache_size);
752    CGenNodeCacheMbuf * p = (CGenNodeCacheMbuf *)malloc(buf_size);
753    assert(p);
754    memset(p,0,buf_size);
755
756    int i;
757    for (i=0; i<cache_size; i++) {
758        p->m_array[i] =  node->alloc_node_with_vm();
759    }
760    /* save const */
761    m=node->get_const_mbuf();
762    if (m) {
763        p->m_mbuf_const=m;
764        rte_pktmbuf_refcnt_update(m,1);
765    }
766
767    /* free all VM and const mbuf */
768    node->free_stl_vm_buf();
769
770    /* copy to local node meory */
771    node->cache_mbuf_array_copy(p,cache_size);
772
773    /* free the memory */
774    free(p);
775}
776
777
778void
779TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port,
780                                TrexStream * stream,
781                                TrexStreamsCompiledObj *comp) {
782
783    CGenNodeStateless *node = m_core->create_node_sl();
784
785    node->cache_mbuf_array_init();
786    node->m_batch_size=0;
787
788    /* add periodic */
789    node->m_cache_mbuf=0;
790    node->m_type = CGenNode::STATELESS_PKT;
791
792    node->m_action_counter = stream->m_action_count;
793
794    /* clone the stream from control plane memory to DP memory */
795    node->m_ref_stream_info = stream->clone();
796    /* no need for this memory anymore on the control plane memory */
797    stream->release_dp_object();
798
799    node->m_next_stream=0; /* will be fixed later */
800
801    if ( stream->m_self_start ){
802        /* if self start it is in active mode */
803        node->m_state =CGenNodeStateless::ss_ACTIVE;
804        lp_port->m_active_streams++;
805    }else{
806        node->m_state =CGenNodeStateless::ss_INACTIVE;
807    }
808
809    node->m_time = m_core->m_cur_time_sec + stream->get_start_delay_sec();
810
811    pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(stream->m_port_id);
812    node->m_flags = 0;
813    node->m_src_port =0;
814    node->m_original_packet_data_prefix = 0;
815
816    if (stream->m_rx_check.m_enabled) {
817        node->set_stat_needed();
818        uint8_t hw_id = stream->m_rx_check.m_hw_id;
819        assert (hw_id < MAX_FLOW_STATS + MAX_FLOW_STATS_PAYLOAD);
820        node->set_stat_hw_id(hw_id);
821    }
822
823    /* set socket id */
824    node->set_socket_id(m_core->m_node_gen.m_socket_id);
825
826    /* build a mbuf from a packet */
827
828    uint16_t pkt_size = stream->m_pkt.len;
829    const uint8_t *stream_pkt = stream->m_pkt.binary;
830
831    node->m_pause =0;
832    node->m_stream_type = stream->m_type;
833    node->m_next_time_offset = 1.0 / stream->get_pps();
834    node->m_null_stream = (stream->m_null_stream ? 1 : 0);
835
836    /* stateless specific fields */
837    switch ( stream->m_type ) {
838
839    case TrexStream::stCONTINUOUS :
840        node->m_single_burst=0;
841        node->m_single_burst_refill=0;
842        node->m_multi_bursts=0;
843        break;
844
845    case TrexStream::stSINGLE_BURST :
846        node->m_stream_type             = TrexStream::stMULTI_BURST;
847        node->m_single_burst            = stream->m_burst_total_pkts;
848        node->m_single_burst_refill     = stream->m_burst_total_pkts;
849        node->m_multi_bursts            = 1;  /* single burst in multi burst of 1 */
850        break;
851
852    case TrexStream::stMULTI_BURST :
853        node->m_single_burst        = stream->m_burst_total_pkts;
854        node->m_single_burst_refill = stream->m_burst_total_pkts;
855        node->m_multi_bursts        = stream->m_num_bursts;
856        break;
857    default:
858
859        assert(0);
860    };
861
862    node->m_port_id = stream->m_port_id;
863
864    /* set dir 0 or 1 client or server */
865    node->set_mbuf_cache_dir(dir);
866
867
868    if (node->m_ref_stream_info->getDpVm() == NULL) {
869        /* no VM */
870
871        node->m_vm_flow_var =  NULL;
872        node->m_vm_program  =  NULL;
873        node->m_vm_program_size =0;
874
875                /* allocate const mbuf */
876        rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size);
877        assert(m);
878
879        char *p = rte_pktmbuf_append(m, pkt_size);
880        assert(p);
881        /* copy the packet */
882        memcpy(p,stream_pkt,pkt_size);
883
884        update_mac_addr(stream,node,dir,p);
885
886        /* set the packet as a readonly */
887        node->set_cache_mbuf(m);
888
889        node->m_original_packet_data_prefix =0;
890    }else{
891
892        /* set the program */
893        TrexStream * local_mem_stream = node->m_ref_stream_info;
894
895        StreamVmDp  * lpDpVm = local_mem_stream->getDpVm();
896
897        node->m_vm_flow_var      = lpDpVm->clone_bss(); /* clone the flow var */
898        node->m_vm_program       = lpDpVm->get_program(); /* same ref to the program */
899        node->m_vm_program_size  = lpDpVm->get_program_size();
900
901
902        /* set the random seed if was set */
903        if ( lpDpVm->is_random_seed() ){
904            /* if we have random seed for this program */
905            if (stream->m_random_seed) {
906                node->set_random_seed(stream->m_random_seed);
907            }
908        }
909
910        /* we need to copy the object */
911        if ( pkt_size > lpDpVm->get_prefix_size() ) {
912            /* we need const packet */
913            uint16_t const_pkt_size  = pkt_size - lpDpVm->get_prefix_size() ;
914            rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), const_pkt_size );
915            assert(m);
916
917            char *p = rte_pktmbuf_append(m, const_pkt_size);
918            assert(p);
919
920            /* copy packet data */
921            memcpy(p,(stream_pkt + lpDpVm->get_prefix_size()),const_pkt_size);
922
923            node->set_const_mbuf(m);
924        }
925
926
927        if ( lpDpVm->is_pkt_size_var() ) {
928            // mark the node as varible size
929            node->set_var_pkt_size();
930        }
931
932
933        if (lpDpVm->get_prefix_size() > pkt_size ) {
934            lpDpVm->set_prefix_size(pkt_size);
935        }
936
937        /* copy the headr */
938        uint16_t header_size = lpDpVm->get_prefix_size();
939        assert(header_size);
940        node->alloc_prefix_header(header_size);
941        uint8_t *p=node->m_original_packet_data_prefix;
942        assert(p);
943
944        memcpy(p,stream_pkt , header_size);
945
946        update_mac_addr(stream,node,dir,(char *)p);
947
948        if (stream->m_cache_size > 0 ) {
949            /* we need to create cache of objects */
950            replay_vm_into_cache(stream, node);
951        }
952    }
953
954
955    CDpOneStream one_stream;
956
957    one_stream.m_dp_stream = node->m_ref_stream_info;
958    one_stream.m_node =node;
959
960    lp_port->m_active_nodes.push_back(one_stream);
961
962    /* schedule only if active */
963    if (node->m_state == CGenNodeStateless::ss_ACTIVE) {
964        m_core->m_node_gen.add_node((CGenNode *)node);
965    }
966}
967
968void
969TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj,
970                                   double duration,
971                                   int event_id) {
972
973
974    TrexStatelessDpPerPort * lp_port=get_port_db(obj->get_port_id());
975    lp_port->m_active_streams = 0;
976    lp_port->set_event_id(event_id);
977
978    /* update cur time */
979    if ( CGlobalInfo::is_realtime()  ){
980        m_core->m_cur_time_sec = now_sec() + SCHD_OFFSET_DTIME ;
981    }
982
983    /* no nodes in the list */
984    assert(lp_port->m_active_nodes.size()==0);
985
986    for (auto single_stream : obj->get_objects()) {
987        /* all commands should be for the same port */
988        assert(obj->get_port_id() == single_stream.m_stream->m_port_id);
989        add_stream(lp_port,single_stream.m_stream,obj);
990    }
991
992    uint32_t nodes = lp_port->m_active_nodes.size();
993    /* find next stream */
994    assert(nodes == obj->get_objects().size());
995
996    int cnt=0;
997
998    /* set the next_stream pointer  */
999    for (auto single_stream : obj->get_objects()) {
1000
1001        if (single_stream.m_stream->is_dp_next_stream() ) {
1002            int stream_id = single_stream.m_stream->m_next_stream_id;
1003            assert(stream_id<nodes);
1004            /* point to the next stream , stream_id is fixed */
1005            lp_port->m_active_nodes[cnt].m_node->m_next_stream = lp_port->m_active_nodes[stream_id].m_node ;
1006        }
1007        cnt++;
1008    }
1009
1010    lp_port->m_state =TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
1011    m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
1012
1013
1014    if ( duration > 0.0 ){
1015        add_port_duration( duration ,obj->get_port_id(),event_id );
1016    }
1017
1018}
1019
1020
1021bool TrexStatelessDpCore::are_all_ports_idle(){
1022
1023    bool res=true;
1024    int i;
1025    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
1026        if ( m_ports[i].m_state != TrexStatelessDpPerPort::ppSTATE_IDLE ){
1027            res=false;
1028        }
1029    }
1030    return (res);
1031}
1032
1033
1034void
1035TrexStatelessDpCore::resume_traffic(uint8_t port_id){
1036
1037    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1038
1039    lp_port->resume_traffic(port_id);
1040}
1041
1042
1043void
1044TrexStatelessDpCore::pause_traffic(uint8_t port_id){
1045
1046    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1047
1048    lp_port->pause_traffic(port_id);
1049}
1050
1051
1052void
1053TrexStatelessDpCore::push_pcap(uint8_t port_id,
1054                               int event_id,
1055                               const std::string &pcap_filename,
1056                               double ipg_usec,
1057                               double speedup,
1058                               uint32_t count,
1059                               double duration) {
1060
1061    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1062
1063    lp_port->set_event_id(event_id);
1064
1065    /* delegate the command to the port */
1066    bool rc = lp_port->push_pcap(port_id, pcap_filename, ipg_usec, speedup, count);
1067    if (!rc) {
1068        /* report back that we stopped */
1069        CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
1070        TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
1071                                                                       port_id,
1072                                                                       event_id,
1073                                                                       false);
1074        ring->Enqueue((CGenNode *)event_msg);
1075        return;
1076    }
1077
1078
1079    if (duration > 0.0) {
1080        add_port_duration(duration, port_id, event_id);
1081    }
1082
1083     m_state = TrexStatelessDpCore::STATE_PCAP_TX;
1084}
1085
1086
1087void
1088TrexStatelessDpCore::update_traffic(uint8_t port_id, double factor) {
1089
1090    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1091
1092    lp_port->update_traffic(port_id, factor);
1093}
1094
1095
1096void
1097TrexStatelessDpCore::stop_traffic(uint8_t  port_id,
1098                                  bool     stop_on_id,
1099                                  int      event_id) {
1100    /* we cannot remove nodes not from the top of the queue so
1101       for every active node - make sure next time
1102       the scheduler invokes it, it will be free */
1103
1104    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1105    if ( lp_port->stop_traffic(port_id,stop_on_id,event_id) == false){
1106        return;
1107    }
1108
1109
1110    /* flush the TX queue before sending done message to the CP */
1111    m_core->flush_tx_queue();
1112
1113    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
1114    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
1115                                                                   port_id,
1116                                                                   lp_port->get_event_id());
1117    ring->Enqueue((CGenNode *)event_msg);
1118
1119}
1120
1121/**
1122 * handle a message from CP to DP
1123 *
1124 */
1125void
1126TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) {
1127    msg->handle(this);
1128    delete msg;
1129}
1130
1131void
1132TrexStatelessDpCore::barrier(uint8_t port_id, int event_id) {
1133
1134    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
1135    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
1136                                                                   port_id,
1137                                                                   event_id);
1138    ring->Enqueue((CGenNode *)event_msg);
1139}
1140
1141
1142/**
1143 * PCAP node
1144 */
1145bool CGenNodePCAP::create(uint8_t port_id,
1146                          pkt_dir_t dir,
1147                          socket_id_t socket_id,
1148                          const uint8_t *mac_addr,
1149                          const std::string &pcap_filename,
1150                          double ipg_usec,
1151                          double speedup,
1152                          uint32_t count) {
1153    std::stringstream ss;
1154
1155    m_type       = CGenNode::PCAP_PKT;
1156    m_flags      = 0;
1157    m_src_port   = 0;
1158    m_port_id    = port_id;
1159    m_count      = count;
1160
1161    /* mark this node as slow path */
1162    set_slow_path(true);
1163
1164    if (ipg_usec != -1) {
1165        /* fixed IPG */
1166        m_ipg_sec = usec_to_sec(ipg_usec / speedup);
1167        m_speedup = 0;
1168    } else {
1169        /* packet IPG */
1170        m_ipg_sec = -1;
1171        m_speedup  = speedup;
1172    }
1173
1174    /* copy MAC addr info */
1175    memcpy(m_mac_addr, mac_addr, 12);
1176
1177    /* set the dir */
1178    set_mbuf_dir(dir);
1179    set_socket_id(socket_id);
1180
1181    /* create the PCAP reader */
1182    m_reader = CCapReaderFactory::CreateReader((char *)pcap_filename.c_str(), 0, ss);
1183    if (!m_reader) {
1184        return false;
1185    }
1186
1187    m_raw_packet = new CCapPktRaw();
1188    if ( m_reader->ReadPacket(m_raw_packet) == false ){
1189        /* handle error */
1190        delete m_reader;
1191        return (false);
1192    }
1193
1194    /* this is the reference time */
1195    //m_base_time = m_raw_packet->get_time();
1196    m_last_pkt_time = m_raw_packet->get_time();
1197
1198    /* ready */
1199    m_state = PCAP_ACTIVE;
1200
1201    return true;
1202}
1203
1204/**
1205 * cleanup for PCAP node
1206 *
1207 * @author imarom (08-May-16)
1208 */
1209void CGenNodePCAP::destroy() {
1210
1211    if (m_raw_packet) {
1212        delete m_raw_packet;
1213        m_raw_packet = NULL;
1214    }
1215
1216    if (m_reader) {
1217        delete m_reader;
1218        m_reader = NULL;
1219    }
1220
1221    m_state = PCAP_INVALID;
1222}
1223
1224