trex_stateless_dp_core.cpp revision db9145d2
1/*
2 Itay Marom
3 Hanoch Haim
4 Cisco Systems, Inc.
5*/
6
7/*
8Copyright (c) 2015-2016 Cisco Systems, Inc.
9
10Licensed under the Apache License, Version 2.0 (the "License");
11you may not use this file except in compliance with the License.
12You may obtain a copy of the License at
13
14    http://www.apache.org/licenses/LICENSE-2.0
15
16Unless required by applicable law or agreed to in writing, software
17distributed under the License is distributed on an "AS IS" BASIS,
18WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19See the License for the specific language governing permissions and
20limitations under the License.
21*/
22#include "bp_sim.h"
23#include "trex_stateless_dp_core.h"
24#include "trex_stateless_messaging.h"
25#include "trex_stream.h"
26#include "trex_stream_node.h"
27#include "trex_streams_compiler.h"
28
29void CDpOneStream::Delete(CFlowGenListPerThread   * core){
30    assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE);
31    core->free_node((CGenNode *)m_node);
32    delete m_dp_stream;
33    m_node=0;
34    m_dp_stream=0;
35}
36
37void CDpOneStream::DeleteOnlyStream(){
38    assert(m_dp_stream);
39    delete m_dp_stream;
40    m_dp_stream=0;
41}
42
43int CGenNodeStateless::get_stream_id(){
44    if (m_state ==CGenNodeStateless::ss_FREE_RESUSE) {
45        return (-1); // not valid
46    }
47    assert(m_ref_stream_info);
48    return ((int)m_ref_stream_info->m_stream_id);
49}
50
51
52void CGenNodeStateless::DumpHeader(FILE *fd){
53    fprintf(fd," pkt_id, time, port , action , state, stream_id , stype , m-burst# , burst# \n");
54
55}
56void CGenNodeStateless::Dump(FILE *fd){
57    fprintf(fd," %2.4f, %3lu, %s,%s, %3d, %s, %3lu, %3lu  \n",
58            m_time,
59            (ulong)m_port_id,
60            "s-pkt", //action
61            get_stream_state_str(m_state ).c_str(),
62            get_stream_id(),   //stream_id
63            TrexStream::get_stream_type_str(m_stream_type).c_str(), //stype
64            (ulong)m_multi_bursts,
65            (ulong)m_single_burst
66            );
67}
68
69
70
71void CGenNodeStateless::refresh_vm_bss(){
72    if ( m_vm_flow_var ) {
73        StreamVmDp  * vm_s=m_ref_stream_info->m_vm_dp;
74        assert(vm_s);
75        memcpy(m_vm_flow_var,vm_s->get_bss(),vm_s->get_bss_size());
76
77        if ( vm_s->is_random_seed() ){
78            /* if we have random seed for this program */
79            if (m_ref_stream_info->m_random_seed) {
80                set_random_seed(m_ref_stream_info->m_random_seed);
81            }
82        }
83    }
84}
85
86
87/**
88 * this function called when stream restart after it was inactive
89 */
90void CGenNodeStateless::refresh(){
91
92    /* refill the stream info */
93    m_single_burst    = m_single_burst_refill;
94    m_multi_bursts    = m_ref_stream_info->m_num_bursts;
95    m_state           = CGenNodeStateless::ss_ACTIVE;
96
97    /* refresh init value */
98#if 0
99    /* TBD should add a JSON varible for that */
100    refresh_vm_bss();
101#endif
102}
103
104
105void CGenNodeCommand::free_command(){
106
107    assert(m_cmd);
108    m_cmd->on_node_remove();
109    delete m_cmd;
110}
111
112
113std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state){
114    std::string res;
115
116    switch (stream_state) {
117    case CGenNodeStateless::ss_FREE_RESUSE :
118         res="FREE    ";
119        break;
120    case CGenNodeStateless::ss_INACTIVE :
121        res="INACTIVE ";
122        break;
123    case CGenNodeStateless::ss_ACTIVE :
124        res="ACTIVE   ";
125        break;
126    default:
127        res="Unknow   ";
128    };
129    return(res);
130}
131
132
133rte_mbuf_t   * CGenNodeStateless::alloc_node_with_vm(){
134
135    rte_mbuf_t        * m;
136    /* alloc small packet buffer*/
137    uint16_t prefix_size = prefix_header_size();
138    m = CGlobalInfo::pktmbuf_alloc( get_socket_id(), prefix_size );
139    if (m==0) {
140        return (m);
141    }
142    /* TBD remove this, should handle cases of error */
143    assert(m);
144    char *p=rte_pktmbuf_append(m, prefix_size);
145    memcpy( p ,m_original_packet_data_prefix, prefix_size);
146
147
148    /* run the VM program */
149    StreamDPVmInstructionsRunner runner;
150
151    runner.run( (uint32_t*)m_vm_flow_var,
152                m_vm_program_size,
153                m_vm_program,
154                m_vm_flow_var,
155                (uint8_t*)p);
156
157    uint16_t pkt_new_size=runner.get_new_pkt_size();
158    if ( likely( pkt_new_size == 0) ) {
159        /* no packet size change */
160        rte_mbuf_t * m_const = get_const_mbuf();
161        if (  m_const != NULL) {
162            utl_rte_pktmbuf_add_after(m,m_const);
163        }
164        return (m);
165    }
166
167    /* packet size change there are a few changes */
168    rte_mbuf_t * m_const = get_const_mbuf();
169    if ( (m_const == 0 ) || (pkt_new_size<=prefix_size) ) {
170        /* one mbuf , just trim it */
171        m->data_len = pkt_new_size;
172        m->pkt_len  = pkt_new_size;
173        return (m);
174    }
175
176    rte_mbuf_t * mi= CGlobalInfo::pktmbuf_alloc_small(get_socket_id());
177    assert(mi);
178    rte_pktmbuf_attach(mi,m_const);
179    utl_rte_pktmbuf_add_after2(m,mi);
180
181    if ( pkt_new_size < m->pkt_len) {
182        /* need to trim it */
183        mi->data_len = (pkt_new_size - prefix_size);
184        m->pkt_len   = pkt_new_size;
185    }
186    return (m);
187}
188
189
190void CGenNodeStateless::free_stl_node(){
191    /* if we have cache mbuf free it */
192    rte_mbuf_t * m=get_cache_mbuf();
193    if (m) {
194        rte_pktmbuf_free(m);
195        m_cache_mbuf=0;
196    }else{
197        /* non cache - must have an header */
198         m=get_const_mbuf();
199         if (m) {
200             rte_pktmbuf_free(m); /* reduce the ref counter */
201         }
202         free_prefix_header();
203    }
204    if (m_vm_flow_var) {
205        /* free flow var */
206        free(m_vm_flow_var);
207        m_vm_flow_var=0;
208    }
209}
210
211
212bool TrexStatelessDpPerPort::update_number_of_active_streams(uint32_t d){
213    m_active_streams-=d; /* reduce the number of streams */
214    if (m_active_streams == 0) {
215        return (true);
216    }
217    return (false);
218}
219
220bool TrexStatelessDpPerPort::resume_traffic(uint8_t port_id){
221
222    /* we are working with continues streams so we must be in transmit mode */
223    assert(m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE);
224
225    for (auto dp_stream : m_active_nodes) {
226        CGenNodeStateless * node =dp_stream.m_node;
227        assert(node->get_port_id() == port_id);
228        assert(node->is_pause() == true);
229        node->set_pause(false);
230    }
231    m_state = TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
232    return (true);
233}
234
235bool TrexStatelessDpPerPort::update_traffic(uint8_t port_id, double factor) {
236
237    assert( (m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING ||
238            (m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE)) );
239
240    for (auto dp_stream : m_active_nodes) {
241        CGenNodeStateless * node = dp_stream.m_node;
242        assert(node->get_port_id() == port_id);
243
244        node->update_rate(factor);
245    }
246
247    return (true);
248}
249
250bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){
251
252    /* we are working with continues streams so we must be in transmit mode */
253    assert(m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING);
254
255    for (auto dp_stream : m_active_nodes) {
256        CGenNodeStateless * node =dp_stream.m_node;
257        assert(node->get_port_id() == port_id);
258        assert(node->is_pause() == false);
259        node->set_pause(true);
260    }
261    m_state = TrexStatelessDpPerPort::ppSTATE_PAUSE;
262    return (true);
263}
264
265bool TrexStatelessDpPerPort::push_pcap(uint8_t port_id,
266                                       const std::string &pcap_filename,
267                                       double ipg_usec,
268                                       double speedup,
269                                       uint32_t count) {
270
271    /* push pcap can only happen on an idle port from the core prespective */
272    assert(m_state == TrexStatelessDpPerPort::ppSTATE_IDLE);
273
274    CGenNodePCAP *pcap_node = m_core->allocate_pcap_node();
275    if (!pcap_node) {
276        return (false);
277    }
278
279    pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(port_id);
280
281    uint8_t mac_addr[12];
282    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, mac_addr);
283
284    bool rc = pcap_node->create(port_id,
285                                dir,
286                                mac_addr,
287                                pcap_filename,
288                                ipg_usec,
289                                speedup,
290                                count);
291    if (!rc) {
292        m_core->free_node((CGenNode *)pcap_node);
293        return (false);
294    }
295
296    /* schedule the node for now */
297    pcap_node->m_time = m_core->m_cur_time_sec;
298    m_core->m_node_gen.add_node((CGenNode *)pcap_node);
299
300    /* hold a pointer to the node */
301    assert(m_active_pcap_node == NULL);
302    m_active_pcap_node = pcap_node;
303
304    m_state = TrexStatelessDpPerPort::ppSTATE_PCAP_TX;
305    return (true);
306}
307
308
309bool TrexStatelessDpPerPort::stop_traffic(uint8_t  port_id,
310                                          bool     stop_on_id,
311                                          int      event_id){
312
313
314    if (m_state == TrexStatelessDpPerPort::ppSTATE_IDLE) {
315        assert(m_active_streams==0);
316        return false;
317    }
318
319    /* there could be race of stop after stop */
320    if ( stop_on_id ) {
321        if (event_id != m_event_id){
322            /* we can't stop it is an old message */
323            return false;
324        }
325    }
326
327    for (auto dp_stream : m_active_nodes) {
328        CGenNodeStateless * node =dp_stream.m_node;
329        assert(node->get_port_id() == port_id);
330        if ( node->get_state() == CGenNodeStateless::ss_ACTIVE) {
331            node->mark_for_free();
332            m_active_streams--;
333            dp_stream.DeleteOnlyStream();
334
335        }else{
336            dp_stream.Delete(m_core);
337        }
338    }
339
340    /* check for active PCAP node */
341    if (m_active_pcap_node) {
342        /* when got async stop from outside or duration */
343        if (m_active_pcap_node->is_active()) {
344            m_active_pcap_node->mark_for_free();
345        } else {
346            /* graceful stop - node was put out by the scheduler */
347            m_core->free_node( (CGenNode *)m_active_pcap_node);
348        }
349
350        m_active_pcap_node = NULL;
351    }
352
353    /* active stream should be zero */
354    assert(m_active_streams==0);
355    m_active_nodes.clear();
356    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
357    return (true);
358}
359
360
361void TrexStatelessDpPerPort::create(CFlowGenListPerThread   *  core){
362    m_core=core;
363    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
364    m_active_streams=0;
365    m_active_nodes.clear();
366    m_active_pcap_node = NULL;
367}
368
369
370
371void
372TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) {
373    m_thread_id = thread_id;
374    m_core = core;
375    m_local_port_offset = 2*core->getDualPortId();
376
377    CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();
378
379    m_ring_from_cp = cp_dp->getRingCpToDp(thread_id);
380    m_ring_to_cp   = cp_dp->getRingDpToCp(thread_id);
381
382    m_state = STATE_IDLE;
383
384    int i;
385    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
386        m_ports[i].create(core);
387    }
388}
389
390
391/* move to the next stream, old stream move to INACTIVE */
392bool TrexStatelessDpCore::set_stateless_next_node(CGenNodeStateless * cur_node,
393                                                  CGenNodeStateless * next_node){
394
395    assert(cur_node);
396    TrexStatelessDpPerPort * lp_port = get_port_db(cur_node->m_port_id);
397    bool schedule =false;
398
399    bool to_stop_port=false;
400
401    if (next_node == NULL) {
402        /* there is no next stream , reduce the number of active streams*/
403        to_stop_port = lp_port->update_number_of_active_streams(1);
404
405    }else{
406        uint8_t state=next_node->get_state();
407
408        /* can't be FREE_RESUSE */
409        assert(state != CGenNodeStateless::ss_FREE_RESUSE);
410        if (state == CGenNodeStateless::ss_INACTIVE ) {
411
412            if (cur_node->m_action_counter > 0) {
413                cur_node->m_action_counter--;
414                if (cur_node->m_action_counter==0) {
415                    to_stop_port = lp_port->update_number_of_active_streams(1);
416                }else{
417                    /* refill start info and scedule, no update in active streams  */
418                    next_node->refresh();
419                    schedule = true;
420                }
421            }else{
422                /* refill start info and scedule, no update in active streams  */
423                next_node->refresh();
424                schedule = true;
425            }
426
427        }else{
428            to_stop_port = lp_port->update_number_of_active_streams(1);
429        }
430    }
431
432    if ( to_stop_port ) {
433        /* call stop port explictly to move the state */
434        stop_traffic(cur_node->m_port_id,false,0);
435    }
436
437    return ( schedule );
438}
439
440
441
442/**
443 * in idle state loop, the processor most of the time sleeps
444 * and periodically checks for messages
445 *
446 * @author imarom (01-Nov-15)
447 */
448void
449TrexStatelessDpCore::idle_state_loop() {
450
451    const int SHORT_DELAY_MS    = 2;
452    const int LONG_DELAY_MS     = 50;
453    const int DEEP_SLEEP_LIMIT  = 2000;
454
455    int counter = 0;
456
457    while (m_state == STATE_IDLE) {
458        m_core->m_node_gen.m_v_if->flush_dp_rx_queue();
459        bool had_msg = periodic_check_for_cp_messages();
460        if (had_msg) {
461            counter = 0;
462            continue;
463        }
464
465        /* enter deep sleep only if enough time had passed */
466        if (counter < DEEP_SLEEP_LIMIT) {
467            delay(SHORT_DELAY_MS);
468            counter++;
469        } else {
470            delay(LONG_DELAY_MS);
471        }
472
473    }
474}
475
476
477
478void TrexStatelessDpCore::quit_main_loop(){
479    m_core->set_terminate_mode(true); /* mark it as terminated */
480    m_state = STATE_TERMINATE;
481    add_global_duration(0.0001);
482}
483
484
485/**
486 * scehduler runs when traffic exists
487 * it will return when no more transmitting is done on this
488 * core
489 *
490 * @author imarom (01-Nov-15)
491 */
492void
493TrexStatelessDpCore::start_scheduler() {
494    /* creates a maintenace job using the scheduler */
495    CGenNode * node_sync = m_core->create_node() ;
496    node_sync->m_type = CGenNode::FLOW_SYNC;
497    node_sync->m_time = m_core->m_cur_time_sec + SYNC_TIME_OUT;
498    m_core->m_node_gen.add_node(node_sync);
499
500    double old_offset = 0.0;
501    m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset);
502    /* bail out in case of terminate */
503    if (m_state != TrexStatelessDpCore::STATE_TERMINATE) {
504        m_core->m_node_gen.close_file(m_core);
505        m_state = STATE_IDLE; /* we exit from all ports and we have nothing to do, we move to IDLE state */
506    }
507}
508
509
510void
511TrexStatelessDpCore::run_once(){
512
513    idle_state_loop();
514
515    if ( m_state == STATE_TERMINATE ){
516        return;
517    }
518
519    start_scheduler();
520}
521
522
523
524
525void
526TrexStatelessDpCore::start() {
527
528    while (true) {
529        run_once();
530
531        if ( m_core->is_terminated_by_master() ) {
532            break;
533        }
534    }
535}
536
537/* only if both port are idle we can exit */
538void
539TrexStatelessDpCore::schedule_exit(){
540
541    CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
542
543    node->m_type = CGenNode::COMMAND;
544
545    node->m_cmd = new TrexStatelessDpCanQuit();
546
547    /* make sure it will be scheduled after the current node */
548    node->m_time = m_core->m_cur_time_sec ;
549
550    m_core->m_node_gen.add_node((CGenNode *)node);
551}
552
553
554void
555TrexStatelessDpCore::add_global_duration(double duration){
556    if (duration > 0.0) {
557        CGenNode *node = m_core->create_node() ;
558
559        node->m_type = CGenNode::EXIT_SCHED;
560
561        /* make sure it will be scheduled after the current node */
562        node->m_time = m_core->m_cur_time_sec + duration ;
563
564        m_core->m_node_gen.add_node(node);
565    }
566}
567
568/* add per port exit */
569void
570TrexStatelessDpCore::add_port_duration(double duration,
571                                       uint8_t port_id,
572                                       int event_id){
573    if (duration > 0.0) {
574        CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
575
576        node->m_type = CGenNode::COMMAND;
577
578        /* make sure it will be scheduled after the current node */
579        node->m_time = m_core->m_cur_time_sec + duration ;
580
581        TrexStatelessDpStop * cmd=new TrexStatelessDpStop(port_id);
582
583
584        /* test this */
585        m_core->m_non_active_nodes++;
586        cmd->set_core_ptr(m_core);
587        cmd->set_event_id(event_id);
588        cmd->set_wait_for_event_id(true);
589
590        node->m_cmd = cmd;
591
592        m_core->m_node_gen.add_node((CGenNode *)node);
593    }
594}
595
596
597void TrexStatelessDpCore::update_mac_addr(TrexStream * stream,
598                                          CGenNodeStateless *node,
599                                          pkt_dir_t dir,
600                                          char *raw_pkt){
601    bool              ov_src = stream->get_override_src_mac_by_pkt_data();
602    TrexStream::stream_dst_mac_t  ov_dst = stream->get_override_dst_mac_mode();
603
604
605    if ( (ov_src == true) && (ov_dst == TrexStream::stPKT) ) {
606        /* nothing to do, take from the packet both */
607        return;
608    }
609
610        /* take from cfg_file */
611    if ( (ov_src == false) &&
612         (ov_dst == TrexStream::stCFG_FILE) ){
613
614          m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*)raw_pkt);
615          return;
616    }
617
618    /* save the pkt*/
619    char tmp_pkt[12];
620    memcpy(tmp_pkt,raw_pkt,12);
621
622    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*)raw_pkt);
623
624    if ((ov_src == true) && (ov_dst == TrexStream::stCFG_FILE)) {
625        memcpy(raw_pkt+6,tmp_pkt+6,6);
626    }
627
628    if ((ov_src == false) && (ov_dst == TrexStream::stPKT)) {
629        memcpy(raw_pkt,tmp_pkt,6);
630    }
631}
632
633
634void
635TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port,
636                                TrexStream * stream,
637                                TrexStreamsCompiledObj *comp) {
638
639    CGenNodeStateless *node = m_core->create_node_sl();
640
641    /* add periodic */
642    node->m_cache_mbuf=0;
643    node->m_type = CGenNode::STATELESS_PKT;
644
645    node->m_action_counter = stream->m_action_count;
646
647    /* clone the stream from control plane memory to DP memory */
648    node->m_ref_stream_info = stream->clone();
649    /* no need for this memory anymore on the control plane memory */
650    stream->release_dp_object();
651
652    node->m_next_stream=0; /* will be fixed later */
653
654    if ( stream->m_self_start ){
655        /* if self start it is in active mode */
656        node->m_state =CGenNodeStateless::ss_ACTIVE;
657        lp_port->m_active_streams++;
658    }else{
659        node->m_state =CGenNodeStateless::ss_INACTIVE;
660    }
661
662    node->m_time = m_core->m_cur_time_sec + stream->get_start_delay_sec();
663
664    pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(stream->m_port_id);
665    node->m_flags = 0;
666    node->m_src_port =0;
667    node->m_original_packet_data_prefix = 0;
668
669    if (stream->m_rx_check.m_enabled) {
670        node->set_stat_needed();
671        uint8_t hw_id = stream->m_rx_check.m_hw_id;
672        assert (hw_id < MAX_FLOW_STATS);
673        node->set_stat_hw_id(hw_id);
674    }
675
676    /* set socket id */
677    node->set_socket_id(m_core->m_node_gen.m_socket_id);
678
679    /* build a mbuf from a packet */
680
681    uint16_t pkt_size = stream->m_pkt.len;
682    const uint8_t *stream_pkt = stream->m_pkt.binary;
683
684    node->m_pause =0;
685    node->m_stream_type = stream->m_type;
686    node->m_next_time_offset = 1.0 / stream->get_pps();
687    node->m_null_stream = (stream->m_null_stream ? 1 : 0);
688
689    /* stateless specific fields */
690    switch ( stream->m_type ) {
691
692    case TrexStream::stCONTINUOUS :
693        node->m_single_burst=0;
694        node->m_single_burst_refill=0;
695        node->m_multi_bursts=0;
696        break;
697
698    case TrexStream::stSINGLE_BURST :
699        node->m_stream_type             = TrexStream::stMULTI_BURST;
700        node->m_single_burst            = stream->m_burst_total_pkts;
701        node->m_single_burst_refill     = stream->m_burst_total_pkts;
702        node->m_multi_bursts            = 1;  /* single burst in multi burst of 1 */
703        break;
704
705    case TrexStream::stMULTI_BURST :
706        node->m_single_burst        = stream->m_burst_total_pkts;
707        node->m_single_burst_refill = stream->m_burst_total_pkts;
708        node->m_multi_bursts        = stream->m_num_bursts;
709        break;
710    default:
711
712        assert(0);
713    };
714
715    node->m_port_id = stream->m_port_id;
716
717    /* set dir 0 or 1 client or server */
718    node->set_mbuf_cache_dir(dir);
719
720
721    if (node->m_ref_stream_info->getDpVm() == NULL) {
722        /* no VM */
723
724        node->m_vm_flow_var =  NULL;
725        node->m_vm_program  =  NULL;
726        node->m_vm_program_size =0;
727
728                /* allocate const mbuf */
729        rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size);
730        assert(m);
731
732        char *p = rte_pktmbuf_append(m, pkt_size);
733        assert(p);
734        /* copy the packet */
735        memcpy(p,stream_pkt,pkt_size);
736
737        update_mac_addr(stream,node,dir,p);
738
739        /* set the packet as a readonly */
740        node->set_cache_mbuf(m);
741
742        node->m_original_packet_data_prefix =0;
743    }else{
744
745        /* set the program */
746        TrexStream * local_mem_stream = node->m_ref_stream_info;
747
748        StreamVmDp  * lpDpVm = local_mem_stream->getDpVm();
749
750        node->m_vm_flow_var      = lpDpVm->clone_bss(); /* clone the flow var */
751        node->m_vm_program       = lpDpVm->get_program(); /* same ref to the program */
752        node->m_vm_program_size  = lpDpVm->get_program_size();
753
754
755        /* set the random seed if was set */
756        if ( lpDpVm->is_random_seed() ){
757            /* if we have random seed for this program */
758            if (stream->m_random_seed) {
759                node->set_random_seed(stream->m_random_seed);
760            }
761        }
762
763        /* we need to copy the object */
764        if ( pkt_size > lpDpVm->get_prefix_size() ) {
765            /* we need const packet */
766            uint16_t const_pkt_size  = pkt_size - lpDpVm->get_prefix_size() ;
767            rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), const_pkt_size );
768            assert(m);
769
770            char *p = rte_pktmbuf_append(m, const_pkt_size);
771            assert(p);
772
773            /* copy packet data */
774            memcpy(p,(stream_pkt + lpDpVm->get_prefix_size()),const_pkt_size);
775
776            node->set_const_mbuf(m);
777        }
778
779
780        if ( lpDpVm->is_pkt_size_var() ) {
781            // mark the node as varible size
782            node->set_var_pkt_size();
783        }
784
785
786        if (lpDpVm->get_prefix_size() > pkt_size ) {
787            lpDpVm->set_prefix_size(pkt_size);
788        }
789
790        /* copy the headr */
791        uint16_t header_size = lpDpVm->get_prefix_size();
792        assert(header_size);
793        node->alloc_prefix_header(header_size);
794        uint8_t *p=node->m_original_packet_data_prefix;
795        assert(p);
796
797        memcpy(p,stream_pkt , header_size);
798
799        update_mac_addr(stream,node,dir,(char *)p);
800    }
801
802
803    CDpOneStream one_stream;
804
805    one_stream.m_dp_stream = node->m_ref_stream_info;
806    one_stream.m_node =node;
807
808    lp_port->m_active_nodes.push_back(one_stream);
809
810    /* schedule only if active */
811    if (node->m_state == CGenNodeStateless::ss_ACTIVE) {
812        m_core->m_node_gen.add_node((CGenNode *)node);
813    }
814}
815
816void
817TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj,
818                                   double duration,
819                                   int event_id) {
820
821
822    TrexStatelessDpPerPort * lp_port=get_port_db(obj->get_port_id());
823    lp_port->m_active_streams = 0;
824    lp_port->set_event_id(event_id);
825
826    /* no nodes in the list */
827    assert(lp_port->m_active_nodes.size()==0);
828
829    for (auto single_stream : obj->get_objects()) {
830        /* all commands should be for the same port */
831        assert(obj->get_port_id() == single_stream.m_stream->m_port_id);
832        add_stream(lp_port,single_stream.m_stream,obj);
833    }
834
835    uint32_t nodes = lp_port->m_active_nodes.size();
836    /* find next stream */
837    assert(nodes == obj->get_objects().size());
838
839    int cnt=0;
840
841    /* set the next_stream pointer  */
842    for (auto single_stream : obj->get_objects()) {
843
844        if (single_stream.m_stream->is_dp_next_stream() ) {
845            int stream_id = single_stream.m_stream->m_next_stream_id;
846            assert(stream_id<nodes);
847            /* point to the next stream , stream_id is fixed */
848            lp_port->m_active_nodes[cnt].m_node->m_next_stream = lp_port->m_active_nodes[stream_id].m_node ;
849        }
850        cnt++;
851    }
852
853    lp_port->m_state =TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
854    m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
855
856
857    if ( duration > 0.0 ){
858        add_port_duration( duration ,obj->get_port_id(),event_id );
859    }
860
861}
862
863
864bool TrexStatelessDpCore::are_all_ports_idle(){
865
866    bool res=true;
867    int i;
868    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
869        if ( m_ports[i].m_state != TrexStatelessDpPerPort::ppSTATE_IDLE ){
870            res=false;
871        }
872    }
873    return (res);
874}
875
876
877void
878TrexStatelessDpCore::resume_traffic(uint8_t port_id){
879
880    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
881
882    lp_port->resume_traffic(port_id);
883}
884
885
886void
887TrexStatelessDpCore::pause_traffic(uint8_t port_id){
888
889    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
890
891    lp_port->pause_traffic(port_id);
892}
893
894
895void
896TrexStatelessDpCore::push_pcap(uint8_t port_id,
897                               int event_id,
898                               const std::string &pcap_filename,
899                               double ipg_usec,
900                               double speedup,
901                               uint32_t count,
902                               double duration) {
903
904    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
905
906    lp_port->set_event_id(event_id);
907
908    /* delegate the command to the port */
909    bool rc = lp_port->push_pcap(port_id, pcap_filename, ipg_usec, speedup, count);
910    if (!rc) {
911        /* report back that we stopped */
912        CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
913        TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
914                                                                       port_id,
915                                                                       event_id,
916                                                                       false);
917        ring->Enqueue((CGenNode *)event_msg);
918        return;
919    }
920
921
922    if (duration > 0.0) {
923        add_port_duration(duration, port_id, event_id);
924    }
925
926     m_state = TrexStatelessDpCore::STATE_PCAP_TX;
927}
928
929
930void
931TrexStatelessDpCore::update_traffic(uint8_t port_id, double factor) {
932
933    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
934
935    lp_port->update_traffic(port_id, factor);
936}
937
938
939void
940TrexStatelessDpCore::stop_traffic(uint8_t  port_id,
941                                  bool     stop_on_id,
942                                  int      event_id) {
943    /* we cannot remove nodes not from the top of the queue so
944       for every active node - make sure next time
945       the scheduler invokes it, it will be free */
946
947    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
948    if ( lp_port->stop_traffic(port_id,stop_on_id,event_id) == false){
949        /* nothing to do ! already stopped */
950        //printf(" skip .. %f\n",m_core->m_cur_time_sec);
951        return;
952    }
953
954    /* inform the control plane we stopped - this might be a async stop
955       (streams ended)
956    */
957    #if 0
958    if ( are_all_ports_idle() ) {
959        /* just a place holder if we will need to do somthing in that case */
960    }
961    #endif
962
963    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
964    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
965                                                                   port_id,
966                                                                   lp_port->get_event_id());
967    ring->Enqueue((CGenNode *)event_msg);
968
969}
970
971/**
972 * handle a message from CP to DP
973 *
974 */
975void
976TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) {
977    msg->handle(this);
978    delete msg;
979}
980
981void
982TrexStatelessDpCore::barrier(uint8_t port_id, int event_id) {
983
984    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
985    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
986                                                                   port_id,
987                                                                   event_id);
988    ring->Enqueue((CGenNode *)event_msg);
989}
990
991
992/**
993 * PCAP node
994 */
995bool CGenNodePCAP::create(uint8_t port_id,
996                          pkt_dir_t dir,
997                          const uint8_t *mac_addr,
998                          const std::string &pcap_filename,
999                          double ipg_usec,
1000                          double speedup,
1001                          uint32_t count) {
1002    std::stringstream ss;
1003
1004    m_type       = CGenNode::PCAP_PKT;
1005    m_flags      = 0;
1006    m_src_port   = 0;
1007    m_port_id    = port_id;
1008    m_count      = count;
1009
1010    /* mark this node as slow path */
1011    set_slow_path(true);
1012
1013    if (ipg_usec != -1) {
1014        /* fixed IPG */
1015        m_ipg_sec = usec_to_sec(ipg_usec / speedup);
1016        m_speedup = 0;
1017    } else {
1018        /* packet IPG */
1019        m_ipg_sec = -1;
1020        m_speedup  = speedup;
1021    }
1022
1023    /* copy MAC addr info */
1024    memcpy(m_mac_addr, mac_addr, 12);
1025
1026    /* set the dir */
1027    set_mbuf_dir(dir);
1028
1029    /* create the PCAP reader */
1030    m_reader = CCapReaderFactory::CreateReader((char *)pcap_filename.c_str(), 0, ss);
1031    if (!m_reader) {
1032        return false;
1033    }
1034
1035    m_raw_packet = new CCapPktRaw();
1036    if ( m_reader->ReadPacket(m_raw_packet) == false ){
1037        /* handle error */
1038        delete m_reader;
1039        return (false);
1040    }
1041
1042    /* this is the reference time */
1043    //m_base_time = m_raw_packet->get_time();
1044    m_last_pkt_time = m_raw_packet->get_time();
1045
1046    /* ready */
1047    m_state = PCAP_ACTIVE;
1048
1049    return true;
1050}
1051
1052/**
1053 * cleanup for PCAP node
1054 *
1055 * @author imarom (08-May-16)
1056 */
1057void CGenNodePCAP::destroy() {
1058
1059    if (m_raw_packet) {
1060        delete m_raw_packet;
1061        m_raw_packet = NULL;
1062    }
1063
1064    if (m_reader) {
1065        delete m_reader;
1066        m_reader = NULL;
1067    }
1068
1069    m_state = PCAP_INVALID;
1070}
1071
1072