trex_stateless_dp_core.cpp revision 3ef23bf8
1/*
2 Itay Marom
3 Hanoch Haim
4 Cisco Systems, Inc.
5*/
6
7/*
8Copyright (c) 2015-2016 Cisco Systems, Inc.
9
10Licensed under the Apache License, Version 2.0 (the "License");
11you may not use this file except in compliance with the License.
12You may obtain a copy of the License at
13
14    http://www.apache.org/licenses/LICENSE-2.0
15
16Unless required by applicable law or agreed to in writing, software
17distributed under the License is distributed on an "AS IS" BASIS,
18WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19See the License for the specific language governing permissions and
20limitations under the License.
21*/
22#include "bp_sim.h"
23#include "trex_stateless_dp_core.h"
24#include "trex_stateless_messaging.h"
25#include "trex_stream.h"
26#include "trex_stream_node.h"
27#include "trex_streams_compiler.h"
28
29void CDpOneStream::Delete(CFlowGenListPerThread   * core){
30    assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE);
31    core->free_node((CGenNode *)m_node);
32    delete m_dp_stream;
33    m_node=0;
34    m_dp_stream=0;
35}
36
37void CDpOneStream::DeleteOnlyStream(){
38    assert(m_dp_stream);
39    delete m_dp_stream;
40    m_dp_stream=0;
41}
42
43int CGenNodeStateless::get_stream_id(){
44    if (m_state ==CGenNodeStateless::ss_FREE_RESUSE) {
45        return (-1); // not valid
46    }
47    assert(m_ref_stream_info);
48    return ((int)m_ref_stream_info->m_stream_id);
49}
50
51
52void CGenNodeStateless::DumpHeader(FILE *fd){
53    fprintf(fd," pkt_id, time, port , action , state, stream_id , stype , m-burst# , burst# \n");
54
55}
56void CGenNodeStateless::Dump(FILE *fd){
57    fprintf(fd," %2.4f, %3lu, %s,%s, %3d, %s, %3lu, %3lu  \n",
58            m_time,
59            (ulong)m_port_id,
60            "s-pkt", //action
61            get_stream_state_str(m_state ).c_str(),
62            get_stream_id(),   //stream_id
63            TrexStream::get_stream_type_str(m_stream_type).c_str(), //stype
64            (ulong)m_multi_bursts,
65            (ulong)m_single_burst
66            );
67}
68
69
70
71void CGenNodeStateless::refresh_vm_bss(){
72    if ( m_vm_flow_var ) {
73        StreamVmDp  * vm_s=m_ref_stream_info->m_vm_dp;
74        assert(vm_s);
75        memcpy(m_vm_flow_var,vm_s->get_bss(),vm_s->get_bss_size());
76
77        if ( vm_s->is_random_seed() ){
78            /* if we have random seed for this program */
79            if (m_ref_stream_info->m_random_seed) {
80                set_random_seed(m_ref_stream_info->m_random_seed);
81            }
82        }
83    }
84}
85
86
87/**
88 * this function called when stream restart after it was inactive
89 */
90void CGenNodeStateless::refresh(){
91
92    /* refill the stream info */
93    m_single_burst    = m_single_burst_refill;
94    m_multi_bursts    = m_ref_stream_info->m_num_bursts;
95    m_state           = CGenNodeStateless::ss_ACTIVE;
96
97    /* refresh init value */
98#if 0
99    /* TBD should add a JSON varible for that */
100    refresh_vm_bss();
101#endif
102}
103
104
105void CGenNodeCommand::free_command(){
106
107    assert(m_cmd);
108    m_cmd->on_node_remove();
109    delete m_cmd;
110}
111
112
113std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state){
114    std::string res;
115
116    switch (stream_state) {
117    case CGenNodeStateless::ss_FREE_RESUSE :
118         res="FREE    ";
119        break;
120    case CGenNodeStateless::ss_INACTIVE :
121        res="INACTIVE ";
122        break;
123    case CGenNodeStateless::ss_ACTIVE :
124        res="ACTIVE   ";
125        break;
126    default:
127        res="Unknow   ";
128    };
129    return(res);
130}
131
132
133rte_mbuf_t   * CGenNodeStateless::alloc_node_with_vm(){
134
135    rte_mbuf_t        * m;
136    /* alloc small packet buffer*/
137    uint16_t prefix_size = prefix_header_size();
138    m = CGlobalInfo::pktmbuf_alloc( get_socket_id(), prefix_size );
139    if (m==0) {
140        return (m);
141    }
142    /* TBD remove this, should handle cases of error */
143    assert(m);
144    char *p=rte_pktmbuf_append(m, prefix_size);
145    memcpy( p ,m_original_packet_data_prefix, prefix_size);
146
147
148    /* run the VM program */
149    StreamDPVmInstructionsRunner runner;
150
151    runner.run( (uint32_t*)m_vm_flow_var,
152                m_vm_program_size,
153                m_vm_program,
154                m_vm_flow_var,
155                (uint8_t*)p);
156
157    uint16_t pkt_new_size=runner.get_new_pkt_size();
158    if ( likely( pkt_new_size == 0) ) {
159        /* no packet size change */
160        rte_mbuf_t * m_const = get_const_mbuf();
161        if (  m_const != NULL) {
162            utl_rte_pktmbuf_add_after(m,m_const);
163        }
164        return (m);
165    }
166
167    /* packet size change there are a few changes */
168    rte_mbuf_t * m_const = get_const_mbuf();
169    if ( (m_const == 0 ) || (pkt_new_size<=prefix_size) ) {
170        /* one mbuf , just trim it */
171        m->data_len = pkt_new_size;
172        m->pkt_len  = pkt_new_size;
173        return (m);
174    }
175
176    rte_mbuf_t * mi= CGlobalInfo::pktmbuf_alloc_small(get_socket_id());
177    assert(mi);
178    rte_pktmbuf_attach(mi,m_const);
179    utl_rte_pktmbuf_add_after2(m,mi);
180
181    if ( pkt_new_size < m->pkt_len) {
182        /* need to trim it */
183        mi->data_len = (pkt_new_size - prefix_size);
184        m->pkt_len   = pkt_new_size;
185    }
186    return (m);
187}
188
189
190void CGenNodeStateless::free_stl_node(){
191    /* if we have cache mbuf free it */
192    rte_mbuf_t * m=get_cache_mbuf();
193    if (m) {
194        rte_pktmbuf_free(m);
195        m_cache_mbuf=0;
196    }else{
197        /* non cache - must have an header */
198         m=get_const_mbuf();
199         if (m) {
200             rte_pktmbuf_free(m); /* reduce the ref counter */
201         }
202         free_prefix_header();
203    }
204    if (m_vm_flow_var) {
205        /* free flow var */
206        free(m_vm_flow_var);
207        m_vm_flow_var=0;
208    }
209}
210
211
212bool TrexStatelessDpPerPort::update_number_of_active_streams(uint32_t d){
213    m_active_streams-=d; /* reduce the number of streams */
214    if (m_active_streams == 0) {
215        return (true);
216    }
217    return (false);
218}
219
220bool TrexStatelessDpPerPort::resume_traffic(uint8_t port_id){
221
222    /* we are working with continues streams so we must be in transmit mode */
223    assert(m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE);
224
225    for (auto dp_stream : m_active_nodes) {
226        CGenNodeStateless * node =dp_stream.m_node;
227        assert(node->get_port_id() == port_id);
228        assert(node->is_pause() == true);
229        node->set_pause(false);
230    }
231    m_state = TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
232    return (true);
233}
234
235bool TrexStatelessDpPerPort::update_traffic(uint8_t port_id, double factor) {
236
237    assert( (m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING ||
238            (m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE)) );
239
240    for (auto dp_stream : m_active_nodes) {
241        CGenNodeStateless * node = dp_stream.m_node;
242        assert(node->get_port_id() == port_id);
243
244        node->update_rate(factor);
245    }
246
247    return (true);
248}
249
250bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){
251
252    /* we are working with continues streams so we must be in transmit mode */
253    assert(m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING);
254
255    for (auto dp_stream : m_active_nodes) {
256        CGenNodeStateless * node =dp_stream.m_node;
257        assert(node->get_port_id() == port_id);
258        assert(node->is_pause() == false);
259        node->set_pause(true);
260    }
261    m_state = TrexStatelessDpPerPort::ppSTATE_PAUSE;
262    return (true);
263}
264
265bool TrexStatelessDpPerPort::push_pcap(uint8_t port_id,
266                                       const std::string &pcap_filename,
267                                       double ipg_usec,
268                                       double speedup,
269                                       uint32_t count) {
270
271    /* push pcap can only happen on an idle port from the core prespective */
272    assert(m_state == TrexStatelessDpPerPort::ppSTATE_IDLE);
273
274    CGenNodePCAP *pcap_node = m_core->allocate_pcap_node();
275    if (!pcap_node) {
276        return (false);
277    }
278
279    pkt_dir_t dir          = m_core->m_node_gen.m_v_if->port_id_to_dir(port_id);
280    socket_id_t socket_id  = m_core->m_node_gen.m_socket_id;
281
282    uint8_t mac_addr[12];
283    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, mac_addr);
284
285    bool rc = pcap_node->create(port_id,
286                                dir,
287                                socket_id,
288                                mac_addr,
289                                pcap_filename,
290                                ipg_usec,
291                                speedup,
292                                count);
293    if (!rc) {
294        m_core->free_node((CGenNode *)pcap_node);
295        return (false);
296    }
297
298    /* schedule the node for now */
299    pcap_node->m_time = m_core->m_cur_time_sec;
300    m_core->m_node_gen.add_node((CGenNode *)pcap_node);
301
302    /* hold a pointer to the node */
303    assert(m_active_pcap_node == NULL);
304    m_active_pcap_node = pcap_node;
305
306    m_state = TrexStatelessDpPerPort::ppSTATE_PCAP_TX;
307    return (true);
308}
309
310
311bool TrexStatelessDpPerPort::stop_traffic(uint8_t  port_id,
312                                          bool     stop_on_id,
313                                          int      event_id){
314
315
316    if (m_state == TrexStatelessDpPerPort::ppSTATE_IDLE) {
317        assert(m_active_streams==0);
318        return false;
319    }
320
321    /* there could be race of stop after stop */
322    if ( stop_on_id ) {
323        if (event_id != m_event_id){
324            /* we can't stop it is an old message */
325            return false;
326        }
327    }
328
329    for (auto dp_stream : m_active_nodes) {
330        CGenNodeStateless * node =dp_stream.m_node;
331        assert(node->get_port_id() == port_id);
332        if ( node->get_state() == CGenNodeStateless::ss_ACTIVE) {
333            node->mark_for_free();
334            m_active_streams--;
335            dp_stream.DeleteOnlyStream();
336
337        }else{
338            dp_stream.Delete(m_core);
339        }
340    }
341
342    /* check for active PCAP node */
343    if (m_active_pcap_node) {
344        /* when got async stop from outside or duration */
345        if (m_active_pcap_node->is_active()) {
346            m_active_pcap_node->mark_for_free();
347        } else {
348            /* graceful stop - node was put out by the scheduler */
349            m_core->free_node( (CGenNode *)m_active_pcap_node);
350        }
351
352        m_active_pcap_node = NULL;
353    }
354
355    /* active stream should be zero */
356    assert(m_active_streams==0);
357    m_active_nodes.clear();
358    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
359    return (true);
360}
361
362
363void TrexStatelessDpPerPort::create(CFlowGenListPerThread   *  core){
364    m_core=core;
365    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
366    m_active_streams=0;
367    m_active_nodes.clear();
368    m_active_pcap_node = NULL;
369}
370
371
372
373void
374TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) {
375    m_thread_id = thread_id;
376    m_core = core;
377    m_local_port_offset = 2*core->getDualPortId();
378
379    CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();
380
381    m_ring_from_cp = cp_dp->getRingCpToDp(thread_id);
382    m_ring_to_cp   = cp_dp->getRingDpToCp(thread_id);
383
384    m_state = STATE_IDLE;
385
386    int i;
387    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
388        m_ports[i].create(core);
389    }
390}
391
392
393/* move to the next stream, old stream move to INACTIVE */
394bool TrexStatelessDpCore::set_stateless_next_node(CGenNodeStateless * cur_node,
395                                                  CGenNodeStateless * next_node){
396
397    assert(cur_node);
398    TrexStatelessDpPerPort * lp_port = get_port_db(cur_node->m_port_id);
399    bool schedule =false;
400
401    bool to_stop_port=false;
402
403    if (next_node == NULL) {
404        /* there is no next stream , reduce the number of active streams*/
405        to_stop_port = lp_port->update_number_of_active_streams(1);
406
407    }else{
408        uint8_t state=next_node->get_state();
409
410        /* can't be FREE_RESUSE */
411        assert(state != CGenNodeStateless::ss_FREE_RESUSE);
412        if (state == CGenNodeStateless::ss_INACTIVE ) {
413
414            if (cur_node->m_action_counter > 0) {
415                cur_node->m_action_counter--;
416                if (cur_node->m_action_counter==0) {
417                    to_stop_port = lp_port->update_number_of_active_streams(1);
418                }else{
419                    /* refill start info and scedule, no update in active streams  */
420                    next_node->refresh();
421                    schedule = true;
422                }
423            }else{
424                /* refill start info and scedule, no update in active streams  */
425                next_node->refresh();
426                schedule = true;
427            }
428
429        }else{
430            to_stop_port = lp_port->update_number_of_active_streams(1);
431        }
432    }
433
434    if ( to_stop_port ) {
435        /* call stop port explictly to move the state */
436        stop_traffic(cur_node->m_port_id,false,0);
437    }
438
439    return ( schedule );
440}
441
442
443
444/**
445 * in idle state loop, the processor most of the time sleeps
446 * and periodically checks for messages
447 *
448 * @author imarom (01-Nov-15)
449 */
450void
451TrexStatelessDpCore::idle_state_loop() {
452
453    const int SHORT_DELAY_MS    = 2;
454    const int LONG_DELAY_MS     = 50;
455    const int DEEP_SLEEP_LIMIT  = 2000;
456
457    int counter = 0;
458
459    while (m_state == STATE_IDLE) {
460        m_core->m_node_gen.m_v_if->flush_dp_rx_queue();
461        bool had_msg = periodic_check_for_cp_messages();
462        if (had_msg) {
463            counter = 0;
464            continue;
465        }
466
467        /* enter deep sleep only if enough time had passed */
468        if (counter < DEEP_SLEEP_LIMIT) {
469            delay(SHORT_DELAY_MS);
470            counter++;
471        } else {
472            delay(LONG_DELAY_MS);
473        }
474
475    }
476}
477
478
479
480void TrexStatelessDpCore::quit_main_loop(){
481    m_core->set_terminate_mode(true); /* mark it as terminated */
482    m_state = STATE_TERMINATE;
483    add_global_duration(0.0001);
484}
485
486
487/**
488 * scehduler runs when traffic exists
489 * it will return when no more transmitting is done on this
490 * core
491 *
492 * @author imarom (01-Nov-15)
493 */
494void
495TrexStatelessDpCore::start_scheduler() {
496    /* creates a maintenace job using the scheduler */
497    CGenNode * node_sync = m_core->create_node() ;
498    node_sync->m_type = CGenNode::FLOW_SYNC;
499    node_sync->m_time = m_core->m_cur_time_sec + SYNC_TIME_OUT;
500    m_core->m_node_gen.add_node(node_sync);
501
502    double old_offset = 0.0;
503    m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset);
504    /* bail out in case of terminate */
505    if (m_state != TrexStatelessDpCore::STATE_TERMINATE) {
506        m_core->m_node_gen.close_file(m_core);
507        m_state = STATE_IDLE; /* we exit from all ports and we have nothing to do, we move to IDLE state */
508    }
509}
510
511
512void
513TrexStatelessDpCore::run_once(){
514
515    idle_state_loop();
516
517    if ( m_state == STATE_TERMINATE ){
518        return;
519    }
520
521    start_scheduler();
522}
523
524
525
526
527void
528TrexStatelessDpCore::start() {
529
530    while (true) {
531        run_once();
532
533        if ( m_core->is_terminated_by_master() ) {
534            break;
535        }
536    }
537}
538
539/* only if both port are idle we can exit */
540void
541TrexStatelessDpCore::schedule_exit(){
542
543    CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
544
545    node->m_type = CGenNode::COMMAND;
546
547    node->m_cmd = new TrexStatelessDpCanQuit();
548
549    /* make sure it will be scheduled after the current node */
550    node->m_time = m_core->m_cur_time_sec ;
551
552    m_core->m_node_gen.add_node((CGenNode *)node);
553}
554
555
556void
557TrexStatelessDpCore::add_global_duration(double duration){
558    if (duration > 0.0) {
559        CGenNode *node = m_core->create_node() ;
560
561        node->m_type = CGenNode::EXIT_SCHED;
562
563        /* make sure it will be scheduled after the current node */
564        node->m_time = m_core->m_cur_time_sec + duration ;
565
566        m_core->m_node_gen.add_node(node);
567    }
568}
569
570/* add per port exit */
571void
572TrexStatelessDpCore::add_port_duration(double duration,
573                                       uint8_t port_id,
574                                       int event_id){
575    if (duration > 0.0) {
576        CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
577
578        node->m_type = CGenNode::COMMAND;
579
580        /* make sure it will be scheduled after the current node */
581        node->m_time = m_core->m_cur_time_sec + duration ;
582
583        TrexStatelessDpStop * cmd=new TrexStatelessDpStop(port_id);
584
585
586        /* test this */
587        m_core->m_non_active_nodes++;
588        cmd->set_core_ptr(m_core);
589        cmd->set_event_id(event_id);
590        cmd->set_wait_for_event_id(true);
591
592        node->m_cmd = cmd;
593
594        m_core->m_node_gen.add_node((CGenNode *)node);
595    }
596}
597
598
599void TrexStatelessDpCore::update_mac_addr(TrexStream * stream,
600                                          CGenNodeStateless *node,
601                                          pkt_dir_t dir,
602                                          char *raw_pkt){
603    bool              ov_src = stream->get_override_src_mac_by_pkt_data();
604    TrexStream::stream_dst_mac_t  ov_dst = stream->get_override_dst_mac_mode();
605
606
607    if ( (ov_src == true) && (ov_dst == TrexStream::stPKT) ) {
608        /* nothing to do, take from the packet both */
609        return;
610    }
611
612        /* take from cfg_file */
613    if ( (ov_src == false) &&
614         (ov_dst == TrexStream::stCFG_FILE) ){
615
616          m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*)raw_pkt);
617          return;
618    }
619
620    /* save the pkt*/
621    char tmp_pkt[12];
622    memcpy(tmp_pkt,raw_pkt,12);
623
624    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*)raw_pkt);
625
626    if ((ov_src == true) && (ov_dst == TrexStream::stCFG_FILE)) {
627        memcpy(raw_pkt+6,tmp_pkt+6,6);
628    }
629
630    if ((ov_src == false) && (ov_dst == TrexStream::stPKT)) {
631        memcpy(raw_pkt,tmp_pkt,6);
632    }
633}
634
635
636void
637TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port,
638                                TrexStream * stream,
639                                TrexStreamsCompiledObj *comp) {
640
641    CGenNodeStateless *node = m_core->create_node_sl();
642
643    /* add periodic */
644    node->m_cache_mbuf=0;
645    node->m_type = CGenNode::STATELESS_PKT;
646
647    node->m_action_counter = stream->m_action_count;
648
649    /* clone the stream from control plane memory to DP memory */
650    node->m_ref_stream_info = stream->clone();
651    /* no need for this memory anymore on the control plane memory */
652    stream->release_dp_object();
653
654    node->m_next_stream=0; /* will be fixed later */
655
656    if ( stream->m_self_start ){
657        /* if self start it is in active mode */
658        node->m_state =CGenNodeStateless::ss_ACTIVE;
659        lp_port->m_active_streams++;
660    }else{
661        node->m_state =CGenNodeStateless::ss_INACTIVE;
662    }
663
664    node->m_time = m_core->m_cur_time_sec + stream->get_start_delay_sec();
665
666    pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(stream->m_port_id);
667    node->m_flags = 0;
668    node->m_src_port =0;
669    node->m_original_packet_data_prefix = 0;
670
671    if (stream->m_rx_check.m_enabled) {
672        node->set_stat_needed();
673        uint8_t hw_id = stream->m_rx_check.m_hw_id;
674        assert (hw_id < MAX_FLOW_STATS);
675        node->set_stat_hw_id(hw_id);
676    }
677
678    /* set socket id */
679    node->set_socket_id(m_core->m_node_gen.m_socket_id);
680
681    /* build a mbuf from a packet */
682
683    uint16_t pkt_size = stream->m_pkt.len;
684    const uint8_t *stream_pkt = stream->m_pkt.binary;
685
686    node->m_pause =0;
687    node->m_stream_type = stream->m_type;
688    node->m_next_time_offset = 1.0 / stream->get_pps();
689    node->m_null_stream = (stream->m_null_stream ? 1 : 0);
690
691    /* stateless specific fields */
692    switch ( stream->m_type ) {
693
694    case TrexStream::stCONTINUOUS :
695        node->m_single_burst=0;
696        node->m_single_burst_refill=0;
697        node->m_multi_bursts=0;
698        break;
699
700    case TrexStream::stSINGLE_BURST :
701        node->m_stream_type             = TrexStream::stMULTI_BURST;
702        node->m_single_burst            = stream->m_burst_total_pkts;
703        node->m_single_burst_refill     = stream->m_burst_total_pkts;
704        node->m_multi_bursts            = 1;  /* single burst in multi burst of 1 */
705        break;
706
707    case TrexStream::stMULTI_BURST :
708        node->m_single_burst        = stream->m_burst_total_pkts;
709        node->m_single_burst_refill = stream->m_burst_total_pkts;
710        node->m_multi_bursts        = stream->m_num_bursts;
711        break;
712    default:
713
714        assert(0);
715    };
716
717    node->m_port_id = stream->m_port_id;
718
719    /* set dir 0 or 1 client or server */
720    node->set_mbuf_cache_dir(dir);
721
722
723    if (node->m_ref_stream_info->getDpVm() == NULL) {
724        /* no VM */
725
726        node->m_vm_flow_var =  NULL;
727        node->m_vm_program  =  NULL;
728        node->m_vm_program_size =0;
729
730                /* allocate const mbuf */
731        rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size);
732        assert(m);
733
734        char *p = rte_pktmbuf_append(m, pkt_size);
735        assert(p);
736        /* copy the packet */
737        memcpy(p,stream_pkt,pkt_size);
738
739        update_mac_addr(stream,node,dir,p);
740
741        /* set the packet as a readonly */
742        node->set_cache_mbuf(m);
743
744        node->m_original_packet_data_prefix =0;
745    }else{
746
747        /* set the program */
748        TrexStream * local_mem_stream = node->m_ref_stream_info;
749
750        StreamVmDp  * lpDpVm = local_mem_stream->getDpVm();
751
752        node->m_vm_flow_var      = lpDpVm->clone_bss(); /* clone the flow var */
753        node->m_vm_program       = lpDpVm->get_program(); /* same ref to the program */
754        node->m_vm_program_size  = lpDpVm->get_program_size();
755
756
757        /* set the random seed if was set */
758        if ( lpDpVm->is_random_seed() ){
759            /* if we have random seed for this program */
760            if (stream->m_random_seed) {
761                node->set_random_seed(stream->m_random_seed);
762            }
763        }
764
765        /* we need to copy the object */
766        if ( pkt_size > lpDpVm->get_prefix_size() ) {
767            /* we need const packet */
768            uint16_t const_pkt_size  = pkt_size - lpDpVm->get_prefix_size() ;
769            rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), const_pkt_size );
770            assert(m);
771
772            char *p = rte_pktmbuf_append(m, const_pkt_size);
773            assert(p);
774
775            /* copy packet data */
776            memcpy(p,(stream_pkt + lpDpVm->get_prefix_size()),const_pkt_size);
777
778            node->set_const_mbuf(m);
779        }
780
781
782        if ( lpDpVm->is_pkt_size_var() ) {
783            // mark the node as varible size
784            node->set_var_pkt_size();
785        }
786
787
788        if (lpDpVm->get_prefix_size() > pkt_size ) {
789            lpDpVm->set_prefix_size(pkt_size);
790        }
791
792        /* copy the headr */
793        uint16_t header_size = lpDpVm->get_prefix_size();
794        assert(header_size);
795        node->alloc_prefix_header(header_size);
796        uint8_t *p=node->m_original_packet_data_prefix;
797        assert(p);
798
799        memcpy(p,stream_pkt , header_size);
800
801        update_mac_addr(stream,node,dir,(char *)p);
802    }
803
804
805    CDpOneStream one_stream;
806
807    one_stream.m_dp_stream = node->m_ref_stream_info;
808    one_stream.m_node =node;
809
810    lp_port->m_active_nodes.push_back(one_stream);
811
812    /* schedule only if active */
813    if (node->m_state == CGenNodeStateless::ss_ACTIVE) {
814        m_core->m_node_gen.add_node((CGenNode *)node);
815    }
816}
817
818void
819TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj,
820                                   double duration,
821                                   int event_id) {
822
823
824    TrexStatelessDpPerPort * lp_port=get_port_db(obj->get_port_id());
825    lp_port->m_active_streams = 0;
826    lp_port->set_event_id(event_id);
827
828    /* no nodes in the list */
829    assert(lp_port->m_active_nodes.size()==0);
830
831    for (auto single_stream : obj->get_objects()) {
832        /* all commands should be for the same port */
833        assert(obj->get_port_id() == single_stream.m_stream->m_port_id);
834        add_stream(lp_port,single_stream.m_stream,obj);
835    }
836
837    uint32_t nodes = lp_port->m_active_nodes.size();
838    /* find next stream */
839    assert(nodes == obj->get_objects().size());
840
841    int cnt=0;
842
843    /* set the next_stream pointer  */
844    for (auto single_stream : obj->get_objects()) {
845
846        if (single_stream.m_stream->is_dp_next_stream() ) {
847            int stream_id = single_stream.m_stream->m_next_stream_id;
848            assert(stream_id<nodes);
849            /* point to the next stream , stream_id is fixed */
850            lp_port->m_active_nodes[cnt].m_node->m_next_stream = lp_port->m_active_nodes[stream_id].m_node ;
851        }
852        cnt++;
853    }
854
855    lp_port->m_state =TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
856    m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
857
858
859    if ( duration > 0.0 ){
860        add_port_duration( duration ,obj->get_port_id(),event_id );
861    }
862
863}
864
865
866bool TrexStatelessDpCore::are_all_ports_idle(){
867
868    bool res=true;
869    int i;
870    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
871        if ( m_ports[i].m_state != TrexStatelessDpPerPort::ppSTATE_IDLE ){
872            res=false;
873        }
874    }
875    return (res);
876}
877
878
879void
880TrexStatelessDpCore::resume_traffic(uint8_t port_id){
881
882    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
883
884    lp_port->resume_traffic(port_id);
885}
886
887
888void
889TrexStatelessDpCore::pause_traffic(uint8_t port_id){
890
891    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
892
893    lp_port->pause_traffic(port_id);
894}
895
896
897void
898TrexStatelessDpCore::push_pcap(uint8_t port_id,
899                               int event_id,
900                               const std::string &pcap_filename,
901                               double ipg_usec,
902                               double speedup,
903                               uint32_t count,
904                               double duration) {
905
906    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
907
908    lp_port->set_event_id(event_id);
909
910    /* delegate the command to the port */
911    bool rc = lp_port->push_pcap(port_id, pcap_filename, ipg_usec, speedup, count);
912    if (!rc) {
913        /* report back that we stopped */
914        CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
915        TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
916                                                                       port_id,
917                                                                       event_id,
918                                                                       false);
919        ring->Enqueue((CGenNode *)event_msg);
920        return;
921    }
922
923
924    if (duration > 0.0) {
925        add_port_duration(duration, port_id, event_id);
926    }
927
928     m_state = TrexStatelessDpCore::STATE_PCAP_TX;
929}
930
931
932void
933TrexStatelessDpCore::update_traffic(uint8_t port_id, double factor) {
934
935    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
936
937    lp_port->update_traffic(port_id, factor);
938}
939
940
941void
942TrexStatelessDpCore::stop_traffic(uint8_t  port_id,
943                                  bool     stop_on_id,
944                                  int      event_id) {
945    /* we cannot remove nodes not from the top of the queue so
946       for every active node - make sure next time
947       the scheduler invokes it, it will be free */
948
949    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
950    if ( lp_port->stop_traffic(port_id,stop_on_id,event_id) == false){
951        return;
952    }
953
954
955    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
956    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
957                                                                   port_id,
958                                                                   lp_port->get_event_id());
959    ring->Enqueue((CGenNode *)event_msg);
960
961}
962
963/**
964 * handle a message from CP to DP
965 *
966 */
967void
968TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) {
969    msg->handle(this);
970    delete msg;
971}
972
973void
974TrexStatelessDpCore::barrier(uint8_t port_id, int event_id) {
975
976    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
977    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
978                                                                   port_id,
979                                                                   event_id);
980    ring->Enqueue((CGenNode *)event_msg);
981}
982
983
984/**
985 * PCAP node
986 */
987bool CGenNodePCAP::create(uint8_t port_id,
988                          pkt_dir_t dir,
989                          socket_id_t socket_id,
990                          const uint8_t *mac_addr,
991                          const std::string &pcap_filename,
992                          double ipg_usec,
993                          double speedup,
994                          uint32_t count) {
995    std::stringstream ss;
996
997    m_type       = CGenNode::PCAP_PKT;
998    m_flags      = 0;
999    m_src_port   = 0;
1000    m_port_id    = port_id;
1001    m_count      = count;
1002
1003    /* mark this node as slow path */
1004    set_slow_path(true);
1005
1006    if (ipg_usec != -1) {
1007        /* fixed IPG */
1008        m_ipg_sec = usec_to_sec(ipg_usec / speedup);
1009        m_speedup = 0;
1010    } else {
1011        /* packet IPG */
1012        m_ipg_sec = -1;
1013        m_speedup  = speedup;
1014    }
1015
1016    /* copy MAC addr info */
1017    memcpy(m_mac_addr, mac_addr, 12);
1018
1019    /* set the dir */
1020    set_mbuf_dir(dir);
1021    set_socket_id(socket_id);
1022
1023    /* create the PCAP reader */
1024    m_reader = CCapReaderFactory::CreateReader((char *)pcap_filename.c_str(), 0, ss);
1025    if (!m_reader) {
1026        return false;
1027    }
1028
1029    m_raw_packet = new CCapPktRaw();
1030    if ( m_reader->ReadPacket(m_raw_packet) == false ){
1031        /* handle error */
1032        delete m_reader;
1033        return (false);
1034    }
1035
1036    /* this is the reference time */
1037    //m_base_time = m_raw_packet->get_time();
1038    m_last_pkt_time = m_raw_packet->get_time();
1039
1040    /* ready */
1041    m_state = PCAP_ACTIVE;
1042
1043    return true;
1044}
1045
1046/**
1047 * cleanup for PCAP node
1048 *
1049 * @author imarom (08-May-16)
1050 */
1051void CGenNodePCAP::destroy() {
1052
1053    if (m_raw_packet) {
1054        delete m_raw_packet;
1055        m_raw_packet = NULL;
1056    }
1057
1058    if (m_reader) {
1059        delete m_reader;
1060        m_reader = NULL;
1061    }
1062
1063    m_state = PCAP_INVALID;
1064}
1065
1066