trex_stateless_dp_core.cpp revision 03d70c42
1/*
2 Itay Marom
3 Hanoch Haim
4 Cisco Systems, Inc.
5*/
6
7/*
8Copyright (c) 2015-2016 Cisco Systems, Inc.
9
10Licensed under the Apache License, Version 2.0 (the "License");
11you may not use this file except in compliance with the License.
12You may obtain a copy of the License at
13
14    http://www.apache.org/licenses/LICENSE-2.0
15
16Unless required by applicable law or agreed to in writing, software
17distributed under the License is distributed on an "AS IS" BASIS,
18WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19See the License for the specific language governing permissions and
20limitations under the License.
21*/
22#include "bp_sim.h"
23#include "trex_stateless_dp_core.h"
24#include "trex_stateless_messaging.h"
25#include "trex_stream.h"
26#include "trex_stream_node.h"
27#include "trex_streams_compiler.h"
28#include "mbuf.h"
29
30
31
32
33void CGenNodeStateless::cache_mbuf_array_init(){
34    m_cache_size=0;
35    m_cache_array_cnt=0;
36}
37
38
39
40void CGenNodeStateless::cache_mbuf_array_copy(CGenNodeCacheMbuf *obj,
41                                              uint16_t size){
42
43    int i;
44    cache_mbuf_array_alloc(size);
45    for (i=0; i<size; i++) {
46        cache_mbuf_array_set(i,obj->m_array[i]);
47    }
48    cache_mbuf_array_set_const_mbuf(obj->m_mbuf_const);
49}
50
51
52rte_mbuf_t ** CGenNodeStateless::cache_mbuf_array_alloc(uint16_t size){
53
54    uint32_t buf_size = CGenNodeCacheMbuf::get_object_size(size);
55    /* TBD  replace with align, zero API */
56    m_cache_mbuf = (void *)malloc(buf_size);
57    assert(m_cache_mbuf);
58    memset(m_cache_mbuf,0,buf_size);
59
60    m_flags |= SL_NODE_CONST_MBUF_CACHE_ARRAY;
61    m_cache_size=size;
62    m_cache_array_cnt=0;
63    return ((rte_mbuf_t **)m_cache_mbuf);
64}
65
66void CGenNodeStateless::cache_mbuf_array_free(){
67
68    assert(m_cache_mbuf);
69    int i;
70    for (i=0; i<(int)m_cache_size; i++) {
71        rte_mbuf_t * m=cache_mbuf_array_get((uint16_t)i);
72        assert(m);
73        rte_pktmbuf_free(m);
74    }
75
76    /* free the const */
77    rte_mbuf_t * m=cache_mbuf_array_get_const_mbuf() ;
78    if (m) {
79        rte_pktmbuf_free(m);
80    }
81
82    free(m_cache_mbuf);
83    m_cache_mbuf=0;
84}
85
86
87rte_mbuf_t * CGenNodeStateless::cache_mbuf_array_get(uint16_t index){
88
89    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
90    return (p->m_array[index]);
91}
92
93void CGenNodeStateless::cache_mbuf_array_set_const_mbuf(rte_mbuf_t * m){
94    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
95    p->m_mbuf_const=m;
96}
97
98rte_mbuf_t * CGenNodeStateless::cache_mbuf_array_get_const_mbuf(){
99    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
100    return (p->m_mbuf_const);
101}
102
103
104void CGenNodeStateless::cache_mbuf_array_set(uint16_t index,
105                                             rte_mbuf_t * m){
106    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
107    p->m_array[index]=m;
108}
109
110
111void CDpOneStream::Delete(CFlowGenListPerThread   * core){
112    assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE);
113    core->free_node((CGenNode *)m_node);
114    delete m_dp_stream;
115    m_node=0;
116    m_dp_stream=0;
117}
118
119void CDpOneStream::DeleteOnlyStream(){
120    assert(m_dp_stream);
121    delete m_dp_stream;
122    m_dp_stream=0;
123}
124
125int CGenNodeStateless::get_stream_id(){
126    if (m_state ==CGenNodeStateless::ss_FREE_RESUSE) {
127        return (-1); // not valid
128    }
129    assert(m_ref_stream_info);
130    return ((int)m_ref_stream_info->m_stream_id);
131}
132
133
134void CGenNodeStateless::DumpHeader(FILE *fd){
135    fprintf(fd," pkt_id, time, port , action , state, stream_id , stype , m-burst# , burst# \n");
136
137}
138void CGenNodeStateless::Dump(FILE *fd){
139    fprintf(fd," %2.4f, %3lu, %s,%s, %3d, %s, %3lu, %3lu  \n",
140            m_time,
141            (ulong)m_port_id,
142            "s-pkt", //action
143            get_stream_state_str(m_state ).c_str(),
144            get_stream_id(),   //stream_id
145            TrexStream::get_stream_type_str(m_stream_type).c_str(), //stype
146            (ulong)m_multi_bursts,
147            (ulong)m_single_burst
148            );
149}
150
151
152
153void CGenNodeStateless::refresh_vm_bss(){
154    if ( m_vm_flow_var ) {
155        StreamVmDp  * vm_s=m_ref_stream_info->m_vm_dp;
156        assert(vm_s);
157        memcpy(m_vm_flow_var,vm_s->get_bss(),vm_s->get_bss_size());
158
159        if ( vm_s->is_random_seed() ){
160            /* if we have random seed for this program */
161            if (m_ref_stream_info->m_random_seed) {
162                set_random_seed(m_ref_stream_info->m_random_seed);
163            }
164        }
165    }
166}
167
168
169/**
170 * this function called when stream restart after it was inactive
171 */
172void CGenNodeStateless::refresh(){
173
174    /* refill the stream info */
175    m_single_burst    = m_single_burst_refill;
176    m_multi_bursts    = m_ref_stream_info->m_num_bursts;
177    m_state           = CGenNodeStateless::ss_ACTIVE;
178
179    /* refresh init value */
180#if 0
181    /* TBD should add a JSON varible for that */
182    refresh_vm_bss();
183#endif
184}
185
186
187void CGenNodeCommand::free_command(){
188
189    assert(m_cmd);
190    m_cmd->on_node_remove();
191    delete m_cmd;
192}
193
194
195std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state){
196    std::string res;
197
198    switch (stream_state) {
199    case CGenNodeStateless::ss_FREE_RESUSE :
200         res="FREE    ";
201        break;
202    case CGenNodeStateless::ss_INACTIVE :
203        res="INACTIVE ";
204        break;
205    case CGenNodeStateless::ss_ACTIVE :
206        res="ACTIVE   ";
207        break;
208    default:
209        res="Unknow   ";
210    };
211    return(res);
212}
213
214/*
215 * Allocate mbuf for flow stat (and latency) info sending
216 * m - Original mbuf (can be complicated mbuf data structure)
217 * fsp_head - return pointer in which the flow stat info should be filled
218 * is_const - is the given mbuf const
219 * return new mbuf structure in which the fsp_head can be written. If needed, orginal mbuf is freed.
220 */
221rte_mbuf_t * CGenNodeStateless::alloc_flow_stat_mbuf(rte_mbuf_t *m, struct flow_stat_payload_header *&fsp_head
222                                                     , bool is_const) {
223    rte_mbuf_t *m_ret = NULL, *m_lat = NULL;
224    uint16_t fsp_head_size = sizeof(struct flow_stat_payload_header);
225
226    if (is_const) {
227        // const mbuf case
228        if (rte_pktmbuf_data_len(m) > 128) {
229            m_ret = CGlobalInfo::pktmbuf_alloc_small(get_socket_id());
230            assert(m_ret);
231            // alloc mbuf just for the latency header
232            m_lat = CGlobalInfo::pktmbuf_alloc( get_socket_id(), fsp_head_size);
233            assert(m_lat);
234            fsp_head = (struct flow_stat_payload_header *)rte_pktmbuf_append(m_lat, fsp_head_size);
235            rte_pktmbuf_attach(m_ret, m);
236            rte_pktmbuf_trim(m_ret, sizeof(struct flow_stat_payload_header));
237            utl_rte_pktmbuf_add_after2(m_ret, m_lat);
238            return m_ret;
239        } else {
240            // Short packet. Just copy all bytes.
241            m_ret = CGlobalInfo::pktmbuf_alloc( get_socket_id(), rte_pktmbuf_data_len(m) );
242            assert(m_ret);
243            char *p = rte_pktmbuf_mtod(m, char*);
244            char *p_new = rte_pktmbuf_append(m_ret, rte_pktmbuf_data_len(m));
245            memcpy(p_new , p, rte_pktmbuf_data_len(m));
246            fsp_head = (struct flow_stat_payload_header *)(p_new + rte_pktmbuf_data_len(m) - fsp_head_size);
247            rte_pktmbuf_free(m);
248            return m_ret;
249        }
250    } else {
251        // Field engine (vm)
252        if (rte_pktmbuf_is_contiguous(m)) {
253            // one, r/w mbuf
254            char *p = rte_pktmbuf_mtod(m, char*);
255            fsp_head = (struct flow_stat_payload_header *)(p + rte_pktmbuf_data_len(m) - fsp_head_size);
256            return m;
257        } else {
258            // r/w --> read only. Should do something like:
259            // Alloc indirect,. make r/w->indirect point to read_only) -> new fsp_header
260            // for the mean time, just copy the entire packet.
261            m_ret = CGlobalInfo::pktmbuf_alloc( get_socket_id(), rte_pktmbuf_pkt_len(m) );
262            assert(m_ret);
263            char *p_new = rte_pktmbuf_append(m_ret, rte_pktmbuf_pkt_len(m));
264            rte_mbuf_t *m_free = m;
265            while (m != NULL) {
266                char *p = rte_pktmbuf_mtod(m, char*);
267                memcpy(p_new, p, m->data_len);
268                p_new += m->data_len;
269                m = m->next;
270            }
271            p_new = rte_pktmbuf_mtod(m_ret, char*);
272            fsp_head = (struct flow_stat_payload_header *)(p_new + rte_pktmbuf_data_len(m_ret) - fsp_head_size);
273            rte_pktmbuf_free(m_free);
274            return m_ret;
275        }
276    }
277}
278
279// test the const case of alloc_flow_stat_mbuf. The more complicated non const case is tested in the simulation.
280bool CGenNodeStateless::alloc_flow_stat_mbuf_test_const() {
281    rte_mbuf_t *m, *m_test;
282    uint16_t sizes[2] = {64, 500};
283    uint16_t size;
284    struct flow_stat_payload_header *fsp_head;
285    char *p;
286
287    set_socket_id(0);
288    for (int test_num = 0; test_num < sizeof(sizes)/sizeof(sizes[0]); test_num++) {
289        size = sizes[test_num];
290        m = CGlobalInfo::pktmbuf_alloc(get_socket_id(), size);
291        p = rte_pktmbuf_append(m, size);
292        for (int i = 0; i < size; i++) {
293            p[i] = (char)i;
294        }
295        m_test = alloc_flow_stat_mbuf(m, fsp_head, true);
296        p = rte_pktmbuf_mtod(m_test, char*);
297        assert(rte_pktmbuf_pkt_len(m_test) == size);
298        for (int i = 0; i < rte_pktmbuf_pkt_len(m_test) - sizeof(*fsp_head); i++) {
299            assert(p[i] == (char)i);
300        }
301        // verify fsp_head points correctly
302        if (size > 128) { // should match threshould in alloc_flow_stat_mbuf
303            assert(rte_pktmbuf_data_len(m_test) == size - sizeof(*fsp_head));
304            assert(rte_pktmbuf_data_len(m_test->next) == sizeof(*fsp_head));
305            assert((char *)fsp_head == rte_pktmbuf_mtod((m_test->next), char*));
306        } else {
307            assert(rte_pktmbuf_data_len(m_test) == size);
308            assert (((char *)fsp_head) + sizeof (*fsp_head) == p + rte_pktmbuf_data_len(m_test));
309        }
310        rte_pktmbuf_free(m_test);
311    }
312    return true;
313}
314
315rte_mbuf_t   * CGenNodeStateless::alloc_node_with_vm(){
316
317    rte_mbuf_t        * m;
318    /* alloc small packet buffer*/
319    uint16_t prefix_size = prefix_header_size();
320    m = CGlobalInfo::pktmbuf_alloc( get_socket_id(), prefix_size );
321    if (m==0) {
322        return (m);
323    }
324    /* TBD remove this, should handle cases of error */
325    assert(m);
326    char *p=rte_pktmbuf_append(m, prefix_size);
327    memcpy( p ,m_original_packet_data_prefix, prefix_size);
328
329
330    /* run the VM program */
331    StreamDPVmInstructionsRunner runner;
332
333    runner.run( (uint32_t*)m_vm_flow_var,
334                m_vm_program_size,
335                m_vm_program,
336                m_vm_flow_var,
337                (uint8_t*)p);
338
339    uint16_t pkt_new_size=runner.get_new_pkt_size();
340    if ( likely( pkt_new_size == 0) ) {
341        /* no packet size change */
342        rte_mbuf_t * m_const = get_const_mbuf();
343        if (  m_const != NULL) {
344            utl_rte_pktmbuf_add_after(m,m_const);
345        }
346        return (m);
347    }
348
349    /* packet size change there are a few changes */
350    rte_mbuf_t * m_const = get_const_mbuf();
351    if ( (m_const == 0 ) || (pkt_new_size<=prefix_size) ) {
352        /* one mbuf , just trim it */
353        m->data_len = pkt_new_size;
354        m->pkt_len  = pkt_new_size;
355        return (m);
356    }
357
358    rte_mbuf_t * mi= CGlobalInfo::pktmbuf_alloc_small(get_socket_id());
359    assert(mi);
360    rte_pktmbuf_attach(mi,m_const);
361    utl_rte_pktmbuf_add_after2(m,mi);
362
363    if ( pkt_new_size < m->pkt_len) {
364        /* need to trim it */
365        mi->data_len = (pkt_new_size - prefix_size);
366        m->pkt_len   = pkt_new_size;
367    }
368    return (m);
369}
370
371void CGenNodeStateless::free_stl_vm_buf(){
372        rte_mbuf_t * m ;
373         m=get_const_mbuf();
374         if (m) {
375             rte_pktmbuf_free(m); /* reduce the ref counter */
376             /* clear the const marker */
377             clear_const_mbuf();
378         }
379
380         free_prefix_header();
381
382         if (m_vm_flow_var) {
383             /* free flow var */
384             free(m_vm_flow_var);
385             m_vm_flow_var=0;
386         }
387}
388
389
390
391void CGenNodeStateless::free_stl_node(){
392
393    if ( is_cache_mbuf_array() ){
394        /* do we have cache of mbuf pre allocated */
395        cache_mbuf_array_free();
396    }else{
397        /* if we have cache mbuf free it */
398        rte_mbuf_t * m=get_cache_mbuf();
399        if (m) {
400                rte_pktmbuf_free(m);
401                m_cache_mbuf=0;
402        }
403    }
404    free_stl_vm_buf();
405}
406
407
408bool TrexStatelessDpPerPort::update_number_of_active_streams(uint32_t d){
409    m_active_streams-=d; /* reduce the number of streams */
410    if (m_active_streams == 0) {
411        return (true);
412    }
413    return (false);
414}
415
416bool TrexStatelessDpPerPort::resume_traffic(uint8_t port_id){
417
418    /* we are working with continues streams so we must be in transmit mode */
419    assert(m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE);
420
421    for (auto dp_stream : m_active_nodes) {
422        CGenNodeStateless * node =dp_stream.m_node;
423        assert(node->get_port_id() == port_id);
424        assert(node->is_pause() == true);
425        node->set_pause(false);
426    }
427    m_state = TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
428    return (true);
429}
430
431bool TrexStatelessDpPerPort::update_traffic(uint8_t port_id, double factor) {
432
433    assert( (m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING ||
434            (m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE)) );
435
436    for (auto dp_stream : m_active_nodes) {
437        CGenNodeStateless * node = dp_stream.m_node;
438        assert(node->get_port_id() == port_id);
439
440        node->update_rate(factor);
441    }
442
443    return (true);
444}
445
446bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){
447
448    /* we are working with continues streams so we must be in transmit mode */
449    assert(m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING);
450
451    for (auto dp_stream : m_active_nodes) {
452        CGenNodeStateless * node =dp_stream.m_node;
453        assert(node->get_port_id() == port_id);
454        assert(node->is_pause() == false);
455        node->set_pause(true);
456    }
457    m_state = TrexStatelessDpPerPort::ppSTATE_PAUSE;
458    return (true);
459}
460
461bool TrexStatelessDpPerPort::push_pcap(uint8_t port_id,
462                                       const std::string &pcap_filename,
463                                       double ipg_usec,
464                                       double speedup,
465                                       uint32_t count) {
466
467    /* push pcap can only happen on an idle port from the core prespective */
468    assert(m_state == TrexStatelessDpPerPort::ppSTATE_IDLE);
469
470    CGenNodePCAP *pcap_node = m_core->allocate_pcap_node();
471    if (!pcap_node) {
472        return (false);
473    }
474
475    pkt_dir_t dir          = m_core->m_node_gen.m_v_if->port_id_to_dir(port_id);
476    socket_id_t socket_id  = m_core->m_node_gen.m_socket_id;
477
478    uint8_t mac_addr[12];
479    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, mac_addr);
480
481    bool rc = pcap_node->create(port_id,
482                                dir,
483                                socket_id,
484                                mac_addr,
485                                pcap_filename,
486                                ipg_usec,
487                                speedup,
488                                count);
489    if (!rc) {
490        m_core->free_node((CGenNode *)pcap_node);
491        return (false);
492    }
493
494    /* schedule the node for now */
495    pcap_node->m_time = m_core->m_cur_time_sec;
496    m_core->m_node_gen.add_node((CGenNode *)pcap_node);
497
498    /* hold a pointer to the node */
499    assert(m_active_pcap_node == NULL);
500    m_active_pcap_node = pcap_node;
501
502    m_state = TrexStatelessDpPerPort::ppSTATE_PCAP_TX;
503    return (true);
504}
505
506
507bool TrexStatelessDpPerPort::stop_traffic(uint8_t  port_id,
508                                          bool     stop_on_id,
509                                          int      event_id){
510
511
512    if (m_state == TrexStatelessDpPerPort::ppSTATE_IDLE) {
513        assert(m_active_streams==0);
514        return false;
515    }
516
517    /* there could be race of stop after stop */
518    if ( stop_on_id ) {
519        if (event_id != m_event_id){
520            /* we can't stop it is an old message */
521            return false;
522        }
523    }
524
525    for (auto dp_stream : m_active_nodes) {
526        CGenNodeStateless * node =dp_stream.m_node;
527        assert(node->get_port_id() == port_id);
528        if ( node->get_state() == CGenNodeStateless::ss_ACTIVE) {
529            node->mark_for_free();
530            m_active_streams--;
531            dp_stream.DeleteOnlyStream();
532
533        }else{
534            dp_stream.Delete(m_core);
535        }
536    }
537
538    /* check for active PCAP node */
539    if (m_active_pcap_node) {
540        /* when got async stop from outside or duration */
541        if (m_active_pcap_node->is_active()) {
542            m_active_pcap_node->mark_for_free();
543        } else {
544            /* graceful stop - node was put out by the scheduler */
545            m_core->free_node( (CGenNode *)m_active_pcap_node);
546        }
547
548        m_active_pcap_node = NULL;
549    }
550
551    /* active stream should be zero */
552    assert(m_active_streams==0);
553    m_active_nodes.clear();
554    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
555    return (true);
556}
557
558
559void TrexStatelessDpPerPort::create(CFlowGenListPerThread   *  core){
560    m_core=core;
561    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
562    m_active_streams=0;
563    m_active_nodes.clear();
564    m_active_pcap_node = NULL;
565}
566
567
568
569void
570TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) {
571    m_thread_id = thread_id;
572    m_core = core;
573    m_local_port_offset = 2*core->getDualPortId();
574
575    CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();
576
577    m_ring_from_cp = cp_dp->getRingCpToDp(thread_id);
578    m_ring_to_cp   = cp_dp->getRingDpToCp(thread_id);
579
580    m_state = STATE_IDLE;
581
582    int i;
583    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
584        m_ports[i].create(core);
585    }
586}
587
588
589/* move to the next stream, old stream move to INACTIVE */
590bool TrexStatelessDpCore::set_stateless_next_node(CGenNodeStateless * cur_node,
591                                                  CGenNodeStateless * next_node){
592
593    assert(cur_node);
594    TrexStatelessDpPerPort * lp_port = get_port_db(cur_node->m_port_id);
595    bool schedule =false;
596
597    bool to_stop_port=false;
598
599    if (next_node == NULL) {
600        /* there is no next stream , reduce the number of active streams*/
601        to_stop_port = lp_port->update_number_of_active_streams(1);
602
603    }else{
604        uint8_t state=next_node->get_state();
605
606        /* can't be FREE_RESUSE */
607        assert(state != CGenNodeStateless::ss_FREE_RESUSE);
608        if (state == CGenNodeStateless::ss_INACTIVE ) {
609
610            if (cur_node->m_action_counter > 0) {
611                cur_node->m_action_counter--;
612                if (cur_node->m_action_counter==0) {
613                    to_stop_port = lp_port->update_number_of_active_streams(1);
614                }else{
615                    /* refill start info and scedule, no update in active streams  */
616                    next_node->refresh();
617                    schedule = true;
618                }
619            }else{
620                /* refill start info and scedule, no update in active streams  */
621                next_node->refresh();
622                schedule = true;
623            }
624
625        }else{
626            to_stop_port = lp_port->update_number_of_active_streams(1);
627        }
628    }
629
630    if ( to_stop_port ) {
631        /* call stop port explictly to move the state */
632        stop_traffic(cur_node->m_port_id,false,0);
633    }
634
635    return ( schedule );
636}
637
638
639
640/**
641 * in idle state loop, the processor most of the time sleeps
642 * and periodically checks for messages
643 *
644 * @author imarom (01-Nov-15)
645 */
646void
647TrexStatelessDpCore::idle_state_loop() {
648
649    const int SHORT_DELAY_MS    = 2;
650    const int LONG_DELAY_MS     = 50;
651    const int DEEP_SLEEP_LIMIT  = 2000;
652
653    int counter = 0;
654
655    while (m_state == STATE_IDLE) {
656        m_core->m_node_gen.m_v_if->flush_dp_rx_queue();
657        bool had_msg = periodic_check_for_cp_messages();
658        if (had_msg) {
659            counter = 0;
660            continue;
661        }
662
663        /* enter deep sleep only if enough time had passed */
664        if (counter < DEEP_SLEEP_LIMIT) {
665            delay(SHORT_DELAY_MS);
666            counter++;
667        } else {
668            delay(LONG_DELAY_MS);
669        }
670
671    }
672}
673
674
675
676void TrexStatelessDpCore::quit_main_loop(){
677    m_core->set_terminate_mode(true); /* mark it as terminated */
678    m_state = STATE_TERMINATE;
679    add_global_duration(0.0001);
680}
681
682
683/**
684 * scehduler runs when traffic exists
685 * it will return when no more transmitting is done on this
686 * core
687 *
688 * @author imarom (01-Nov-15)
689 */
690void
691TrexStatelessDpCore::start_scheduler() {
692    /* creates a maintenace job using the scheduler */
693    CGenNode * node_sync = m_core->create_node() ;
694    node_sync->m_type = CGenNode::FLOW_SYNC;
695    node_sync->m_time = m_core->m_cur_time_sec + SYNC_TIME_OUT;
696    m_core->m_node_gen.add_node(node_sync);
697
698    double old_offset = 0.0;
699    m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset);
700    /* bail out in case of terminate */
701    if (m_state != TrexStatelessDpCore::STATE_TERMINATE) {
702        m_core->m_node_gen.close_file(m_core);
703        m_state = STATE_IDLE; /* we exit from all ports and we have nothing to do, we move to IDLE state */
704    }
705}
706
707
708void
709TrexStatelessDpCore::run_once(){
710
711    idle_state_loop();
712
713    if ( m_state == STATE_TERMINATE ){
714        return;
715    }
716
717    start_scheduler();
718}
719
720
721
722
723void
724TrexStatelessDpCore::start() {
725
726    while (true) {
727        run_once();
728
729        if ( m_core->is_terminated_by_master() ) {
730            break;
731        }
732    }
733}
734
735/* only if both port are idle we can exit */
736void
737TrexStatelessDpCore::schedule_exit(){
738
739    CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
740
741    node->m_type = CGenNode::COMMAND;
742
743    node->m_cmd = new TrexStatelessDpCanQuit();
744
745    /* make sure it will be scheduled after the current node */
746    node->m_time = m_core->m_cur_time_sec ;
747
748    m_core->m_node_gen.add_node((CGenNode *)node);
749}
750
751
752void
753TrexStatelessDpCore::add_global_duration(double duration){
754    if (duration > 0.0) {
755        CGenNode *node = m_core->create_node() ;
756
757        node->m_type = CGenNode::EXIT_SCHED;
758
759        /* make sure it will be scheduled after the current node */
760        node->m_time = m_core->m_cur_time_sec + duration ;
761
762        m_core->m_node_gen.add_node(node);
763    }
764}
765
766/* add per port exit */
767void
768TrexStatelessDpCore::add_port_duration(double duration,
769                                       uint8_t port_id,
770                                       int event_id){
771    if (duration > 0.0) {
772        CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
773
774        node->m_type = CGenNode::COMMAND;
775
776        /* make sure it will be scheduled after the current node */
777        node->m_time = m_core->m_cur_time_sec + duration ;
778
779        TrexStatelessDpStop * cmd=new TrexStatelessDpStop(port_id);
780
781
782        /* test this */
783        m_core->m_non_active_nodes++;
784        cmd->set_core_ptr(m_core);
785        cmd->set_event_id(event_id);
786        cmd->set_wait_for_event_id(true);
787
788        node->m_cmd = cmd;
789
790        m_core->m_node_gen.add_node((CGenNode *)node);
791    }
792}
793
794
795void TrexStatelessDpCore::update_mac_addr(TrexStream * stream,
796                                          CGenNodeStateless *node,
797                                          pkt_dir_t dir,
798                                          char *raw_pkt){
799    bool              ov_src = stream->get_override_src_mac_by_pkt_data();
800    TrexStream::stream_dst_mac_t  ov_dst = stream->get_override_dst_mac_mode();
801
802
803    if ( (ov_src == true) && (ov_dst == TrexStream::stPKT) ) {
804        /* nothing to do, take from the packet both */
805        return;
806    }
807
808        /* take from cfg_file */
809    if ( (ov_src == false) &&
810         (ov_dst == TrexStream::stCFG_FILE) ){
811
812          m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*)raw_pkt);
813          return;
814    }
815
816    /* save the pkt*/
817    char tmp_pkt[12];
818    memcpy(tmp_pkt,raw_pkt,12);
819
820    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*)raw_pkt);
821
822    if ((ov_src == true) && (ov_dst == TrexStream::stCFG_FILE)) {
823        memcpy(raw_pkt+6,tmp_pkt+6,6);
824    }
825
826    if ((ov_src == false) && (ov_dst == TrexStream::stPKT)) {
827        memcpy(raw_pkt,tmp_pkt,6);
828    }
829}
830
831
832void TrexStatelessDpCore::replay_vm_into_cache(TrexStream * stream,
833                                               CGenNodeStateless *node){
834
835    uint16_t      cache_size = stream->m_cache_size;
836    assert(cache_size>0);
837    rte_mbuf_t * m=0;
838
839    uint32_t buf_size = CGenNodeCacheMbuf::get_object_size(cache_size);
840    CGenNodeCacheMbuf * p = (CGenNodeCacheMbuf *)malloc(buf_size);
841    assert(p);
842    memset(p,0,buf_size);
843
844    int i;
845    for (i=0; i<cache_size; i++) {
846        p->m_array[i] =  node->alloc_node_with_vm();
847    }
848    /* save const */
849    m=node->get_const_mbuf();
850    if (m) {
851        p->m_mbuf_const=m;
852        rte_pktmbuf_refcnt_update(m,1);
853    }
854
855    /* free all VM and const mbuf */
856    node->free_stl_vm_buf();
857
858    /* copy to local node meory */
859    node->cache_mbuf_array_copy(p,cache_size);
860
861    /* free the memory */
862    free(p);
863}
864
865
866void
867TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port,
868                                TrexStream * stream,
869                                TrexStreamsCompiledObj *comp) {
870
871    CGenNodeStateless *node = m_core->create_node_sl();
872
873    node->cache_mbuf_array_init();
874    node->m_batch_size=0;
875
876    /* add periodic */
877    node->m_cache_mbuf=0;
878    node->m_type = CGenNode::STATELESS_PKT;
879
880    node->m_action_counter = stream->m_action_count;
881
882    /* clone the stream from control plane memory to DP memory */
883    node->m_ref_stream_info = stream->clone();
884    /* no need for this memory anymore on the control plane memory */
885    stream->release_dp_object();
886
887    node->m_next_stream=0; /* will be fixed later */
888
889    if ( stream->m_self_start ){
890        /* if self start it is in active mode */
891        node->m_state =CGenNodeStateless::ss_ACTIVE;
892        lp_port->m_active_streams++;
893    }else{
894        node->m_state =CGenNodeStateless::ss_INACTIVE;
895    }
896
897    node->m_time = m_core->m_cur_time_sec + stream->get_start_delay_sec();
898
899    pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(stream->m_port_id);
900    node->m_flags = 0;
901    node->m_src_port =0;
902    node->m_original_packet_data_prefix = 0;
903
904    if (stream->m_rx_check.m_enabled) {
905        node->set_stat_needed();
906        uint8_t hw_id = stream->m_rx_check.m_hw_id;
907        assert (hw_id < MAX_FLOW_STATS + MAX_FLOW_STATS_PAYLOAD);
908        node->set_stat_hw_id(hw_id);
909    }
910
911    /* set socket id */
912    node->set_socket_id(m_core->m_node_gen.m_socket_id);
913
914    /* build a mbuf from a packet */
915
916    uint16_t pkt_size = stream->m_pkt.len;
917    const uint8_t *stream_pkt = stream->m_pkt.binary;
918
919    node->m_pause =0;
920    node->m_stream_type = stream->m_type;
921    node->m_next_time_offset = 1.0 / stream->get_pps();
922    node->m_null_stream = (stream->m_null_stream ? 1 : 0);
923
924    /* stateless specific fields */
925    switch ( stream->m_type ) {
926
927    case TrexStream::stCONTINUOUS :
928        node->m_single_burst=0;
929        node->m_single_burst_refill=0;
930        node->m_multi_bursts=0;
931        break;
932
933    case TrexStream::stSINGLE_BURST :
934        node->m_stream_type             = TrexStream::stMULTI_BURST;
935        node->m_single_burst            = stream->m_burst_total_pkts;
936        node->m_single_burst_refill     = stream->m_burst_total_pkts;
937        node->m_multi_bursts            = 1;  /* single burst in multi burst of 1 */
938        break;
939
940    case TrexStream::stMULTI_BURST :
941        node->m_single_burst        = stream->m_burst_total_pkts;
942        node->m_single_burst_refill = stream->m_burst_total_pkts;
943        node->m_multi_bursts        = stream->m_num_bursts;
944        break;
945    default:
946
947        assert(0);
948    };
949
950    node->m_port_id = stream->m_port_id;
951
952    /* set dir 0 or 1 client or server */
953    node->set_mbuf_cache_dir(dir);
954
955
956    if (node->m_ref_stream_info->getDpVm() == NULL) {
957        /* no VM */
958
959        node->m_vm_flow_var =  NULL;
960        node->m_vm_program  =  NULL;
961        node->m_vm_program_size =0;
962
963                /* allocate const mbuf */
964        rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size);
965        assert(m);
966
967        char *p = rte_pktmbuf_append(m, pkt_size);
968        assert(p);
969        /* copy the packet */
970        memcpy(p,stream_pkt,pkt_size);
971
972        update_mac_addr(stream,node,dir,p);
973
974        /* set the packet as a readonly */
975        node->set_cache_mbuf(m);
976
977        node->m_original_packet_data_prefix =0;
978    }else{
979
980        /* set the program */
981        TrexStream * local_mem_stream = node->m_ref_stream_info;
982
983        StreamVmDp  * lpDpVm = local_mem_stream->getDpVm();
984
985        node->m_vm_flow_var      = lpDpVm->clone_bss(); /* clone the flow var */
986        node->m_vm_program       = lpDpVm->get_program(); /* same ref to the program */
987        node->m_vm_program_size  = lpDpVm->get_program_size();
988
989
990        /* set the random seed if was set */
991        if ( lpDpVm->is_random_seed() ){
992            /* if we have random seed for this program */
993            if (stream->m_random_seed) {
994                node->set_random_seed(stream->m_random_seed);
995            }
996        }
997
998        /* we need to copy the object */
999        if ( pkt_size > lpDpVm->get_prefix_size() ) {
1000            /* we need const packet */
1001            uint16_t const_pkt_size  = pkt_size - lpDpVm->get_prefix_size() ;
1002            rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), const_pkt_size );
1003            assert(m);
1004
1005            char *p = rte_pktmbuf_append(m, const_pkt_size);
1006            assert(p);
1007
1008            /* copy packet data */
1009            memcpy(p,(stream_pkt + lpDpVm->get_prefix_size()),const_pkt_size);
1010
1011            node->set_const_mbuf(m);
1012        }
1013
1014
1015        if ( lpDpVm->is_pkt_size_var() ) {
1016            // mark the node as varible size
1017            node->set_var_pkt_size();
1018        }
1019
1020
1021        if (lpDpVm->get_prefix_size() > pkt_size ) {
1022            lpDpVm->set_prefix_size(pkt_size);
1023        }
1024
1025        /* copy the headr */
1026        uint16_t header_size = lpDpVm->get_prefix_size();
1027        assert(header_size);
1028        node->alloc_prefix_header(header_size);
1029        uint8_t *p=node->m_original_packet_data_prefix;
1030        assert(p);
1031
1032        memcpy(p,stream_pkt , header_size);
1033
1034        update_mac_addr(stream,node,dir,(char *)p);
1035
1036        if (stream->m_cache_size > 0 ) {
1037            /* we need to create cache of objects */
1038            replay_vm_into_cache(stream, node);
1039        }
1040    }
1041
1042
1043    CDpOneStream one_stream;
1044
1045    one_stream.m_dp_stream = node->m_ref_stream_info;
1046    one_stream.m_node =node;
1047
1048    lp_port->m_active_nodes.push_back(one_stream);
1049
1050    /* schedule only if active */
1051    if (node->m_state == CGenNodeStateless::ss_ACTIVE) {
1052        m_core->m_node_gen.add_node((CGenNode *)node);
1053    }
1054}
1055
1056void
1057TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj,
1058                                   double duration,
1059                                   int event_id) {
1060
1061
1062    TrexStatelessDpPerPort * lp_port=get_port_db(obj->get_port_id());
1063    lp_port->m_active_streams = 0;
1064    lp_port->set_event_id(event_id);
1065
1066    /* update cur time */
1067    if ( CGlobalInfo::is_realtime()  ){
1068        m_core->m_cur_time_sec = now_sec() + SCHD_OFFSET_DTIME ;
1069    }
1070
1071    /* no nodes in the list */
1072    assert(lp_port->m_active_nodes.size()==0);
1073
1074    for (auto single_stream : obj->get_objects()) {
1075        /* all commands should be for the same port */
1076        assert(obj->get_port_id() == single_stream.m_stream->m_port_id);
1077        add_stream(lp_port,single_stream.m_stream,obj);
1078    }
1079
1080    uint32_t nodes = lp_port->m_active_nodes.size();
1081    /* find next stream */
1082    assert(nodes == obj->get_objects().size());
1083
1084    int cnt=0;
1085
1086    /* set the next_stream pointer  */
1087    for (auto single_stream : obj->get_objects()) {
1088
1089        if (single_stream.m_stream->is_dp_next_stream() ) {
1090            int stream_id = single_stream.m_stream->m_next_stream_id;
1091            assert(stream_id<nodes);
1092            /* point to the next stream , stream_id is fixed */
1093            lp_port->m_active_nodes[cnt].m_node->m_next_stream = lp_port->m_active_nodes[stream_id].m_node ;
1094        }
1095        cnt++;
1096    }
1097
1098    lp_port->m_state =TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
1099    m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
1100
1101
1102    if ( duration > 0.0 ){
1103        add_port_duration( duration ,obj->get_port_id(),event_id );
1104    }
1105
1106}
1107
1108
1109bool TrexStatelessDpCore::are_all_ports_idle(){
1110
1111    bool res=true;
1112    int i;
1113    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
1114        if ( m_ports[i].m_state != TrexStatelessDpPerPort::ppSTATE_IDLE ){
1115            res=false;
1116        }
1117    }
1118    return (res);
1119}
1120
1121
1122void
1123TrexStatelessDpCore::resume_traffic(uint8_t port_id){
1124
1125    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1126
1127    lp_port->resume_traffic(port_id);
1128}
1129
1130
1131void
1132TrexStatelessDpCore::pause_traffic(uint8_t port_id){
1133
1134    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1135
1136    lp_port->pause_traffic(port_id);
1137}
1138
1139void
1140TrexStatelessDpCore::push_pcap(uint8_t port_id,
1141                               int event_id,
1142                               const std::string &pcap_filename,
1143                               double ipg_usec,
1144                               double speedup,
1145                               uint32_t count,
1146                               double duration) {
1147
1148    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1149
1150    lp_port->set_event_id(event_id);
1151
1152    /* delegate the command to the port */
1153    bool rc = lp_port->push_pcap(port_id, pcap_filename, ipg_usec, speedup, count);
1154    if (!rc) {
1155        /* report back that we stopped */
1156        CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
1157        TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
1158                                                                       port_id,
1159                                                                       event_id,
1160                                                                       false);
1161        ring->Enqueue((CGenNode *)event_msg);
1162        return;
1163    }
1164
1165
1166    if (duration > 0.0) {
1167        add_port_duration(duration, port_id, event_id);
1168    }
1169
1170     m_state = TrexStatelessDpCore::STATE_PCAP_TX;
1171}
1172
1173void
1174TrexStatelessDpCore::update_traffic(uint8_t port_id, double factor) {
1175
1176    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1177
1178    lp_port->update_traffic(port_id, factor);
1179}
1180
1181
1182void
1183TrexStatelessDpCore::stop_traffic(uint8_t  port_id,
1184                                  bool     stop_on_id,
1185                                  int      event_id) {
1186    /* we cannot remove nodes not from the top of the queue so
1187       for every active node - make sure next time
1188       the scheduler invokes it, it will be free */
1189
1190    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1191    if ( lp_port->stop_traffic(port_id,stop_on_id,event_id) == false){
1192        return;
1193    }
1194
1195    /* flush the TX queue before sending done message to the CP */
1196    m_core->flush_tx_queue();
1197
1198    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
1199    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
1200                                                                   port_id,
1201                                                                   lp_port->get_event_id());
1202    ring->Enqueue((CGenNode *)event_msg);
1203
1204}
1205
1206/**
1207 * handle a message from CP to DP
1208 *
1209 */
1210void
1211TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) {
1212    msg->handle(this);
1213    delete msg;
1214}
1215
1216void
1217TrexStatelessDpCore::barrier(uint8_t port_id, int event_id) {
1218
1219    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
1220    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
1221                                                                   port_id,
1222                                                                   event_id);
1223    ring->Enqueue((CGenNode *)event_msg);
1224}
1225
1226
1227/**
1228 * PCAP node
1229 */
1230bool CGenNodePCAP::create(uint8_t port_id,
1231                          pkt_dir_t dir,
1232                          socket_id_t socket_id,
1233                          const uint8_t *mac_addr,
1234                          const std::string &pcap_filename,
1235                          double ipg_usec,
1236                          double speedup,
1237                          uint32_t count) {
1238    std::stringstream ss;
1239
1240    m_type       = CGenNode::PCAP_PKT;
1241    m_flags      = 0;
1242    m_src_port   = 0;
1243    m_port_id    = port_id;
1244    m_count      = count;
1245
1246    /* mark this node as slow path */
1247    set_slow_path(true);
1248
1249    if (ipg_usec != -1) {
1250        /* fixed IPG */
1251        m_ipg_sec = usec_to_sec(ipg_usec / speedup);
1252        m_speedup = 0;
1253    } else {
1254        /* packet IPG */
1255        m_ipg_sec = -1;
1256        m_speedup  = speedup;
1257    }
1258
1259    /* copy MAC addr info */
1260    memcpy(m_mac_addr, mac_addr, 12);
1261
1262    /* set the dir */
1263    set_mbuf_dir(dir);
1264    set_socket_id(socket_id);
1265
1266    /* create the PCAP reader */
1267    m_reader = CCapReaderFactory::CreateReader((char *)pcap_filename.c_str(), 0, ss);
1268    if (!m_reader) {
1269        return false;
1270    }
1271
1272    m_raw_packet = new CCapPktRaw();
1273    if ( m_reader->ReadPacket(m_raw_packet) == false ){
1274        /* handle error */
1275        delete m_reader;
1276        return (false);
1277    }
1278
1279    /* this is the reference time */
1280    //m_base_time = m_raw_packet->get_time();
1281    m_last_pkt_time = m_raw_packet->get_time();
1282
1283    /* ready */
1284    m_state = PCAP_ACTIVE;
1285
1286    return true;
1287}
1288
1289/**
1290 * cleanup for PCAP node
1291 *
1292 * @author imarom (08-May-16)
1293 */
1294void CGenNodePCAP::destroy() {
1295
1296    if (m_raw_packet) {
1297        delete m_raw_packet;
1298        m_raw_packet = NULL;
1299    }
1300
1301    if (m_reader) {
1302        delete m_reader;
1303        m_reader = NULL;
1304    }
1305
1306    m_state = PCAP_INVALID;
1307}
1308
1309