trex_stateless_dp_core.cpp revision 8b1d07ff
1/*
2 Itay Marom
3 Hanoch Haim
4 Cisco Systems, Inc.
5*/
6
7/*
8Copyright (c) 2015-2016 Cisco Systems, Inc.
9
10Licensed under the Apache License, Version 2.0 (the "License");
11you may not use this file except in compliance with the License.
12You may obtain a copy of the License at
13
14    http://www.apache.org/licenses/LICENSE-2.0
15
16Unless required by applicable law or agreed to in writing, software
17distributed under the License is distributed on an "AS IS" BASIS,
18WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19See the License for the specific language governing permissions and
20limitations under the License.
21*/
22#include "bp_sim.h"
23#include "trex_stateless_dp_core.h"
24#include "trex_stateless_messaging.h"
25#include "trex_stream.h"
26#include "trex_stream_node.h"
27#include "trex_streams_compiler.h"
28#include "mbuf.h"
29
30
31
32
33void CGenNodeStateless::cache_mbuf_array_init(){
34    m_cache_size=0;
35    m_cache_array_cnt=0;
36}
37
38
39
40void CGenNodeStateless::cache_mbuf_array_copy(CGenNodeCacheMbuf *obj,
41                                              uint16_t size){
42
43    int i;
44    cache_mbuf_array_alloc(size);
45    for (i=0; i<size; i++) {
46        cache_mbuf_array_set(i,obj->m_array[i]);
47    }
48    cache_mbuf_array_set_const_mbuf(obj->m_mbuf_const);
49}
50
51
52rte_mbuf_t ** CGenNodeStateless::cache_mbuf_array_alloc(uint16_t size){
53
54    uint32_t buf_size = CGenNodeCacheMbuf::get_object_size(size);
55    /* TBD  replace with align, zero API */
56    m_cache_mbuf = (void *)malloc(buf_size);
57    assert(m_cache_mbuf);
58    memset(m_cache_mbuf,0,buf_size);
59
60    m_flags |= SL_NODE_CONST_MBUF_CACHE_ARRAY;
61    m_cache_size=size;
62    m_cache_array_cnt=0;
63    return ((rte_mbuf_t **)m_cache_mbuf);
64}
65
66void CGenNodeStateless::cache_mbuf_array_free(){
67
68    assert(m_cache_mbuf);
69    int i;
70    for (i=0; i<(int)m_cache_size; i++) {
71        rte_mbuf_t * m=cache_mbuf_array_get((uint16_t)i);
72        assert(m);
73        rte_pktmbuf_free(m);
74    }
75
76    /* free the const */
77    rte_mbuf_t * m=cache_mbuf_array_get_const_mbuf() ;
78    if (m) {
79        rte_pktmbuf_free(m);
80    }
81
82    free(m_cache_mbuf);
83    m_cache_mbuf=0;
84}
85
86
87rte_mbuf_t * CGenNodeStateless::cache_mbuf_array_get(uint16_t index){
88
89    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
90    return (p->m_array[index]);
91}
92
93void CGenNodeStateless::cache_mbuf_array_set_const_mbuf(rte_mbuf_t * m){
94    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
95    p->m_mbuf_const=m;
96}
97
98rte_mbuf_t * CGenNodeStateless::cache_mbuf_array_get_const_mbuf(){
99    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
100    return (p->m_mbuf_const);
101}
102
103
104void CGenNodeStateless::cache_mbuf_array_set(uint16_t index,
105                                             rte_mbuf_t * m){
106    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
107    p->m_array[index]=m;
108}
109
110
111void CDpOneStream::Delete(CFlowGenListPerThread   * core){
112    assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE);
113    core->free_node((CGenNode *)m_node);
114    delete m_dp_stream;
115    m_node=0;
116    m_dp_stream=0;
117}
118
119void CDpOneStream::DeleteOnlyStream(){
120    assert(m_dp_stream);
121    delete m_dp_stream;
122    m_dp_stream=0;
123}
124
125int CGenNodeStateless::get_stream_id(){
126    if (m_state ==CGenNodeStateless::ss_FREE_RESUSE) {
127        return (-1); // not valid
128    }
129    assert(m_ref_stream_info);
130    return ((int)m_ref_stream_info->m_stream_id);
131}
132
133
134void CGenNodeStateless::DumpHeader(FILE *fd){
135    fprintf(fd," pkt_id, time, port , action , state, stream_id , stype , m-burst# , burst# \n");
136
137}
138void CGenNodeStateless::Dump(FILE *fd){
139    fprintf(fd," %2.4f, %3lu, %s,%s, %3d, %s, %3lu, %3lu  \n",
140            m_time,
141            (ulong)m_port_id,
142            "s-pkt", //action
143            get_stream_state_str(m_state ).c_str(),
144            get_stream_id(),   //stream_id
145            TrexStream::get_stream_type_str(m_stream_type).c_str(), //stype
146            (ulong)m_multi_bursts,
147            (ulong)m_single_burst
148            );
149}
150
151
152
153void CGenNodeStateless::refresh_vm_bss(){
154    if ( m_vm_flow_var ) {
155        StreamVmDp  * vm_s=m_ref_stream_info->m_vm_dp;
156        assert(vm_s);
157        memcpy(m_vm_flow_var,vm_s->get_bss(),vm_s->get_bss_size());
158
159        if ( vm_s->is_random_seed() ){
160            /* if we have random seed for this program */
161            if (m_ref_stream_info->m_random_seed) {
162                set_random_seed(m_ref_stream_info->m_random_seed);
163            }
164        }
165    }
166}
167
168
169/**
170 * this function called when stream restart after it was inactive
171 */
172void CGenNodeStateless::refresh(){
173
174    /* refill the stream info */
175    m_single_burst    = m_single_burst_refill;
176    m_multi_bursts    = m_ref_stream_info->m_num_bursts;
177    m_state           = CGenNodeStateless::ss_ACTIVE;
178
179    /* refresh init value */
180#if 0
181    /* TBD should add a JSON varible for that */
182    refresh_vm_bss();
183#endif
184}
185
186
187void CGenNodeCommand::free_command(){
188
189    assert(m_cmd);
190    m_cmd->on_node_remove();
191    delete m_cmd;
192}
193
194
195std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state){
196    std::string res;
197
198    switch (stream_state) {
199    case CGenNodeStateless::ss_FREE_RESUSE :
200         res="FREE    ";
201        break;
202    case CGenNodeStateless::ss_INACTIVE :
203        res="INACTIVE ";
204        break;
205    case CGenNodeStateless::ss_ACTIVE :
206        res="ACTIVE   ";
207        break;
208    default:
209        res="Unknow   ";
210    };
211    return(res);
212}
213
214
215rte_mbuf_t   * CGenNodeStateless::alloc_node_with_vm(){
216
217    rte_mbuf_t        * m;
218    /* alloc small packet buffer*/
219    uint16_t prefix_size = prefix_header_size();
220    m = CGlobalInfo::pktmbuf_alloc( get_socket_id(), prefix_size );
221    if (m==0) {
222        return (m);
223    }
224    /* TBD remove this, should handle cases of error */
225    assert(m);
226    char *p=rte_pktmbuf_append(m, prefix_size);
227    memcpy( p ,m_original_packet_data_prefix, prefix_size);
228
229
230    /* run the VM program */
231    StreamDPVmInstructionsRunner runner;
232
233    runner.run( (uint32_t*)m_vm_flow_var,
234                m_vm_program_size,
235                m_vm_program,
236                m_vm_flow_var,
237                (uint8_t*)p);
238
239    uint16_t pkt_new_size=runner.get_new_pkt_size();
240    if ( likely( pkt_new_size == 0) ) {
241        /* no packet size change */
242        rte_mbuf_t * m_const = get_const_mbuf();
243        if (  m_const != NULL) {
244            utl_rte_pktmbuf_add_after(m,m_const);
245        }
246        return (m);
247    }
248
249    /* packet size change there are a few changes */
250    rte_mbuf_t * m_const = get_const_mbuf();
251    if ( (m_const == 0 ) || (pkt_new_size<=prefix_size) ) {
252        /* one mbuf , just trim it */
253        m->data_len = pkt_new_size;
254        m->pkt_len  = pkt_new_size;
255        return (m);
256    }
257
258    rte_mbuf_t * mi= CGlobalInfo::pktmbuf_alloc_small(get_socket_id());
259    assert(mi);
260    rte_pktmbuf_attach(mi,m_const);
261    utl_rte_pktmbuf_add_after2(m,mi);
262
263    if ( pkt_new_size < m->pkt_len) {
264        /* need to trim it */
265        mi->data_len = (pkt_new_size - prefix_size);
266        m->pkt_len   = pkt_new_size;
267    }
268    return (m);
269}
270
271void CGenNodeStateless::free_stl_vm_buf(){
272        rte_mbuf_t * m ;
273         m=get_const_mbuf();
274         if (m) {
275             rte_pktmbuf_free(m); /* reduce the ref counter */
276             /* clear the const marker */
277             clear_const_mbuf();
278         }
279
280         free_prefix_header();
281
282         if (m_vm_flow_var) {
283             /* free flow var */
284             free(m_vm_flow_var);
285             m_vm_flow_var=0;
286         }
287}
288
289
290
291void CGenNodeStateless::free_stl_node(){
292
293    if ( is_cache_mbuf_array() ){
294        /* do we have cache of mbuf pre allocated */
295        cache_mbuf_array_free();
296    }else{
297        /* if we have cache mbuf free it */
298        rte_mbuf_t * m=get_cache_mbuf();
299        if (m) {
300                rte_pktmbuf_free(m);
301                m_cache_mbuf=0;
302        }
303    }
304    free_stl_vm_buf();
305}
306
307
308bool TrexStatelessDpPerPort::update_number_of_active_streams(uint32_t d){
309    m_active_streams-=d; /* reduce the number of streams */
310    if (m_active_streams == 0) {
311        return (true);
312    }
313    return (false);
314}
315
316bool TrexStatelessDpPerPort::resume_traffic(uint8_t port_id){
317
318    /* we are working with continues streams so we must be in transmit mode */
319    assert(m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE);
320
321    for (auto dp_stream : m_active_nodes) {
322        CGenNodeStateless * node =dp_stream.m_node;
323        assert(node->get_port_id() == port_id);
324        assert(node->is_pause() == true);
325        node->set_pause(false);
326    }
327    m_state = TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
328    return (true);
329}
330
331bool TrexStatelessDpPerPort::update_traffic(uint8_t port_id, double factor) {
332
333    assert( (m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING ||
334            (m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE)) );
335
336    for (auto dp_stream : m_active_nodes) {
337        CGenNodeStateless * node = dp_stream.m_node;
338        assert(node->get_port_id() == port_id);
339
340        node->update_rate(factor);
341    }
342
343    return (true);
344}
345
346bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){
347
348    /* we are working with continues streams so we must be in transmit mode */
349    assert(m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING);
350
351    for (auto dp_stream : m_active_nodes) {
352        CGenNodeStateless * node =dp_stream.m_node;
353        assert(node->get_port_id() == port_id);
354        assert(node->is_pause() == false);
355        node->set_pause(true);
356    }
357    m_state = TrexStatelessDpPerPort::ppSTATE_PAUSE;
358    return (true);
359}
360
361bool TrexStatelessDpPerPort::push_pcap(uint8_t port_id,
362                                       const std::string &pcap_filename,
363                                       double ipg_usec,
364                                       double speedup,
365                                       uint32_t count) {
366
367    /* push pcap can only happen on an idle port from the core prespective */
368    assert(m_state == TrexStatelessDpPerPort::ppSTATE_IDLE);
369
370    CGenNodePCAP *pcap_node = m_core->allocate_pcap_node();
371    if (!pcap_node) {
372        return (false);
373    }
374
375    pkt_dir_t dir          = m_core->m_node_gen.m_v_if->port_id_to_dir(port_id);
376    socket_id_t socket_id  = m_core->m_node_gen.m_socket_id;
377
378    uint8_t mac_addr[12];
379    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, mac_addr);
380
381    bool rc = pcap_node->create(port_id,
382                                dir,
383                                socket_id,
384                                mac_addr,
385                                pcap_filename,
386                                ipg_usec,
387                                speedup,
388                                count);
389    if (!rc) {
390        m_core->free_node((CGenNode *)pcap_node);
391        return (false);
392    }
393
394    /* schedule the node for now */
395    pcap_node->m_time = m_core->m_cur_time_sec;
396    m_core->m_node_gen.add_node((CGenNode *)pcap_node);
397
398    /* hold a pointer to the node */
399    assert(m_active_pcap_node == NULL);
400    m_active_pcap_node = pcap_node;
401
402    m_state = TrexStatelessDpPerPort::ppSTATE_PCAP_TX;
403    return (true);
404}
405
406
407bool TrexStatelessDpPerPort::stop_traffic(uint8_t  port_id,
408                                          bool     stop_on_id,
409                                          int      event_id){
410
411
412    if (m_state == TrexStatelessDpPerPort::ppSTATE_IDLE) {
413        assert(m_active_streams==0);
414        return false;
415    }
416
417    /* there could be race of stop after stop */
418    if ( stop_on_id ) {
419        if (event_id != m_event_id){
420            /* we can't stop it is an old message */
421            return false;
422        }
423    }
424
425    for (auto dp_stream : m_active_nodes) {
426        CGenNodeStateless * node =dp_stream.m_node;
427        assert(node->get_port_id() == port_id);
428        if ( node->get_state() == CGenNodeStateless::ss_ACTIVE) {
429            node->mark_for_free();
430            m_active_streams--;
431            dp_stream.DeleteOnlyStream();
432
433        }else{
434            dp_stream.Delete(m_core);
435        }
436    }
437
438    /* check for active PCAP node */
439    if (m_active_pcap_node) {
440        /* when got async stop from outside or duration */
441        if (m_active_pcap_node->is_active()) {
442            m_active_pcap_node->mark_for_free();
443        } else {
444            /* graceful stop - node was put out by the scheduler */
445            m_core->free_node( (CGenNode *)m_active_pcap_node);
446        }
447
448        m_active_pcap_node = NULL;
449    }
450
451    /* active stream should be zero */
452    assert(m_active_streams==0);
453    m_active_nodes.clear();
454    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
455    return (true);
456}
457
458
459void TrexStatelessDpPerPort::create(CFlowGenListPerThread   *  core){
460    m_core=core;
461    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
462    m_active_streams=0;
463    m_active_nodes.clear();
464    m_active_pcap_node = NULL;
465}
466
467
468
469void
470TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) {
471    m_thread_id = thread_id;
472    m_core = core;
473    m_local_port_offset = 2*core->getDualPortId();
474
475    CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();
476
477    m_ring_from_cp = cp_dp->getRingCpToDp(thread_id);
478    m_ring_to_cp   = cp_dp->getRingDpToCp(thread_id);
479
480    m_state = STATE_IDLE;
481
482    int i;
483    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
484        m_ports[i].create(core);
485    }
486}
487
488
489/* move to the next stream, old stream move to INACTIVE */
490bool TrexStatelessDpCore::set_stateless_next_node(CGenNodeStateless * cur_node,
491                                                  CGenNodeStateless * next_node){
492
493    assert(cur_node);
494    TrexStatelessDpPerPort * lp_port = get_port_db(cur_node->m_port_id);
495    bool schedule =false;
496
497    bool to_stop_port=false;
498
499    if (next_node == NULL) {
500        /* there is no next stream , reduce the number of active streams*/
501        to_stop_port = lp_port->update_number_of_active_streams(1);
502
503    }else{
504        uint8_t state=next_node->get_state();
505
506        /* can't be FREE_RESUSE */
507        assert(state != CGenNodeStateless::ss_FREE_RESUSE);
508        if (state == CGenNodeStateless::ss_INACTIVE ) {
509
510            if (cur_node->m_action_counter > 0) {
511                cur_node->m_action_counter--;
512                if (cur_node->m_action_counter==0) {
513                    to_stop_port = lp_port->update_number_of_active_streams(1);
514                }else{
515                    /* refill start info and scedule, no update in active streams  */
516                    next_node->refresh();
517                    schedule = true;
518                }
519            }else{
520                /* refill start info and scedule, no update in active streams  */
521                next_node->refresh();
522                schedule = true;
523            }
524
525        }else{
526            to_stop_port = lp_port->update_number_of_active_streams(1);
527        }
528    }
529
530    if ( to_stop_port ) {
531        /* call stop port explictly to move the state */
532        stop_traffic(cur_node->m_port_id,false,0);
533    }
534
535    return ( schedule );
536}
537
538
539
540/**
541 * in idle state loop, the processor most of the time sleeps
542 * and periodically checks for messages
543 *
544 * @author imarom (01-Nov-15)
545 */
546void
547TrexStatelessDpCore::idle_state_loop() {
548
549    const int SHORT_DELAY_MS    = 2;
550    const int LONG_DELAY_MS     = 50;
551    const int DEEP_SLEEP_LIMIT  = 2000;
552
553    int counter = 0;
554
555    while (m_state == STATE_IDLE) {
556        m_core->m_node_gen.m_v_if->flush_dp_rx_queue();
557        bool had_msg = periodic_check_for_cp_messages();
558        if (had_msg) {
559            counter = 0;
560            continue;
561        }
562
563        /* enter deep sleep only if enough time had passed */
564        if (counter < DEEP_SLEEP_LIMIT) {
565            delay(SHORT_DELAY_MS);
566            counter++;
567        } else {
568            delay(LONG_DELAY_MS);
569        }
570
571    }
572}
573
574
575
576void TrexStatelessDpCore::quit_main_loop(){
577    m_core->set_terminate_mode(true); /* mark it as terminated */
578    m_state = STATE_TERMINATE;
579    add_global_duration(0.0001);
580}
581
582
583/**
584 * scehduler runs when traffic exists
585 * it will return when no more transmitting is done on this
586 * core
587 *
588 * @author imarom (01-Nov-15)
589 */
590void
591TrexStatelessDpCore::start_scheduler() {
592    /* creates a maintenace job using the scheduler */
593    CGenNode * node_sync = m_core->create_node() ;
594    node_sync->m_type = CGenNode::FLOW_SYNC;
595    node_sync->m_time = m_core->m_cur_time_sec + SYNC_TIME_OUT;
596    m_core->m_node_gen.add_node(node_sync);
597
598    double old_offset = 0.0;
599    m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset);
600    /* bail out in case of terminate */
601    if (m_state != TrexStatelessDpCore::STATE_TERMINATE) {
602        m_core->m_node_gen.close_file(m_core);
603        m_state = STATE_IDLE; /* we exit from all ports and we have nothing to do, we move to IDLE state */
604    }
605}
606
607
608void
609TrexStatelessDpCore::run_once(){
610
611    idle_state_loop();
612
613    if ( m_state == STATE_TERMINATE ){
614        return;
615    }
616
617    start_scheduler();
618}
619
620
621
622
623void
624TrexStatelessDpCore::start() {
625
626    while (true) {
627        run_once();
628
629        if ( m_core->is_terminated_by_master() ) {
630            break;
631        }
632    }
633}
634
635/* only if both port are idle we can exit */
636void
637TrexStatelessDpCore::schedule_exit(){
638
639    CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
640
641    node->m_type = CGenNode::COMMAND;
642
643    node->m_cmd = new TrexStatelessDpCanQuit();
644
645    /* make sure it will be scheduled after the current node */
646    node->m_time = m_core->m_cur_time_sec ;
647
648    m_core->m_node_gen.add_node((CGenNode *)node);
649}
650
651
652void
653TrexStatelessDpCore::add_global_duration(double duration){
654    if (duration > 0.0) {
655        CGenNode *node = m_core->create_node() ;
656
657        node->m_type = CGenNode::EXIT_SCHED;
658
659        /* make sure it will be scheduled after the current node */
660        node->m_time = m_core->m_cur_time_sec + duration ;
661
662        m_core->m_node_gen.add_node(node);
663    }
664}
665
666/* add per port exit */
667void
668TrexStatelessDpCore::add_port_duration(double duration,
669                                       uint8_t port_id,
670                                       int event_id){
671    if (duration > 0.0) {
672        CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
673
674        node->m_type = CGenNode::COMMAND;
675
676        /* make sure it will be scheduled after the current node */
677        node->m_time = m_core->m_cur_time_sec + duration ;
678
679        TrexStatelessDpStop * cmd=new TrexStatelessDpStop(port_id);
680
681
682        /* test this */
683        m_core->m_non_active_nodes++;
684        cmd->set_core_ptr(m_core);
685        cmd->set_event_id(event_id);
686        cmd->set_wait_for_event_id(true);
687
688        node->m_cmd = cmd;
689
690        m_core->m_node_gen.add_node((CGenNode *)node);
691    }
692}
693
694
695void TrexStatelessDpCore::update_mac_addr(TrexStream * stream,
696                                          CGenNodeStateless *node,
697                                          pkt_dir_t dir,
698                                          char *raw_pkt){
699    bool              ov_src = stream->get_override_src_mac_by_pkt_data();
700    TrexStream::stream_dst_mac_t  ov_dst = stream->get_override_dst_mac_mode();
701
702
703    if ( (ov_src == true) && (ov_dst == TrexStream::stPKT) ) {
704        /* nothing to do, take from the packet both */
705        return;
706    }
707
708        /* take from cfg_file */
709    if ( (ov_src == false) &&
710         (ov_dst == TrexStream::stCFG_FILE) ){
711
712          m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*)raw_pkt);
713          return;
714    }
715
716    /* save the pkt*/
717    char tmp_pkt[12];
718    memcpy(tmp_pkt,raw_pkt,12);
719
720    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*)raw_pkt);
721
722    if ((ov_src == true) && (ov_dst == TrexStream::stCFG_FILE)) {
723        memcpy(raw_pkt+6,tmp_pkt+6,6);
724    }
725
726    if ((ov_src == false) && (ov_dst == TrexStream::stPKT)) {
727        memcpy(raw_pkt,tmp_pkt,6);
728    }
729}
730
731
732void TrexStatelessDpCore::replay_vm_into_cache(TrexStream * stream,
733                                               CGenNodeStateless *node){
734
735    uint16_t      cache_size = stream->m_cache_size;
736    assert(cache_size>0);
737    rte_mbuf_t * m=0;
738
739    uint32_t buf_size = CGenNodeCacheMbuf::get_object_size(cache_size);
740    CGenNodeCacheMbuf * p = (CGenNodeCacheMbuf *)malloc(buf_size);
741    assert(p);
742    memset(p,0,buf_size);
743
744    int i;
745    for (i=0; i<cache_size; i++) {
746        p->m_array[i] =  node->alloc_node_with_vm();
747    }
748    /* save const */
749    m=node->get_const_mbuf();
750    if (m) {
751        p->m_mbuf_const=m;
752        rte_pktmbuf_refcnt_update(m,1);
753    }
754
755    /* free all VM and const mbuf */
756    node->free_stl_vm_buf();
757
758    /* copy to local node meory */
759    node->cache_mbuf_array_copy(p,cache_size);
760
761    /* free the memory */
762    free(p);
763}
764
765
766void
767TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port,
768                                TrexStream * stream,
769                                TrexStreamsCompiledObj *comp) {
770
771    CGenNodeStateless *node = m_core->create_node_sl();
772
773    node->cache_mbuf_array_init();
774    node->m_batch_size=0;
775
776    /* add periodic */
777    node->m_cache_mbuf=0;
778    node->m_type = CGenNode::STATELESS_PKT;
779
780    node->m_action_counter = stream->m_action_count;
781
782    /* clone the stream from control plane memory to DP memory */
783    node->m_ref_stream_info = stream->clone();
784    /* no need for this memory anymore on the control plane memory */
785    stream->release_dp_object();
786
787    node->m_next_stream=0; /* will be fixed later */
788
789    if ( stream->m_self_start ){
790        /* if self start it is in active mode */
791        node->m_state =CGenNodeStateless::ss_ACTIVE;
792        lp_port->m_active_streams++;
793    }else{
794        node->m_state =CGenNodeStateless::ss_INACTIVE;
795    }
796
797    node->m_time = m_core->m_cur_time_sec + stream->get_start_delay_sec();
798
799    pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(stream->m_port_id);
800    node->m_flags = 0;
801    node->m_src_port =0;
802    node->m_original_packet_data_prefix = 0;
803
804    if (stream->m_rx_check.m_enabled) {
805        node->set_stat_needed();
806        uint8_t hw_id = stream->m_rx_check.m_hw_id;
807        assert (hw_id < MAX_FLOW_STATS);
808        node->set_stat_hw_id(hw_id);
809    }
810
811    /* set socket id */
812    node->set_socket_id(m_core->m_node_gen.m_socket_id);
813
814    /* build a mbuf from a packet */
815
816    uint16_t pkt_size = stream->m_pkt.len;
817    const uint8_t *stream_pkt = stream->m_pkt.binary;
818
819    node->m_pause =0;
820    node->m_stream_type = stream->m_type;
821    node->m_next_time_offset = 1.0 / stream->get_pps();
822    node->m_null_stream = (stream->m_null_stream ? 1 : 0);
823
824    /* stateless specific fields */
825    switch ( stream->m_type ) {
826
827    case TrexStream::stCONTINUOUS :
828        node->m_single_burst=0;
829        node->m_single_burst_refill=0;
830        node->m_multi_bursts=0;
831        break;
832
833    case TrexStream::stSINGLE_BURST :
834        node->m_stream_type             = TrexStream::stMULTI_BURST;
835        node->m_single_burst            = stream->m_burst_total_pkts;
836        node->m_single_burst_refill     = stream->m_burst_total_pkts;
837        node->m_multi_bursts            = 1;  /* single burst in multi burst of 1 */
838        break;
839
840    case TrexStream::stMULTI_BURST :
841        node->m_single_burst        = stream->m_burst_total_pkts;
842        node->m_single_burst_refill = stream->m_burst_total_pkts;
843        node->m_multi_bursts        = stream->m_num_bursts;
844        break;
845    default:
846
847        assert(0);
848    };
849
850    node->m_port_id = stream->m_port_id;
851
852    /* set dir 0 or 1 client or server */
853    node->set_mbuf_cache_dir(dir);
854
855
856    if (node->m_ref_stream_info->getDpVm() == NULL) {
857        /* no VM */
858
859        node->m_vm_flow_var =  NULL;
860        node->m_vm_program  =  NULL;
861        node->m_vm_program_size =0;
862
863                /* allocate const mbuf */
864        rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size);
865        assert(m);
866
867        char *p = rte_pktmbuf_append(m, pkt_size);
868        assert(p);
869        /* copy the packet */
870        memcpy(p,stream_pkt,pkt_size);
871
872        update_mac_addr(stream,node,dir,p);
873
874        /* set the packet as a readonly */
875        node->set_cache_mbuf(m);
876
877        node->m_original_packet_data_prefix =0;
878    }else{
879
880        /* set the program */
881        TrexStream * local_mem_stream = node->m_ref_stream_info;
882
883        StreamVmDp  * lpDpVm = local_mem_stream->getDpVm();
884
885        node->m_vm_flow_var      = lpDpVm->clone_bss(); /* clone the flow var */
886        node->m_vm_program       = lpDpVm->get_program(); /* same ref to the program */
887        node->m_vm_program_size  = lpDpVm->get_program_size();
888
889
890        /* set the random seed if was set */
891        if ( lpDpVm->is_random_seed() ){
892            /* if we have random seed for this program */
893            if (stream->m_random_seed) {
894                node->set_random_seed(stream->m_random_seed);
895            }
896        }
897
898        /* we need to copy the object */
899        if ( pkt_size > lpDpVm->get_prefix_size() ) {
900            /* we need const packet */
901            uint16_t const_pkt_size  = pkt_size - lpDpVm->get_prefix_size() ;
902            rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), const_pkt_size );
903            assert(m);
904
905            char *p = rte_pktmbuf_append(m, const_pkt_size);
906            assert(p);
907
908            /* copy packet data */
909            memcpy(p,(stream_pkt + lpDpVm->get_prefix_size()),const_pkt_size);
910
911            node->set_const_mbuf(m);
912        }
913
914
915        if ( lpDpVm->is_pkt_size_var() ) {
916            // mark the node as varible size
917            node->set_var_pkt_size();
918        }
919
920
921        if (lpDpVm->get_prefix_size() > pkt_size ) {
922            lpDpVm->set_prefix_size(pkt_size);
923        }
924
925        /* copy the headr */
926        uint16_t header_size = lpDpVm->get_prefix_size();
927        assert(header_size);
928        node->alloc_prefix_header(header_size);
929        uint8_t *p=node->m_original_packet_data_prefix;
930        assert(p);
931
932        memcpy(p,stream_pkt , header_size);
933
934        update_mac_addr(stream,node,dir,(char *)p);
935
936        if (stream->m_cache_size > 0 ) {
937            /* we need to create cache of objects */
938            replay_vm_into_cache(stream, node);
939        }
940    }
941
942
943    CDpOneStream one_stream;
944
945    one_stream.m_dp_stream = node->m_ref_stream_info;
946    one_stream.m_node =node;
947
948    lp_port->m_active_nodes.push_back(one_stream);
949
950    /* schedule only if active */
951    if (node->m_state == CGenNodeStateless::ss_ACTIVE) {
952        m_core->m_node_gen.add_node((CGenNode *)node);
953    }
954}
955
956void
957TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj,
958                                   double duration,
959                                   int event_id) {
960
961
962    TrexStatelessDpPerPort * lp_port=get_port_db(obj->get_port_id());
963    lp_port->m_active_streams = 0;
964    lp_port->set_event_id(event_id);
965
966    /* update cur time */
967    if ( CGlobalInfo::is_realtime()  ){
968        m_core->m_cur_time_sec = now_sec() + SCHD_OFFSET_DTIME ;
969    }
970
971    /* no nodes in the list */
972    assert(lp_port->m_active_nodes.size()==0);
973
974    for (auto single_stream : obj->get_objects()) {
975        /* all commands should be for the same port */
976        assert(obj->get_port_id() == single_stream.m_stream->m_port_id);
977        add_stream(lp_port,single_stream.m_stream,obj);
978    }
979
980    uint32_t nodes = lp_port->m_active_nodes.size();
981    /* find next stream */
982    assert(nodes == obj->get_objects().size());
983
984    int cnt=0;
985
986    /* set the next_stream pointer  */
987    for (auto single_stream : obj->get_objects()) {
988
989        if (single_stream.m_stream->is_dp_next_stream() ) {
990            int stream_id = single_stream.m_stream->m_next_stream_id;
991            assert(stream_id<nodes);
992            /* point to the next stream , stream_id is fixed */
993            lp_port->m_active_nodes[cnt].m_node->m_next_stream = lp_port->m_active_nodes[stream_id].m_node ;
994        }
995        cnt++;
996    }
997
998    lp_port->m_state =TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
999    m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
1000
1001
1002    if ( duration > 0.0 ){
1003        add_port_duration( duration ,obj->get_port_id(),event_id );
1004    }
1005
1006}
1007
1008
1009bool TrexStatelessDpCore::are_all_ports_idle(){
1010
1011    bool res=true;
1012    int i;
1013    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
1014        if ( m_ports[i].m_state != TrexStatelessDpPerPort::ppSTATE_IDLE ){
1015            res=false;
1016        }
1017    }
1018    return (res);
1019}
1020
1021
1022void
1023TrexStatelessDpCore::resume_traffic(uint8_t port_id){
1024
1025    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1026
1027    lp_port->resume_traffic(port_id);
1028}
1029
1030
1031void
1032TrexStatelessDpCore::pause_traffic(uint8_t port_id){
1033
1034    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1035
1036    lp_port->pause_traffic(port_id);
1037}
1038
1039
1040void
1041TrexStatelessDpCore::push_pcap(uint8_t port_id,
1042                               int event_id,
1043                               const std::string &pcap_filename,
1044                               double ipg_usec,
1045                               double speedup,
1046                               uint32_t count,
1047                               double duration) {
1048
1049    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1050
1051    lp_port->set_event_id(event_id);
1052
1053    /* delegate the command to the port */
1054    bool rc = lp_port->push_pcap(port_id, pcap_filename, ipg_usec, speedup, count);
1055    if (!rc) {
1056        /* report back that we stopped */
1057        CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
1058        TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
1059                                                                       port_id,
1060                                                                       event_id,
1061                                                                       false);
1062        ring->Enqueue((CGenNode *)event_msg);
1063        return;
1064    }
1065
1066
1067    if (duration > 0.0) {
1068        add_port_duration(duration, port_id, event_id);
1069    }
1070
1071     m_state = TrexStatelessDpCore::STATE_PCAP_TX;
1072}
1073
1074
1075void
1076TrexStatelessDpCore::update_traffic(uint8_t port_id, double factor) {
1077
1078    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1079
1080    lp_port->update_traffic(port_id, factor);
1081}
1082
1083
1084void
1085TrexStatelessDpCore::stop_traffic(uint8_t  port_id,
1086                                  bool     stop_on_id,
1087                                  int      event_id) {
1088    /* we cannot remove nodes not from the top of the queue so
1089       for every active node - make sure next time
1090       the scheduler invokes it, it will be free */
1091
1092    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1093    if ( lp_port->stop_traffic(port_id,stop_on_id,event_id) == false){
1094        return;
1095    }
1096
1097
1098    /* flush the TX queue before sending done message to the CP */
1099    m_core->flush_tx_queue();
1100
1101    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
1102    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
1103                                                                   port_id,
1104                                                                   lp_port->get_event_id());
1105    ring->Enqueue((CGenNode *)event_msg);
1106
1107}
1108
1109/**
1110 * handle a message from CP to DP
1111 *
1112 */
1113void
1114TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) {
1115    msg->handle(this);
1116    delete msg;
1117}
1118
1119void
1120TrexStatelessDpCore::barrier(uint8_t port_id, int event_id) {
1121
1122    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
1123    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
1124                                                                   port_id,
1125                                                                   event_id);
1126    ring->Enqueue((CGenNode *)event_msg);
1127}
1128
1129
1130/**
1131 * PCAP node
1132 */
1133bool CGenNodePCAP::create(uint8_t port_id,
1134                          pkt_dir_t dir,
1135                          socket_id_t socket_id,
1136                          const uint8_t *mac_addr,
1137                          const std::string &pcap_filename,
1138                          double ipg_usec,
1139                          double speedup,
1140                          uint32_t count) {
1141    std::stringstream ss;
1142
1143    m_type       = CGenNode::PCAP_PKT;
1144    m_flags      = 0;
1145    m_src_port   = 0;
1146    m_port_id    = port_id;
1147    m_count      = count;
1148
1149    /* mark this node as slow path */
1150    set_slow_path(true);
1151
1152    if (ipg_usec != -1) {
1153        /* fixed IPG */
1154        m_ipg_sec = usec_to_sec(ipg_usec / speedup);
1155        m_speedup = 0;
1156    } else {
1157        /* packet IPG */
1158        m_ipg_sec = -1;
1159        m_speedup  = speedup;
1160    }
1161
1162    /* copy MAC addr info */
1163    memcpy(m_mac_addr, mac_addr, 12);
1164
1165    /* set the dir */
1166    set_mbuf_dir(dir);
1167    set_socket_id(socket_id);
1168
1169    /* create the PCAP reader */
1170    m_reader = CCapReaderFactory::CreateReader((char *)pcap_filename.c_str(), 0, ss);
1171    if (!m_reader) {
1172        return false;
1173    }
1174
1175    m_raw_packet = new CCapPktRaw();
1176    if ( m_reader->ReadPacket(m_raw_packet) == false ){
1177        /* handle error */
1178        delete m_reader;
1179        return (false);
1180    }
1181
1182    /* this is the reference time */
1183    //m_base_time = m_raw_packet->get_time();
1184    m_last_pkt_time = m_raw_packet->get_time();
1185
1186    /* ready */
1187    m_state = PCAP_ACTIVE;
1188
1189    return true;
1190}
1191
1192/**
1193 * cleanup for PCAP node
1194 *
1195 * @author imarom (08-May-16)
1196 */
1197void CGenNodePCAP::destroy() {
1198
1199    if (m_raw_packet) {
1200        delete m_raw_packet;
1201        m_raw_packet = NULL;
1202    }
1203
1204    if (m_reader) {
1205        delete m_reader;
1206        m_reader = NULL;
1207    }
1208
1209    m_state = PCAP_INVALID;
1210}
1211
1212