trex_stateless_dp_core.cpp revision 8691f401
1/*
2 Itay Marom
3 Hanoch Haim
4 Cisco Systems, Inc.
5*/
6
7/*
8Copyright (c) 2015-2016 Cisco Systems, Inc.
9
10Licensed under the Apache License, Version 2.0 (the "License");
11you may not use this file except in compliance with the License.
12You may obtain a copy of the License at
13
14    http://www.apache.org/licenses/LICENSE-2.0
15
16Unless required by applicable law or agreed to in writing, software
17distributed under the License is distributed on an "AS IS" BASIS,
18WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19See the License for the specific language governing permissions and
20limitations under the License.
21*/
22#include "bp_sim.h"
23#include "trex_stateless_dp_core.h"
24#include "trex_stateless_messaging.h"
25#include "trex_stream.h"
26#include "trex_stream_node.h"
27#include "trex_streams_compiler.h"
28
29void CDpOneStream::Delete(CFlowGenListPerThread   * core){
30    assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE);
31    core->free_node((CGenNode *)m_node);
32    delete m_dp_stream;
33    m_node=0;
34    m_dp_stream=0;
35}
36
37void CDpOneStream::DeleteOnlyStream(){
38    assert(m_dp_stream);
39    delete m_dp_stream;
40    m_dp_stream=0;
41}
42
43int CGenNodeStateless::get_stream_id(){
44    if (m_state ==CGenNodeStateless::ss_FREE_RESUSE) {
45        return (-1); // not valid
46    }
47    assert(m_ref_stream_info);
48    return ((int)m_ref_stream_info->m_stream_id);
49}
50
51
52void CGenNodeStateless::DumpHeader(FILE *fd){
53    fprintf(fd," pkt_id, time, port , action , state, stream_id , stype , m-burst# , burst# \n");
54
55}
56void CGenNodeStateless::Dump(FILE *fd){
57    fprintf(fd," %2.4f, %3lu, %s,%s, %3d, %s, %3lu, %3lu  \n",
58            m_time,
59            (ulong)m_port_id,
60            "s-pkt", //action
61            get_stream_state_str(m_state ).c_str(),
62            get_stream_id(),   //stream_id
63            TrexStream::get_stream_type_str(m_stream_type).c_str(), //stype
64            (ulong)m_multi_bursts,
65            (ulong)m_single_burst
66            );
67}
68
69
70
71void CGenNodeStateless::refresh_vm_bss(){
72    if ( m_vm_flow_var ) {
73        StreamVmDp  * vm_s=m_ref_stream_info->m_vm_dp;
74        assert(vm_s);
75        memcpy(m_vm_flow_var,vm_s->get_bss(),vm_s->get_bss_size());
76
77        if ( vm_s->is_random_seed() ){
78            /* if we have random seed for this program */
79            if (m_ref_stream_info->m_random_seed) {
80                set_random_seed(m_ref_stream_info->m_random_seed);
81            }
82        }
83    }
84}
85
86
87/**
88 * this function called when stream restart after it was inactive
89 */
90void CGenNodeStateless::refresh(){
91
92    /* refill the stream info */
93    m_single_burst    = m_single_burst_refill;
94    m_multi_bursts    = m_ref_stream_info->m_num_bursts;
95    m_state           = CGenNodeStateless::ss_ACTIVE;
96
97    /* refresh init value */
98#if 0
99    /* TBD should add a JSON varible for that */
100    refresh_vm_bss();
101#endif
102}
103
104
105void CGenNodeCommand::free_command(){
106
107    assert(m_cmd);
108    m_cmd->on_node_remove();
109    delete m_cmd;
110}
111
112
113std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state){
114    std::string res;
115
116    switch (stream_state) {
117    case CGenNodeStateless::ss_FREE_RESUSE :
118         res="FREE    ";
119        break;
120    case CGenNodeStateless::ss_INACTIVE :
121        res="INACTIVE ";
122        break;
123    case CGenNodeStateless::ss_ACTIVE :
124        res="ACTIVE   ";
125        break;
126    default:
127        res="Unknow   ";
128    };
129    return(res);
130}
131
132
133rte_mbuf_t   * CGenNodeStateless::alloc_node_with_vm(){
134
135    rte_mbuf_t        * m;
136    /* alloc small packet buffer*/
137    uint16_t prefix_size = prefix_header_size();
138    m = CGlobalInfo::pktmbuf_alloc( get_socket_id(), prefix_size );
139    if (m==0) {
140        return (m);
141    }
142    /* TBD remove this, should handle cases of error */
143    assert(m);
144    char *p=rte_pktmbuf_append(m, prefix_size);
145    memcpy( p ,m_original_packet_data_prefix, prefix_size);
146
147
148    /* run the VM program */
149    StreamDPVmInstructionsRunner runner;
150
151    runner.run( (uint32_t*)m_vm_flow_var,
152                m_vm_program_size,
153                m_vm_program,
154                m_vm_flow_var,
155                (uint8_t*)p);
156
157    uint16_t pkt_new_size=runner.get_new_pkt_size();
158    if ( likely( pkt_new_size == 0) ) {
159        /* no packet size change */
160        rte_mbuf_t * m_const = get_const_mbuf();
161        if (  m_const != NULL) {
162            utl_rte_pktmbuf_add_after(m,m_const);
163        }
164        return (m);
165    }
166
167    /* packet size change there are a few changes */
168    rte_mbuf_t * m_const = get_const_mbuf();
169    if ( (m_const == 0 ) || (pkt_new_size<=prefix_size) ) {
170        /* one mbuf , just trim it */
171        m->data_len = pkt_new_size;
172        m->pkt_len  = pkt_new_size;
173        return (m);
174    }
175
176    rte_mbuf_t * mi= CGlobalInfo::pktmbuf_alloc_small(get_socket_id());
177    assert(mi);
178    rte_pktmbuf_attach(mi,m_const);
179    utl_rte_pktmbuf_add_after2(m,mi);
180
181    if ( pkt_new_size < m->pkt_len) {
182        /* need to trim it */
183        mi->data_len = (pkt_new_size - prefix_size);
184        m->pkt_len   = pkt_new_size;
185    }
186    return (m);
187}
188
189
190void CGenNodeStateless::free_stl_node(){
191    /* if we have cache mbuf free it */
192    rte_mbuf_t * m=get_cache_mbuf();
193    if (m) {
194        rte_pktmbuf_free(m);
195        m_cache_mbuf=0;
196    }else{
197        /* non cache - must have an header */
198         m=get_const_mbuf();
199         if (m) {
200             rte_pktmbuf_free(m); /* reduce the ref counter */
201         }
202         free_prefix_header();
203    }
204    if (m_vm_flow_var) {
205        /* free flow var */
206        free(m_vm_flow_var);
207        m_vm_flow_var=0;
208    }
209}
210
211
212bool TrexStatelessDpPerPort::update_number_of_active_streams(uint32_t d){
213    m_active_streams-=d; /* reduce the number of streams */
214    if (m_active_streams == 0) {
215        return (true);
216    }
217    return (false);
218}
219
220bool TrexStatelessDpPerPort::resume_traffic(uint8_t port_id){
221
222    /* we are working with continues streams so we must be in transmit mode */
223    assert(m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE);
224
225    for (auto dp_stream : m_active_nodes) {
226        CGenNodeStateless * node =dp_stream.m_node;
227        assert(node->get_port_id() == port_id);
228        assert(node->is_pause() == true);
229        node->set_pause(false);
230    }
231    m_state = TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
232    return (true);
233}
234
235bool TrexStatelessDpPerPort::update_traffic(uint8_t port_id, double factor) {
236
237    assert( (m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING ||
238            (m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE)) );
239
240    for (auto dp_stream : m_active_nodes) {
241        CGenNodeStateless * node = dp_stream.m_node;
242        assert(node->get_port_id() == port_id);
243
244        node->update_rate(factor);
245    }
246
247    return (true);
248}
249
250bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){
251
252    /* we are working with continues streams so we must be in transmit mode */
253    assert(m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING);
254
255    for (auto dp_stream : m_active_nodes) {
256        CGenNodeStateless * node =dp_stream.m_node;
257        assert(node->get_port_id() == port_id);
258        assert(node->is_pause() == false);
259        node->set_pause(true);
260    }
261    m_state = TrexStatelessDpPerPort::ppSTATE_PAUSE;
262    return (true);
263}
264
265bool TrexStatelessDpPerPort::push_pcap(uint8_t port_id, const std::string &pcap_filename){
266
267    /* push pcap can only happen on an idle port from the core prespective */
268    assert(m_state == TrexStatelessDpPerPort::ppSTATE_IDLE);
269
270    CGenNodePCAP *pcap_node = m_core->allocate_pcap_node();
271    if (!pcap_node) {
272        return (false);
273    }
274
275    pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(port_id);
276
277    uint8_t mac_addr[12];
278    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, mac_addr);
279
280    bool rc = pcap_node->create(port_id, pcap_filename, dir, mac_addr);
281    if (!rc) {
282        m_core->free_node((CGenNode *)pcap_node);
283        return (false);
284    }
285
286    /* schedule the node for now */
287    pcap_node->m_time = m_core->m_cur_time_sec;
288    m_core->m_node_gen.add_node((CGenNode *)pcap_node);
289
290    m_state = TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
291
292    return (true);
293}
294
295
296bool TrexStatelessDpPerPort::stop_traffic(uint8_t  port_id,
297                                          bool     stop_on_id,
298                                          int      event_id){
299
300
301    if (m_state == TrexStatelessDpPerPort::ppSTATE_IDLE) {
302        assert(m_active_streams==0);
303        return false;
304    }
305
306    /* there could be race of stop after stop */
307    if ( stop_on_id ) {
308        if (event_id != m_event_id){
309            /* we can't stop it is an old message */
310            return false;
311        }
312    }
313
314    for (auto dp_stream : m_active_nodes) {
315        CGenNodeStateless * node =dp_stream.m_node;
316        assert(node->get_port_id() == port_id);
317        if ( node->get_state() == CGenNodeStateless::ss_ACTIVE) {
318            node->mark_for_free();
319            m_active_streams--;
320            dp_stream.DeleteOnlyStream();
321
322        }else{
323            dp_stream.Delete(m_core);
324        }
325    }
326
327    /* active stream should be zero */
328    assert(m_active_streams==0);
329    m_active_nodes.clear();
330    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
331    return (true);
332}
333
334
335void TrexStatelessDpPerPort::create(CFlowGenListPerThread   *  core){
336    m_core=core;
337    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
338    m_active_streams=0;
339    m_active_nodes.clear();
340}
341
342
343
344void
345TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) {
346    m_thread_id = thread_id;
347    m_core = core;
348    m_local_port_offset = 2*core->getDualPortId();
349
350    CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();
351
352    m_ring_from_cp = cp_dp->getRingCpToDp(thread_id);
353    m_ring_to_cp   = cp_dp->getRingDpToCp(thread_id);
354
355    m_state = STATE_IDLE;
356
357    int i;
358    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
359        m_ports[i].create(core);
360    }
361}
362
363
364/* move to the next stream, old stream move to INACTIVE */
365bool TrexStatelessDpCore::set_stateless_next_node(CGenNodeStateless * cur_node,
366                                                  CGenNodeStateless * next_node){
367
368    assert(cur_node);
369    TrexStatelessDpPerPort * lp_port = get_port_db(cur_node->m_port_id);
370    bool schedule =false;
371
372    bool to_stop_port=false;
373
374    if (next_node == NULL) {
375        /* there is no next stream , reduce the number of active streams*/
376        to_stop_port = lp_port->update_number_of_active_streams(1);
377
378    }else{
379        uint8_t state=next_node->get_state();
380
381        /* can't be FREE_RESUSE */
382        assert(state != CGenNodeStateless::ss_FREE_RESUSE);
383        if (state == CGenNodeStateless::ss_INACTIVE ) {
384
385            if (cur_node->m_action_counter > 0) {
386                cur_node->m_action_counter--;
387                if (cur_node->m_action_counter==0) {
388                    to_stop_port = lp_port->update_number_of_active_streams(1);
389                }else{
390                    /* refill start info and scedule, no update in active streams  */
391                    next_node->refresh();
392                    schedule = true;
393                }
394            }else{
395                /* refill start info and scedule, no update in active streams  */
396                next_node->refresh();
397                schedule = true;
398            }
399
400        }else{
401            to_stop_port = lp_port->update_number_of_active_streams(1);
402        }
403    }
404
405    if ( to_stop_port ) {
406        /* call stop port explictly to move the state */
407        stop_traffic(cur_node->m_port_id,false,0);
408    }
409
410    return ( schedule );
411}
412
413
414
415/**
416 * in idle state loop, the processor most of the time sleeps
417 * and periodically checks for messages
418 *
419 * @author imarom (01-Nov-15)
420 */
421void
422TrexStatelessDpCore::idle_state_loop() {
423
424    const int SHORT_DELAY_MS    = 2;
425    const int LONG_DELAY_MS     = 50;
426    const int DEEP_SLEEP_LIMIT  = 2000;
427
428    int counter = 0;
429
430    while (m_state == STATE_IDLE) {
431        m_core->m_node_gen.m_v_if->flush_dp_rx_queue();
432        bool had_msg = periodic_check_for_cp_messages();
433        if (had_msg) {
434            counter = 0;
435            continue;
436        }
437
438        /* enter deep sleep only if enough time had passed */
439        if (counter < DEEP_SLEEP_LIMIT) {
440            delay(SHORT_DELAY_MS);
441            counter++;
442        } else {
443            delay(LONG_DELAY_MS);
444        }
445
446    }
447}
448
449
450
451void TrexStatelessDpCore::quit_main_loop(){
452    m_core->set_terminate_mode(true); /* mark it as terminated */
453    m_state = STATE_TERMINATE;
454    add_global_duration(0.0001);
455}
456
457
458/**
459 * scehduler runs when traffic exists
460 * it will return when no more transmitting is done on this
461 * core
462 *
463 * @author imarom (01-Nov-15)
464 */
465void
466TrexStatelessDpCore::start_scheduler() {
467    /* creates a maintenace job using the scheduler */
468    CGenNode * node_sync = m_core->create_node() ;
469    node_sync->m_type = CGenNode::FLOW_SYNC;
470    node_sync->m_time = m_core->m_cur_time_sec + SYNC_TIME_OUT;
471    m_core->m_node_gen.add_node(node_sync);
472
473    double old_offset = 0.0;
474    m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset);
475    /* bail out in case of terminate */
476    if (m_state != TrexStatelessDpCore::STATE_TERMINATE) {
477        m_core->m_node_gen.close_file(m_core);
478        m_state = STATE_IDLE; /* we exit from all ports and we have nothing to do, we move to IDLE state */
479    }
480}
481
482
483void
484TrexStatelessDpCore::run_once(){
485
486    idle_state_loop();
487
488    if ( m_state == STATE_TERMINATE ){
489        return;
490    }
491
492    start_scheduler();
493}
494
495
496
497
498void
499TrexStatelessDpCore::start() {
500
501    while (true) {
502        run_once();
503
504        if ( m_core->is_terminated_by_master() ) {
505            break;
506        }
507    }
508}
509
510/* only if both port are idle we can exit */
511void
512TrexStatelessDpCore::schedule_exit(){
513
514    CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
515
516    node->m_type = CGenNode::COMMAND;
517
518    node->m_cmd = new TrexStatelessDpCanQuit();
519
520    /* make sure it will be scheduled after the current node */
521    node->m_time = m_core->m_cur_time_sec ;
522
523    m_core->m_node_gen.add_node((CGenNode *)node);
524}
525
526
527void
528TrexStatelessDpCore::add_global_duration(double duration){
529    if (duration > 0.0) {
530        CGenNode *node = m_core->create_node() ;
531
532        node->m_type = CGenNode::EXIT_SCHED;
533
534        /* make sure it will be scheduled after the current node */
535        node->m_time = m_core->m_cur_time_sec + duration ;
536
537        m_core->m_node_gen.add_node(node);
538    }
539}
540
541/* add per port exit */
542void
543TrexStatelessDpCore::add_port_duration(double duration,
544                                       uint8_t port_id,
545                                       int event_id){
546    if (duration > 0.0) {
547        CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
548
549        node->m_type = CGenNode::COMMAND;
550
551        /* make sure it will be scheduled after the current node */
552        node->m_time = m_core->m_cur_time_sec + duration ;
553
554        TrexStatelessDpStop * cmd=new TrexStatelessDpStop(port_id);
555
556
557        /* test this */
558        m_core->m_non_active_nodes++;
559        cmd->set_core_ptr(m_core);
560        cmd->set_event_id(event_id);
561        cmd->set_wait_for_event_id(true);
562
563        node->m_cmd = cmd;
564
565        m_core->m_node_gen.add_node((CGenNode *)node);
566    }
567}
568
569
570void TrexStatelessDpCore::update_mac_addr(TrexStream * stream,
571                                          CGenNodeStateless *node,
572                                          pkt_dir_t dir,
573                                          char *raw_pkt){
574    bool              ov_src = stream->get_override_src_mac_by_pkt_data();
575    TrexStream::stream_dst_mac_t  ov_dst = stream->get_override_dst_mac_mode();
576
577
578    if ( (ov_src == true) && (ov_dst == TrexStream::stPKT) ) {
579        /* nothing to do, take from the packet both */
580        return;
581    }
582
583        /* take from cfg_file */
584    if ( (ov_src == false) &&
585         (ov_dst == TrexStream::stCFG_FILE) ){
586
587          m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*)raw_pkt);
588          return;
589    }
590
591    /* save the pkt*/
592    char tmp_pkt[12];
593    memcpy(tmp_pkt,raw_pkt,12);
594
595    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*)raw_pkt);
596
597    if ((ov_src == true) && (ov_dst == TrexStream::stCFG_FILE)) {
598        memcpy(raw_pkt+6,tmp_pkt+6,6);
599    }
600
601    if ((ov_src == false) && (ov_dst == TrexStream::stPKT)) {
602        memcpy(raw_pkt,tmp_pkt,6);
603    }
604}
605
606
607void
608TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port,
609                                TrexStream * stream,
610                                TrexStreamsCompiledObj *comp) {
611
612    CGenNodeStateless *node = m_core->create_node_sl();
613
614    /* add periodic */
615    node->m_cache_mbuf=0;
616    node->m_type = CGenNode::STATELESS_PKT;
617
618    node->m_action_counter = stream->m_action_count;
619
620    /* clone the stream from control plane memory to DP memory */
621    node->m_ref_stream_info = stream->clone();
622    /* no need for this memory anymore on the control plane memory */
623    stream->release_dp_object();
624
625    node->m_next_stream=0; /* will be fixed later */
626
627    if ( stream->m_self_start ){
628        /* if self start it is in active mode */
629        node->m_state =CGenNodeStateless::ss_ACTIVE;
630        lp_port->m_active_streams++;
631    }else{
632        node->m_state =CGenNodeStateless::ss_INACTIVE;
633    }
634
635    node->m_time = m_core->m_cur_time_sec + stream->get_start_delay_sec();
636
637    pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(stream->m_port_id);
638    node->m_flags = 0;
639    node->m_src_port =0;
640    node->m_original_packet_data_prefix = 0;
641
642    if (stream->m_rx_check.m_enabled) {
643        node->set_stat_needed();
644        uint8_t hw_id = stream->m_rx_check.m_hw_id;
645        assert (hw_id < MAX_FLOW_STATS);
646        node->set_stat_hw_id(hw_id);
647    }
648
649    /* set socket id */
650    node->set_socket_id(m_core->m_node_gen.m_socket_id);
651
652    /* build a mbuf from a packet */
653
654    uint16_t pkt_size = stream->m_pkt.len;
655    const uint8_t *stream_pkt = stream->m_pkt.binary;
656
657    node->m_pause =0;
658    node->m_stream_type = stream->m_type;
659    node->m_next_time_offset = 1.0 / stream->get_pps();
660    node->m_null_stream = (stream->m_null_stream ? 1 : 0);
661
662    /* stateless specific fields */
663    switch ( stream->m_type ) {
664
665    case TrexStream::stCONTINUOUS :
666        node->m_single_burst=0;
667        node->m_single_burst_refill=0;
668        node->m_multi_bursts=0;
669        break;
670
671    case TrexStream::stSINGLE_BURST :
672        node->m_stream_type             = TrexStream::stMULTI_BURST;
673        node->m_single_burst            = stream->m_burst_total_pkts;
674        node->m_single_burst_refill     = stream->m_burst_total_pkts;
675        node->m_multi_bursts            = 1;  /* single burst in multi burst of 1 */
676        break;
677
678    case TrexStream::stMULTI_BURST :
679        node->m_single_burst        = stream->m_burst_total_pkts;
680        node->m_single_burst_refill = stream->m_burst_total_pkts;
681        node->m_multi_bursts        = stream->m_num_bursts;
682        break;
683    default:
684
685        assert(0);
686    };
687
688    node->m_port_id = stream->m_port_id;
689
690    /* set dir 0 or 1 client or server */
691    node->set_mbuf_cache_dir(dir);
692
693
694    if (node->m_ref_stream_info->getDpVm() == NULL) {
695        /* no VM */
696
697        node->m_vm_flow_var =  NULL;
698        node->m_vm_program  =  NULL;
699        node->m_vm_program_size =0;
700
701                /* allocate const mbuf */
702        rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size);
703        assert(m);
704
705        char *p = rte_pktmbuf_append(m, pkt_size);
706        assert(p);
707        /* copy the packet */
708        memcpy(p,stream_pkt,pkt_size);
709
710        update_mac_addr(stream,node,dir,p);
711
712        /* set the packet as a readonly */
713        node->set_cache_mbuf(m);
714
715        node->m_original_packet_data_prefix =0;
716    }else{
717
718        /* set the program */
719        TrexStream * local_mem_stream = node->m_ref_stream_info;
720
721        StreamVmDp  * lpDpVm = local_mem_stream->getDpVm();
722
723        node->m_vm_flow_var      = lpDpVm->clone_bss(); /* clone the flow var */
724        node->m_vm_program       = lpDpVm->get_program(); /* same ref to the program */
725        node->m_vm_program_size  = lpDpVm->get_program_size();
726
727
728        /* set the random seed if was set */
729        if ( lpDpVm->is_random_seed() ){
730            /* if we have random seed for this program */
731            if (stream->m_random_seed) {
732                node->set_random_seed(stream->m_random_seed);
733            }
734        }
735
736        /* we need to copy the object */
737        if ( pkt_size > lpDpVm->get_prefix_size() ) {
738            /* we need const packet */
739            uint16_t const_pkt_size  = pkt_size - lpDpVm->get_prefix_size() ;
740            rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), const_pkt_size );
741            assert(m);
742
743            char *p = rte_pktmbuf_append(m, const_pkt_size);
744            assert(p);
745
746            /* copy packet data */
747            memcpy(p,(stream_pkt + lpDpVm->get_prefix_size()),const_pkt_size);
748
749            node->set_const_mbuf(m);
750        }
751
752
753        if ( lpDpVm->is_pkt_size_var() ) {
754            // mark the node as varible size
755            node->set_var_pkt_size();
756        }
757
758
759        if (lpDpVm->get_prefix_size() > pkt_size ) {
760            lpDpVm->set_prefix_size(pkt_size);
761        }
762
763        /* copy the headr */
764        uint16_t header_size = lpDpVm->get_prefix_size();
765        assert(header_size);
766        node->alloc_prefix_header(header_size);
767        uint8_t *p=node->m_original_packet_data_prefix;
768        assert(p);
769
770        memcpy(p,stream_pkt , header_size);
771
772        update_mac_addr(stream,node,dir,(char *)p);
773    }
774
775
776    CDpOneStream one_stream;
777
778    one_stream.m_dp_stream = node->m_ref_stream_info;
779    one_stream.m_node =node;
780
781    lp_port->m_active_nodes.push_back(one_stream);
782
783    /* schedule only if active */
784    if (node->m_state == CGenNodeStateless::ss_ACTIVE) {
785        m_core->m_node_gen.add_node((CGenNode *)node);
786    }
787}
788
789void
790TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj,
791                                   double duration,
792                                   int event_id) {
793
794
795    TrexStatelessDpPerPort * lp_port=get_port_db(obj->get_port_id());
796    lp_port->m_active_streams = 0;
797    lp_port->set_event_id(event_id);
798
799    /* no nodes in the list */
800    assert(lp_port->m_active_nodes.size()==0);
801
802    for (auto single_stream : obj->get_objects()) {
803        /* all commands should be for the same port */
804        assert(obj->get_port_id() == single_stream.m_stream->m_port_id);
805        add_stream(lp_port,single_stream.m_stream,obj);
806    }
807
808    uint32_t nodes = lp_port->m_active_nodes.size();
809    /* find next stream */
810    assert(nodes == obj->get_objects().size());
811
812    int cnt=0;
813
814    /* set the next_stream pointer  */
815    for (auto single_stream : obj->get_objects()) {
816
817        if (single_stream.m_stream->is_dp_next_stream() ) {
818            int stream_id = single_stream.m_stream->m_next_stream_id;
819            assert(stream_id<nodes);
820            /* point to the next stream , stream_id is fixed */
821            lp_port->m_active_nodes[cnt].m_node->m_next_stream = lp_port->m_active_nodes[stream_id].m_node ;
822        }
823        cnt++;
824    }
825
826    lp_port->m_state =TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
827    m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
828
829
830    if ( duration > 0.0 ){
831        add_port_duration( duration ,obj->get_port_id(),event_id );
832    }
833
834}
835
836
837bool TrexStatelessDpCore::are_all_ports_idle(){
838
839    bool res=true;
840    int i;
841    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
842        if ( m_ports[i].m_state != TrexStatelessDpPerPort::ppSTATE_IDLE ){
843            res=false;
844        }
845    }
846    return (res);
847}
848
849
850void
851TrexStatelessDpCore::resume_traffic(uint8_t port_id){
852
853    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
854
855    lp_port->resume_traffic(port_id);
856}
857
858
859void
860TrexStatelessDpCore::pause_traffic(uint8_t port_id){
861
862    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
863
864    lp_port->pause_traffic(port_id);
865}
866
867
868void
869TrexStatelessDpCore::push_pcap(uint8_t port_id, int event_id, const std::string &pcap_filename) {
870
871    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
872
873    lp_port->set_event_id(event_id);
874
875    /* delegate the command to the port */
876    bool rc = lp_port->push_pcap(port_id, pcap_filename);
877    if (!rc) {
878        /* report back that we stopped */
879        CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
880        TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
881                                                                       port_id,
882                                                                       event_id,
883                                                                       false);
884        ring->Enqueue((CGenNode *)event_msg);
885        return;
886    }
887
888    m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
889
890    #if 0
891    if ( duration > 0.0 ){
892        add_port_duration(duration, port_id, event_id);
893    }
894    #endif
895}
896
897
898void
899TrexStatelessDpCore::update_traffic(uint8_t port_id, double factor) {
900
901    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
902
903    lp_port->update_traffic(port_id, factor);
904}
905
906
907void
908TrexStatelessDpCore::stop_traffic(uint8_t  port_id,
909                                  bool     stop_on_id,
910                                  int      event_id) {
911    /* we cannot remove nodes not from the top of the queue so
912       for every active node - make sure next time
913       the scheduler invokes it, it will be free */
914
915    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
916
917    if ( lp_port->stop_traffic(port_id,stop_on_id,event_id) == false){
918        /* nothing to do ! already stopped */
919        //printf(" skip .. %f\n",m_core->m_cur_time_sec);
920        return;
921    }
922
923    /* inform the control plane we stopped - this might be a async stop
924       (streams ended)
925    */
926    #if 0
927    if ( are_all_ports_idle() ) {
928        /* just a place holder if we will need to do somthing in that case */
929    }
930    #endif
931
932    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
933    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
934                                                                   port_id,
935                                                                   lp_port->get_event_id());
936    ring->Enqueue((CGenNode *)event_msg);
937
938}
939
940/**
941 * handle a message from CP to DP
942 *
943 */
944void
945TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) {
946    msg->handle(this);
947    delete msg;
948}
949
950void
951TrexStatelessDpCore::barrier(uint8_t port_id, int event_id) {
952
953    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
954    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
955                                                                   port_id,
956                                                                   event_id);
957    ring->Enqueue((CGenNode *)event_msg);
958}
959
960
961/**
962 * PCAP node
963 */
964bool CGenNodePCAP::create(uint8_t port_id, const std::string &pcap_filename, pkt_dir_t dir, const uint8_t *mac_addr) {
965    std::stringstream ss;
966
967    m_type       = CGenNode::PCAP_PKT;
968    m_flags      = 0;
969    m_src_port   = 0;
970    m_port_id    = port_id;
971
972    /* copy MAC addr info */
973    memcpy(m_mac_addr, mac_addr, 12);
974
975    /* set the dir */
976    set_mbuf_dir(dir);
977
978    /* create the PCAP reader */
979    m_reader = CCapReaderFactory::CreateReader((char *)pcap_filename.c_str(), 0, ss);
980    if (!m_reader) {
981        return false;
982    }
983
984    m_raw_packet = new CCapPktRaw();
985    if ( m_reader->ReadPacket(m_raw_packet) == false ){
986        /* handle error */
987        delete m_reader;
988        return (false);
989    }
990
991    /* this is the reference time */
992    //m_base_time = m_raw_packet->get_time();
993    m_last_pkt_time = m_raw_packet->get_time();
994
995    /* ready */
996    m_state = PCAP_ACTIVE;
997
998    return true;
999}
1000
1001void CGenNodePCAP::destroy() {
1002
1003    if (m_raw_packet) {
1004        delete m_raw_packet;
1005        m_raw_packet = NULL;
1006    }
1007
1008    if (m_reader) {
1009        delete m_reader;
1010        m_reader = NULL;
1011    }
1012
1013    m_state = PCAP_INVALID;
1014}
1015
1016