trex_stateless_dp_core.cpp revision 89d643b9
1/*
2 Itay Marom
3 Hanoch Haim
4 Cisco Systems, Inc.
5*/
6
7/*
8Copyright (c) 2015-2016 Cisco Systems, Inc.
9
10Licensed under the Apache License, Version 2.0 (the "License");
11you may not use this file except in compliance with the License.
12You may obtain a copy of the License at
13
14    http://www.apache.org/licenses/LICENSE-2.0
15
16Unless required by applicable law or agreed to in writing, software
17distributed under the License is distributed on an "AS IS" BASIS,
18WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19See the License for the specific language governing permissions and
20limitations under the License.
21*/
22#include "bp_sim.h"
23#include "trex_stateless_dp_core.h"
24#include "trex_stateless_messaging.h"
25#include "trex_stream.h"
26#include "trex_stream_node.h"
27#include "trex_streams_compiler.h"
28#include "mbuf.h"
29
30
31
32
33void CGenNodeStateless::cache_mbuf_array_init(){
34    m_cache_size=0;
35    m_cache_array_cnt=0;
36}
37
38
39
40void CGenNodeStateless::cache_mbuf_array_copy(CGenNodeCacheMbuf *obj,
41                                              uint16_t size){
42
43    int i;
44    cache_mbuf_array_alloc(size);
45    for (i=0; i<size; i++) {
46        cache_mbuf_array_set(i,obj->m_array[i]);
47    }
48    cache_mbuf_array_set_const_mbuf(obj->m_mbuf_const);
49}
50
51
52rte_mbuf_t ** CGenNodeStateless::cache_mbuf_array_alloc(uint16_t size){
53
54    uint32_t buf_size = CGenNodeCacheMbuf::get_object_size(size);
55    /* TBD  replace with align, zero API */
56    m_cache_mbuf = (void *)malloc(buf_size);
57    assert(m_cache_mbuf);
58    memset(m_cache_mbuf,0,buf_size);
59
60    m_flags |= SL_NODE_CONST_MBUF_CACHE_ARRAY;
61    m_cache_size=size;
62    m_cache_array_cnt=0;
63    return ((rte_mbuf_t **)m_cache_mbuf);
64}
65
66void CGenNodeStateless::cache_mbuf_array_free(){
67
68    assert(m_cache_mbuf);
69    int i;
70    for (i=0; i<(int)m_cache_size; i++) {
71        rte_mbuf_t * m=cache_mbuf_array_get((uint16_t)i);
72        assert(m);
73        rte_pktmbuf_free(m);
74    }
75
76    /* free the const */
77    rte_mbuf_t * m=cache_mbuf_array_get_const_mbuf() ;
78    if (m) {
79        rte_pktmbuf_free(m);
80    }
81
82    free(m_cache_mbuf);
83    m_cache_mbuf=0;
84}
85
86
87rte_mbuf_t * CGenNodeStateless::cache_mbuf_array_get(uint16_t index){
88
89    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
90    return (p->m_array[index]);
91}
92
93void CGenNodeStateless::cache_mbuf_array_set_const_mbuf(rte_mbuf_t * m){
94    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
95    p->m_mbuf_const=m;
96}
97
98rte_mbuf_t * CGenNodeStateless::cache_mbuf_array_get_const_mbuf(){
99    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
100    return (p->m_mbuf_const);
101}
102
103
104void CGenNodeStateless::cache_mbuf_array_set(uint16_t index,
105                                             rte_mbuf_t * m){
106    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
107    p->m_array[index]=m;
108}
109
110
111void CDpOneStream::Delete(CFlowGenListPerThread   * core){
112    assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE);
113    core->free_node((CGenNode *)m_node);
114    delete m_dp_stream;
115    m_node=0;
116    m_dp_stream=0;
117}
118
119void CDpOneStream::DeleteOnlyStream(){
120    assert(m_dp_stream);
121    delete m_dp_stream;
122    m_dp_stream=0;
123}
124
125int CGenNodeStateless::get_stream_id(){
126    if (m_state ==CGenNodeStateless::ss_FREE_RESUSE) {
127        return (-1); // not valid
128    }
129    assert(m_ref_stream_info);
130    return ((int)m_ref_stream_info->m_stream_id);
131}
132
133
134void CGenNodeStateless::DumpHeader(FILE *fd){
135    fprintf(fd," pkt_id, time, port , action , state, stream_id , stype , m-burst# , burst# \n");
136
137}
138void CGenNodeStateless::Dump(FILE *fd){
139    fprintf(fd," %2.4f, %3lu, %s,%s, %3d, %s, %3lu, %3lu  \n",
140            m_time,
141            (ulong)m_port_id,
142            "s-pkt", //action
143            get_stream_state_str(m_state ).c_str(),
144            get_stream_id(),   //stream_id
145            TrexStream::get_stream_type_str(m_stream_type).c_str(), //stype
146            (ulong)m_multi_bursts,
147            (ulong)m_single_burst
148            );
149}
150
151
152
153void CGenNodeStateless::refresh_vm_bss(){
154    if ( m_vm_flow_var ) {
155        StreamVmDp  * vm_s=m_ref_stream_info->m_vm_dp;
156        assert(vm_s);
157        memcpy(m_vm_flow_var,vm_s->get_bss(),vm_s->get_bss_size());
158
159        if ( vm_s->is_random_seed() ){
160            /* if we have random seed for this program */
161            if (m_ref_stream_info->m_random_seed) {
162                set_random_seed(m_ref_stream_info->m_random_seed);
163            }
164        }
165    }
166}
167
168
169/**
170 * this function called when stream restart after it was inactive
171 */
172void CGenNodeStateless::refresh(){
173
174    /* refill the stream info */
175    m_single_burst    = m_single_burst_refill;
176    m_multi_bursts    = m_ref_stream_info->m_num_bursts;
177    m_state           = CGenNodeStateless::ss_ACTIVE;
178
179    /* refresh init value */
180#if 0
181    /* TBD should add a JSON varible for that */
182    refresh_vm_bss();
183#endif
184}
185
186
187void CGenNodeCommand::free_command(){
188
189    assert(m_cmd);
190    m_cmd->on_node_remove();
191    delete m_cmd;
192}
193
194
195std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state){
196    std::string res;
197
198    switch (stream_state) {
199    case CGenNodeStateless::ss_FREE_RESUSE :
200         res="FREE    ";
201        break;
202    case CGenNodeStateless::ss_INACTIVE :
203        res="INACTIVE ";
204        break;
205    case CGenNodeStateless::ss_ACTIVE :
206        res="ACTIVE   ";
207        break;
208    default:
209        res="Unknow   ";
210    };
211    return(res);
212}
213
214rte_mbuf_t * CGenNodeStateless::alloc_flow_stat_mbuf(rte_mbuf_t *m) {
215    //????????? temp implementation. Just copy the entire mbuf
216    rte_mbuf_t *m_new = CGlobalInfo::pktmbuf_alloc( get_socket_id(), m->data_len );
217    /* TBD remove this, should handle cases of error */
218    assert(m_new);
219    char *p = rte_pktmbuf_mtod(m, char*);
220    char *p_new = rte_pktmbuf_append(m_new, m->data_len);
221    memcpy(p_new , p, m->data_len);
222
223    return m_new;
224}
225
226rte_mbuf_t   * CGenNodeStateless::alloc_node_with_vm(){
227
228    rte_mbuf_t        * m;
229    /* alloc small packet buffer*/
230    uint16_t prefix_size = prefix_header_size();
231    m = CGlobalInfo::pktmbuf_alloc( get_socket_id(), prefix_size );
232    if (m==0) {
233        return (m);
234    }
235    /* TBD remove this, should handle cases of error */
236    assert(m);
237    char *p=rte_pktmbuf_append(m, prefix_size);
238    memcpy( p ,m_original_packet_data_prefix, prefix_size);
239
240
241    /* run the VM program */
242    StreamDPVmInstructionsRunner runner;
243
244    runner.run( (uint32_t*)m_vm_flow_var,
245                m_vm_program_size,
246                m_vm_program,
247                m_vm_flow_var,
248                (uint8_t*)p);
249
250    uint16_t pkt_new_size=runner.get_new_pkt_size();
251    if ( likely( pkt_new_size == 0) ) {
252        /* no packet size change */
253        rte_mbuf_t * m_const = get_const_mbuf();
254        if (  m_const != NULL) {
255            utl_rte_pktmbuf_add_after(m,m_const);
256        }
257        return (m);
258    }
259
260    /* packet size change there are a few changes */
261    rte_mbuf_t * m_const = get_const_mbuf();
262    if ( (m_const == 0 ) || (pkt_new_size<=prefix_size) ) {
263        /* one mbuf , just trim it */
264        m->data_len = pkt_new_size;
265        m->pkt_len  = pkt_new_size;
266        return (m);
267    }
268
269    rte_mbuf_t * mi= CGlobalInfo::pktmbuf_alloc_small(get_socket_id());
270    assert(mi);
271    rte_pktmbuf_attach(mi,m_const);
272    utl_rte_pktmbuf_add_after2(m,mi);
273
274    if ( pkt_new_size < m->pkt_len) {
275        /* need to trim it */
276        mi->data_len = (pkt_new_size - prefix_size);
277        m->pkt_len   = pkt_new_size;
278    }
279    return (m);
280}
281
282void CGenNodeStateless::free_stl_vm_buf(){
283        rte_mbuf_t * m ;
284         m=get_const_mbuf();
285         if (m) {
286             rte_pktmbuf_free(m); /* reduce the ref counter */
287             /* clear the const marker */
288             clear_const_mbuf();
289         }
290
291         free_prefix_header();
292
293         if (m_vm_flow_var) {
294             /* free flow var */
295             free(m_vm_flow_var);
296             m_vm_flow_var=0;
297         }
298}
299
300
301
302void CGenNodeStateless::free_stl_node(){
303
304    if ( is_cache_mbuf_array() ){
305        /* do we have cache of mbuf pre allocated */
306        cache_mbuf_array_free();
307    }else{
308        /* if we have cache mbuf free it */
309        rte_mbuf_t * m=get_cache_mbuf();
310        if (m) {
311                rte_pktmbuf_free(m);
312                m_cache_mbuf=0;
313        }
314    }
315    free_stl_vm_buf();
316}
317
318
319bool TrexStatelessDpPerPort::update_number_of_active_streams(uint32_t d){
320    m_active_streams-=d; /* reduce the number of streams */
321    if (m_active_streams == 0) {
322        return (true);
323    }
324    return (false);
325}
326
327bool TrexStatelessDpPerPort::resume_traffic(uint8_t port_id){
328
329    /* we are working with continues streams so we must be in transmit mode */
330    assert(m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE);
331
332    for (auto dp_stream : m_active_nodes) {
333        CGenNodeStateless * node =dp_stream.m_node;
334        assert(node->get_port_id() == port_id);
335        assert(node->is_pause() == true);
336        node->set_pause(false);
337    }
338    m_state = TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
339    return (true);
340}
341
342bool TrexStatelessDpPerPort::update_traffic(uint8_t port_id, double factor) {
343
344    assert( (m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING ||
345            (m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE)) );
346
347    for (auto dp_stream : m_active_nodes) {
348        CGenNodeStateless * node = dp_stream.m_node;
349        assert(node->get_port_id() == port_id);
350
351        node->update_rate(factor);
352    }
353
354    return (true);
355}
356
357bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){
358
359    /* we are working with continues streams so we must be in transmit mode */
360    assert(m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING);
361
362    for (auto dp_stream : m_active_nodes) {
363        CGenNodeStateless * node =dp_stream.m_node;
364        assert(node->get_port_id() == port_id);
365        assert(node->is_pause() == false);
366        node->set_pause(true);
367    }
368    m_state = TrexStatelessDpPerPort::ppSTATE_PAUSE;
369    return (true);
370}
371
372bool TrexStatelessDpPerPort::push_pcap(uint8_t port_id,
373                                       const std::string &pcap_filename,
374                                       double ipg_usec,
375                                       double speedup,
376                                       uint32_t count) {
377
378    /* push pcap can only happen on an idle port from the core prespective */
379    assert(m_state == TrexStatelessDpPerPort::ppSTATE_IDLE);
380
381    CGenNodePCAP *pcap_node = m_core->allocate_pcap_node();
382    if (!pcap_node) {
383        return (false);
384    }
385
386    pkt_dir_t dir          = m_core->m_node_gen.m_v_if->port_id_to_dir(port_id);
387    socket_id_t socket_id  = m_core->m_node_gen.m_socket_id;
388
389    uint8_t mac_addr[12];
390    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, mac_addr);
391
392    bool rc = pcap_node->create(port_id,
393                                dir,
394                                socket_id,
395                                mac_addr,
396                                pcap_filename,
397                                ipg_usec,
398                                speedup,
399                                count);
400    if (!rc) {
401        m_core->free_node((CGenNode *)pcap_node);
402        return (false);
403    }
404
405    /* schedule the node for now */
406    pcap_node->m_time = m_core->m_cur_time_sec;
407    m_core->m_node_gen.add_node((CGenNode *)pcap_node);
408
409    /* hold a pointer to the node */
410    assert(m_active_pcap_node == NULL);
411    m_active_pcap_node = pcap_node;
412
413    m_state = TrexStatelessDpPerPort::ppSTATE_PCAP_TX;
414    return (true);
415}
416
417
418bool TrexStatelessDpPerPort::stop_traffic(uint8_t  port_id,
419                                          bool     stop_on_id,
420                                          int      event_id){
421
422
423    if (m_state == TrexStatelessDpPerPort::ppSTATE_IDLE) {
424        assert(m_active_streams==0);
425        return false;
426    }
427
428    /* there could be race of stop after stop */
429    if ( stop_on_id ) {
430        if (event_id != m_event_id){
431            /* we can't stop it is an old message */
432            return false;
433        }
434    }
435
436    for (auto dp_stream : m_active_nodes) {
437        CGenNodeStateless * node =dp_stream.m_node;
438        assert(node->get_port_id() == port_id);
439        if ( node->get_state() == CGenNodeStateless::ss_ACTIVE) {
440            node->mark_for_free();
441            m_active_streams--;
442            dp_stream.DeleteOnlyStream();
443
444        }else{
445            dp_stream.Delete(m_core);
446        }
447    }
448
449    /* check for active PCAP node */
450    if (m_active_pcap_node) {
451        /* when got async stop from outside or duration */
452        if (m_active_pcap_node->is_active()) {
453            m_active_pcap_node->mark_for_free();
454        } else {
455            /* graceful stop - node was put out by the scheduler */
456            m_core->free_node( (CGenNode *)m_active_pcap_node);
457        }
458
459        m_active_pcap_node = NULL;
460    }
461
462    /* active stream should be zero */
463    assert(m_active_streams==0);
464    m_active_nodes.clear();
465    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
466    return (true);
467}
468
469
470void TrexStatelessDpPerPort::create(CFlowGenListPerThread   *  core){
471    m_core=core;
472    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
473    m_active_streams=0;
474    m_active_nodes.clear();
475    m_active_pcap_node = NULL;
476}
477
478
479
480void
481TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) {
482    m_thread_id = thread_id;
483    m_core = core;
484    m_local_port_offset = 2*core->getDualPortId();
485
486    CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();
487
488    m_ring_from_cp = cp_dp->getRingCpToDp(thread_id);
489    m_ring_to_cp   = cp_dp->getRingDpToCp(thread_id);
490
491    m_state = STATE_IDLE;
492
493    int i;
494    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
495        m_ports[i].create(core);
496    }
497}
498
499
500/* move to the next stream, old stream move to INACTIVE */
501bool TrexStatelessDpCore::set_stateless_next_node(CGenNodeStateless * cur_node,
502                                                  CGenNodeStateless * next_node){
503
504    assert(cur_node);
505    TrexStatelessDpPerPort * lp_port = get_port_db(cur_node->m_port_id);
506    bool schedule =false;
507
508    bool to_stop_port=false;
509
510    if (next_node == NULL) {
511        /* there is no next stream , reduce the number of active streams*/
512        to_stop_port = lp_port->update_number_of_active_streams(1);
513
514    }else{
515        uint8_t state=next_node->get_state();
516
517        /* can't be FREE_RESUSE */
518        assert(state != CGenNodeStateless::ss_FREE_RESUSE);
519        if (state == CGenNodeStateless::ss_INACTIVE ) {
520
521            if (cur_node->m_action_counter > 0) {
522                cur_node->m_action_counter--;
523                if (cur_node->m_action_counter==0) {
524                    to_stop_port = lp_port->update_number_of_active_streams(1);
525                }else{
526                    /* refill start info and scedule, no update in active streams  */
527                    next_node->refresh();
528                    schedule = true;
529                }
530            }else{
531                /* refill start info and scedule, no update in active streams  */
532                next_node->refresh();
533                schedule = true;
534            }
535
536        }else{
537            to_stop_port = lp_port->update_number_of_active_streams(1);
538        }
539    }
540
541    if ( to_stop_port ) {
542        /* call stop port explictly to move the state */
543        stop_traffic(cur_node->m_port_id,false,0);
544    }
545
546    return ( schedule );
547}
548
549
550
551/**
552 * in idle state loop, the processor most of the time sleeps
553 * and periodically checks for messages
554 *
555 * @author imarom (01-Nov-15)
556 */
557void
558TrexStatelessDpCore::idle_state_loop() {
559
560    const int SHORT_DELAY_MS    = 2;
561    const int LONG_DELAY_MS     = 50;
562    const int DEEP_SLEEP_LIMIT  = 2000;
563
564    int counter = 0;
565
566    while (m_state == STATE_IDLE) {
567        m_core->m_node_gen.m_v_if->flush_dp_rx_queue();
568        bool had_msg = periodic_check_for_cp_messages();
569        if (had_msg) {
570            counter = 0;
571            continue;
572        }
573
574        /* enter deep sleep only if enough time had passed */
575        if (counter < DEEP_SLEEP_LIMIT) {
576            delay(SHORT_DELAY_MS);
577            counter++;
578        } else {
579            delay(LONG_DELAY_MS);
580        }
581
582    }
583}
584
585
586
587void TrexStatelessDpCore::quit_main_loop(){
588    m_core->set_terminate_mode(true); /* mark it as terminated */
589    m_state = STATE_TERMINATE;
590    add_global_duration(0.0001);
591}
592
593
594/**
595 * scehduler runs when traffic exists
596 * it will return when no more transmitting is done on this
597 * core
598 *
599 * @author imarom (01-Nov-15)
600 */
601void
602TrexStatelessDpCore::start_scheduler() {
603    /* creates a maintenace job using the scheduler */
604    CGenNode * node_sync = m_core->create_node() ;
605    node_sync->m_type = CGenNode::FLOW_SYNC;
606    node_sync->m_time = m_core->m_cur_time_sec + SYNC_TIME_OUT;
607    m_core->m_node_gen.add_node(node_sync);
608
609    double old_offset = 0.0;
610    m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset);
611    /* bail out in case of terminate */
612    if (m_state != TrexStatelessDpCore::STATE_TERMINATE) {
613        m_core->m_node_gen.close_file(m_core);
614        m_state = STATE_IDLE; /* we exit from all ports and we have nothing to do, we move to IDLE state */
615    }
616}
617
618
619void
620TrexStatelessDpCore::run_once(){
621
622    idle_state_loop();
623
624    if ( m_state == STATE_TERMINATE ){
625        return;
626    }
627
628    start_scheduler();
629}
630
631
632
633
634void
635TrexStatelessDpCore::start() {
636
637    while (true) {
638        run_once();
639
640        if ( m_core->is_terminated_by_master() ) {
641            break;
642        }
643    }
644}
645
646/* only if both port are idle we can exit */
647void
648TrexStatelessDpCore::schedule_exit(){
649
650    CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
651
652    node->m_type = CGenNode::COMMAND;
653
654    node->m_cmd = new TrexStatelessDpCanQuit();
655
656    /* make sure it will be scheduled after the current node */
657    node->m_time = m_core->m_cur_time_sec ;
658
659    m_core->m_node_gen.add_node((CGenNode *)node);
660}
661
662
663void
664TrexStatelessDpCore::add_global_duration(double duration){
665    if (duration > 0.0) {
666        CGenNode *node = m_core->create_node() ;
667
668        node->m_type = CGenNode::EXIT_SCHED;
669
670        /* make sure it will be scheduled after the current node */
671        node->m_time = m_core->m_cur_time_sec + duration ;
672
673        m_core->m_node_gen.add_node(node);
674    }
675}
676
677/* add per port exit */
678void
679TrexStatelessDpCore::add_port_duration(double duration,
680                                       uint8_t port_id,
681                                       int event_id){
682    if (duration > 0.0) {
683        CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
684
685        node->m_type = CGenNode::COMMAND;
686
687        /* make sure it will be scheduled after the current node */
688        node->m_time = m_core->m_cur_time_sec + duration ;
689
690        TrexStatelessDpStop * cmd=new TrexStatelessDpStop(port_id);
691
692
693        /* test this */
694        m_core->m_non_active_nodes++;
695        cmd->set_core_ptr(m_core);
696        cmd->set_event_id(event_id);
697        cmd->set_wait_for_event_id(true);
698
699        node->m_cmd = cmd;
700
701        m_core->m_node_gen.add_node((CGenNode *)node);
702    }
703}
704
705
706void TrexStatelessDpCore::update_mac_addr(TrexStream * stream,
707                                          CGenNodeStateless *node,
708                                          pkt_dir_t dir,
709                                          char *raw_pkt){
710    bool              ov_src = stream->get_override_src_mac_by_pkt_data();
711    TrexStream::stream_dst_mac_t  ov_dst = stream->get_override_dst_mac_mode();
712
713
714    if ( (ov_src == true) && (ov_dst == TrexStream::stPKT) ) {
715        /* nothing to do, take from the packet both */
716        return;
717    }
718
719        /* take from cfg_file */
720    if ( (ov_src == false) &&
721         (ov_dst == TrexStream::stCFG_FILE) ){
722
723          m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*)raw_pkt);
724          return;
725    }
726
727    /* save the pkt*/
728    char tmp_pkt[12];
729    memcpy(tmp_pkt,raw_pkt,12);
730
731    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*)raw_pkt);
732
733    if ((ov_src == true) && (ov_dst == TrexStream::stCFG_FILE)) {
734        memcpy(raw_pkt+6,tmp_pkt+6,6);
735    }
736
737    if ((ov_src == false) && (ov_dst == TrexStream::stPKT)) {
738        memcpy(raw_pkt,tmp_pkt,6);
739    }
740}
741
742
743void TrexStatelessDpCore::replay_vm_into_cache(TrexStream * stream,
744                                               CGenNodeStateless *node){
745
746    uint16_t      cache_size = stream->m_cache_size;
747    assert(cache_size>0);
748    rte_mbuf_t * m=0;
749
750    uint32_t buf_size = CGenNodeCacheMbuf::get_object_size(cache_size);
751    CGenNodeCacheMbuf * p = (CGenNodeCacheMbuf *)malloc(buf_size);
752    assert(p);
753    memset(p,0,buf_size);
754
755    int i;
756    for (i=0; i<cache_size; i++) {
757        p->m_array[i] =  node->alloc_node_with_vm();
758    }
759    /* save const */
760    m=node->get_const_mbuf();
761    if (m) {
762        p->m_mbuf_const=m;
763        rte_pktmbuf_refcnt_update(m,1);
764    }
765
766    /* free all VM and const mbuf */
767    node->free_stl_vm_buf();
768
769    /* copy to local node meory */
770    node->cache_mbuf_array_copy(p,cache_size);
771
772    /* free the memory */
773    free(p);
774}
775
776
777void
778TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port,
779                                TrexStream * stream,
780                                TrexStreamsCompiledObj *comp) {
781
782    CGenNodeStateless *node = m_core->create_node_sl();
783
784    node->cache_mbuf_array_init();
785    node->m_batch_size=0;
786
787    /* add periodic */
788    node->m_cache_mbuf=0;
789    node->m_type = CGenNode::STATELESS_PKT;
790
791    node->m_action_counter = stream->m_action_count;
792
793    /* clone the stream from control plane memory to DP memory */
794    node->m_ref_stream_info = stream->clone();
795    /* no need for this memory anymore on the control plane memory */
796    stream->release_dp_object();
797
798    node->m_next_stream=0; /* will be fixed later */
799
800    if ( stream->m_self_start ){
801        /* if self start it is in active mode */
802        node->m_state =CGenNodeStateless::ss_ACTIVE;
803        lp_port->m_active_streams++;
804    }else{
805        node->m_state =CGenNodeStateless::ss_INACTIVE;
806    }
807
808    node->m_time = m_core->m_cur_time_sec + stream->get_start_delay_sec();
809
810    pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(stream->m_port_id);
811    node->m_flags = 0;
812    node->m_src_port =0;
813    node->m_original_packet_data_prefix = 0;
814
815    if (stream->m_rx_check.m_enabled) {
816        node->set_stat_needed();
817        uint8_t hw_id = stream->m_rx_check.m_hw_id;
818        assert (hw_id < MAX_FLOW_STATS + MAX_FLOW_STATS_PAYLOAD);
819        node->set_stat_hw_id(hw_id);
820    }
821
822    /* set socket id */
823    node->set_socket_id(m_core->m_node_gen.m_socket_id);
824
825    /* build a mbuf from a packet */
826
827    uint16_t pkt_size = stream->m_pkt.len;
828    const uint8_t *stream_pkt = stream->m_pkt.binary;
829
830    node->m_pause =0;
831    node->m_stream_type = stream->m_type;
832    node->m_next_time_offset = 1.0 / stream->get_pps();
833    node->m_null_stream = (stream->m_null_stream ? 1 : 0);
834
835    /* stateless specific fields */
836    switch ( stream->m_type ) {
837
838    case TrexStream::stCONTINUOUS :
839        node->m_single_burst=0;
840        node->m_single_burst_refill=0;
841        node->m_multi_bursts=0;
842        break;
843
844    case TrexStream::stSINGLE_BURST :
845        node->m_stream_type             = TrexStream::stMULTI_BURST;
846        node->m_single_burst            = stream->m_burst_total_pkts;
847        node->m_single_burst_refill     = stream->m_burst_total_pkts;
848        node->m_multi_bursts            = 1;  /* single burst in multi burst of 1 */
849        break;
850
851    case TrexStream::stMULTI_BURST :
852        node->m_single_burst        = stream->m_burst_total_pkts;
853        node->m_single_burst_refill = stream->m_burst_total_pkts;
854        node->m_multi_bursts        = stream->m_num_bursts;
855        break;
856    default:
857
858        assert(0);
859    };
860
861    node->m_port_id = stream->m_port_id;
862
863    /* set dir 0 or 1 client or server */
864    node->set_mbuf_cache_dir(dir);
865
866
867    if (node->m_ref_stream_info->getDpVm() == NULL) {
868        /* no VM */
869
870        node->m_vm_flow_var =  NULL;
871        node->m_vm_program  =  NULL;
872        node->m_vm_program_size =0;
873
874                /* allocate const mbuf */
875        rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size);
876        assert(m);
877
878        char *p = rte_pktmbuf_append(m, pkt_size);
879        assert(p);
880        /* copy the packet */
881        memcpy(p,stream_pkt,pkt_size);
882
883        update_mac_addr(stream,node,dir,p);
884
885        /* set the packet as a readonly */
886        node->set_cache_mbuf(m);
887
888        node->m_original_packet_data_prefix =0;
889    }else{
890
891        /* set the program */
892        TrexStream * local_mem_stream = node->m_ref_stream_info;
893
894        StreamVmDp  * lpDpVm = local_mem_stream->getDpVm();
895
896        node->m_vm_flow_var      = lpDpVm->clone_bss(); /* clone the flow var */
897        node->m_vm_program       = lpDpVm->get_program(); /* same ref to the program */
898        node->m_vm_program_size  = lpDpVm->get_program_size();
899
900
901        /* set the random seed if was set */
902        if ( lpDpVm->is_random_seed() ){
903            /* if we have random seed for this program */
904            if (stream->m_random_seed) {
905                node->set_random_seed(stream->m_random_seed);
906            }
907        }
908
909        /* we need to copy the object */
910        if ( pkt_size > lpDpVm->get_prefix_size() ) {
911            /* we need const packet */
912            uint16_t const_pkt_size  = pkt_size - lpDpVm->get_prefix_size() ;
913            rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), const_pkt_size );
914            assert(m);
915
916            char *p = rte_pktmbuf_append(m, const_pkt_size);
917            assert(p);
918
919            /* copy packet data */
920            memcpy(p,(stream_pkt + lpDpVm->get_prefix_size()),const_pkt_size);
921
922            node->set_const_mbuf(m);
923        }
924
925
926        if ( lpDpVm->is_pkt_size_var() ) {
927            // mark the node as varible size
928            node->set_var_pkt_size();
929        }
930
931
932        if (lpDpVm->get_prefix_size() > pkt_size ) {
933            lpDpVm->set_prefix_size(pkt_size);
934        }
935
936        /* copy the headr */
937        uint16_t header_size = lpDpVm->get_prefix_size();
938        assert(header_size);
939        node->alloc_prefix_header(header_size);
940        uint8_t *p=node->m_original_packet_data_prefix;
941        assert(p);
942
943        memcpy(p,stream_pkt , header_size);
944
945        update_mac_addr(stream,node,dir,(char *)p);
946
947        if (stream->m_cache_size > 0 ) {
948            /* we need to create cache of objects */
949            replay_vm_into_cache(stream, node);
950        }
951    }
952
953
954    CDpOneStream one_stream;
955
956    one_stream.m_dp_stream = node->m_ref_stream_info;
957    one_stream.m_node =node;
958
959    lp_port->m_active_nodes.push_back(one_stream);
960
961    /* schedule only if active */
962    if (node->m_state == CGenNodeStateless::ss_ACTIVE) {
963        m_core->m_node_gen.add_node((CGenNode *)node);
964    }
965}
966
967void
968TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj,
969                                   double duration,
970                                   int event_id) {
971
972
973    TrexStatelessDpPerPort * lp_port=get_port_db(obj->get_port_id());
974    lp_port->m_active_streams = 0;
975    lp_port->set_event_id(event_id);
976
977    /* update cur time */
978    if ( CGlobalInfo::is_realtime()  ){
979        m_core->m_cur_time_sec = now_sec() + SCHD_OFFSET_DTIME ;
980    }
981
982    /* no nodes in the list */
983    assert(lp_port->m_active_nodes.size()==0);
984
985    for (auto single_stream : obj->get_objects()) {
986        /* all commands should be for the same port */
987        assert(obj->get_port_id() == single_stream.m_stream->m_port_id);
988        add_stream(lp_port,single_stream.m_stream,obj);
989    }
990
991    uint32_t nodes = lp_port->m_active_nodes.size();
992    /* find next stream */
993    assert(nodes == obj->get_objects().size());
994
995    int cnt=0;
996
997    /* set the next_stream pointer  */
998    for (auto single_stream : obj->get_objects()) {
999
1000        if (single_stream.m_stream->is_dp_next_stream() ) {
1001            int stream_id = single_stream.m_stream->m_next_stream_id;
1002            assert(stream_id<nodes);
1003            /* point to the next stream , stream_id is fixed */
1004            lp_port->m_active_nodes[cnt].m_node->m_next_stream = lp_port->m_active_nodes[stream_id].m_node ;
1005        }
1006        cnt++;
1007    }
1008
1009    lp_port->m_state =TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
1010    m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
1011
1012
1013    if ( duration > 0.0 ){
1014        add_port_duration( duration ,obj->get_port_id(),event_id );
1015    }
1016
1017}
1018
1019
1020bool TrexStatelessDpCore::are_all_ports_idle(){
1021
1022    bool res=true;
1023    int i;
1024    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
1025        if ( m_ports[i].m_state != TrexStatelessDpPerPort::ppSTATE_IDLE ){
1026            res=false;
1027        }
1028    }
1029    return (res);
1030}
1031
1032
1033void
1034TrexStatelessDpCore::resume_traffic(uint8_t port_id){
1035
1036    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1037
1038    lp_port->resume_traffic(port_id);
1039}
1040
1041
1042void
1043TrexStatelessDpCore::pause_traffic(uint8_t port_id){
1044
1045    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1046
1047    lp_port->pause_traffic(port_id);
1048}
1049
1050
1051void
1052TrexStatelessDpCore::push_pcap(uint8_t port_id,
1053                               int event_id,
1054                               const std::string &pcap_filename,
1055                               double ipg_usec,
1056                               double speedup,
1057                               uint32_t count,
1058                               double duration) {
1059
1060    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1061
1062    lp_port->set_event_id(event_id);
1063
1064    /* delegate the command to the port */
1065    bool rc = lp_port->push_pcap(port_id, pcap_filename, ipg_usec, speedup, count);
1066    if (!rc) {
1067        /* report back that we stopped */
1068        CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
1069        TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
1070                                                                       port_id,
1071                                                                       event_id,
1072                                                                       false);
1073        ring->Enqueue((CGenNode *)event_msg);
1074        return;
1075    }
1076
1077
1078    if (duration > 0.0) {
1079        add_port_duration(duration, port_id, event_id);
1080    }
1081
1082     m_state = TrexStatelessDpCore::STATE_PCAP_TX;
1083}
1084
1085
1086void
1087TrexStatelessDpCore::update_traffic(uint8_t port_id, double factor) {
1088
1089    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1090
1091    lp_port->update_traffic(port_id, factor);
1092}
1093
1094
1095void
1096TrexStatelessDpCore::stop_traffic(uint8_t  port_id,
1097                                  bool     stop_on_id,
1098                                  int      event_id) {
1099    /* we cannot remove nodes not from the top of the queue so
1100       for every active node - make sure next time
1101       the scheduler invokes it, it will be free */
1102
1103    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1104    if ( lp_port->stop_traffic(port_id,stop_on_id,event_id) == false){
1105        return;
1106    }
1107
1108
1109    /* flush the TX queue before sending done message to the CP */
1110    m_core->flush_tx_queue();
1111
1112    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
1113    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
1114                                                                   port_id,
1115                                                                   lp_port->get_event_id());
1116    ring->Enqueue((CGenNode *)event_msg);
1117
1118}
1119
1120/**
1121 * handle a message from CP to DP
1122 *
1123 */
1124void
1125TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) {
1126    msg->handle(this);
1127    delete msg;
1128}
1129
1130void
1131TrexStatelessDpCore::barrier(uint8_t port_id, int event_id) {
1132
1133    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
1134    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
1135                                                                   port_id,
1136                                                                   event_id);
1137    ring->Enqueue((CGenNode *)event_msg);
1138}
1139
1140
1141/**
1142 * PCAP node
1143 */
1144bool CGenNodePCAP::create(uint8_t port_id,
1145                          pkt_dir_t dir,
1146                          socket_id_t socket_id,
1147                          const uint8_t *mac_addr,
1148                          const std::string &pcap_filename,
1149                          double ipg_usec,
1150                          double speedup,
1151                          uint32_t count) {
1152    std::stringstream ss;
1153
1154    m_type       = CGenNode::PCAP_PKT;
1155    m_flags      = 0;
1156    m_src_port   = 0;
1157    m_port_id    = port_id;
1158    m_count      = count;
1159
1160    /* mark this node as slow path */
1161    set_slow_path(true);
1162
1163    if (ipg_usec != -1) {
1164        /* fixed IPG */
1165        m_ipg_sec = usec_to_sec(ipg_usec / speedup);
1166        m_speedup = 0;
1167    } else {
1168        /* packet IPG */
1169        m_ipg_sec = -1;
1170        m_speedup  = speedup;
1171    }
1172
1173    /* copy MAC addr info */
1174    memcpy(m_mac_addr, mac_addr, 12);
1175
1176    /* set the dir */
1177    set_mbuf_dir(dir);
1178    set_socket_id(socket_id);
1179
1180    /* create the PCAP reader */
1181    m_reader = CCapReaderFactory::CreateReader((char *)pcap_filename.c_str(), 0, ss);
1182    if (!m_reader) {
1183        return false;
1184    }
1185
1186    m_raw_packet = new CCapPktRaw();
1187    if ( m_reader->ReadPacket(m_raw_packet) == false ){
1188        /* handle error */
1189        delete m_reader;
1190        return (false);
1191    }
1192
1193    /* this is the reference time */
1194    //m_base_time = m_raw_packet->get_time();
1195    m_last_pkt_time = m_raw_packet->get_time();
1196
1197    /* ready */
1198    m_state = PCAP_ACTIVE;
1199
1200    return true;
1201}
1202
1203/**
1204 * cleanup for PCAP node
1205 *
1206 * @author imarom (08-May-16)
1207 */
1208void CGenNodePCAP::destroy() {
1209
1210    if (m_raw_packet) {
1211        delete m_raw_packet;
1212        m_raw_packet = NULL;
1213    }
1214
1215    if (m_reader) {
1216        delete m_reader;
1217        m_reader = NULL;
1218    }
1219
1220    m_state = PCAP_INVALID;
1221}
1222
1223