trex_stateless_dp_core.cpp revision 3c4a29e1
1/*
2 Itay Marom
3 Hanoch Haim
4 Cisco Systems, Inc.
5*/
6
7/*
8Copyright (c) 2015-2016 Cisco Systems, Inc.
9
10Licensed under the Apache License, Version 2.0 (the "License");
11you may not use this file except in compliance with the License.
12You may obtain a copy of the License at
13
14    http://www.apache.org/licenses/LICENSE-2.0
15
16Unless required by applicable law or agreed to in writing, software
17distributed under the License is distributed on an "AS IS" BASIS,
18WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19See the License for the specific language governing permissions and
20limitations under the License.
21*/
22#include "bp_sim.h"
23#include "trex_stateless_dp_core.h"
24#include "trex_stateless_messaging.h"
25#include "trex_stream.h"
26#include "trex_stream_node.h"
27#include "trex_streams_compiler.h"
28#include "mbuf.h"
29
30
31
32
33void CGenNodeStateless::cache_mbuf_array_init(){
34    m_cache_size=0;
35    m_cache_array_cnt=0;
36}
37
38
39
40void CGenNodeStateless::cache_mbuf_array_copy(CGenNodeCacheMbuf *obj,
41                                              uint16_t size){
42
43    int i;
44    cache_mbuf_array_alloc(size);
45    for (i=0; i<size; i++) {
46        cache_mbuf_array_set(i,obj->m_array[i]);
47    }
48    cache_mbuf_array_set_const_mbuf(obj->m_mbuf_const);
49}
50
51
52rte_mbuf_t ** CGenNodeStateless::cache_mbuf_array_alloc(uint16_t size){
53
54    uint32_t buf_size = CGenNodeCacheMbuf::get_object_size(size);
55    /* TBD  replace with align, zero API */
56    m_cache_mbuf = (void *)malloc(buf_size);
57    assert(m_cache_mbuf);
58    memset(m_cache_mbuf,0,buf_size);
59
60    m_flags |= SL_NODE_CONST_MBUF_CACHE_ARRAY;
61    m_cache_size=size;
62    m_cache_array_cnt=0;
63    return ((rte_mbuf_t **)m_cache_mbuf);
64}
65
66void CGenNodeStateless::cache_mbuf_array_free(){
67
68    assert(m_cache_mbuf);
69    int i;
70    for (i=0; i<(int)m_cache_size; i++) {
71        rte_mbuf_t * m=cache_mbuf_array_get((uint16_t)i);
72        assert(m);
73        rte_pktmbuf_free(m);
74    }
75
76    /* free the const */
77    rte_mbuf_t * m=cache_mbuf_array_get_const_mbuf() ;
78    if (m) {
79        rte_pktmbuf_free(m);
80    }
81
82    free(m_cache_mbuf);
83    m_cache_mbuf=0;
84}
85
86
87rte_mbuf_t * CGenNodeStateless::cache_mbuf_array_get(uint16_t index){
88
89    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
90    return (p->m_array[index]);
91}
92
93void CGenNodeStateless::cache_mbuf_array_set_const_mbuf(rte_mbuf_t * m){
94    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
95    p->m_mbuf_const=m;
96}
97
98rte_mbuf_t * CGenNodeStateless::cache_mbuf_array_get_const_mbuf(){
99    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
100    return (p->m_mbuf_const);
101}
102
103
104void CGenNodeStateless::cache_mbuf_array_set(uint16_t index,
105                                             rte_mbuf_t * m){
106    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
107    p->m_array[index]=m;
108}
109
110
111void CDpOneStream::Delete(CFlowGenListPerThread   * core){
112    assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE);
113    core->free_node((CGenNode *)m_node);
114    delete m_dp_stream;
115    m_node=0;
116    m_dp_stream=0;
117}
118
119void CDpOneStream::DeleteOnlyStream(){
120    assert(m_dp_stream);
121    delete m_dp_stream;
122    m_dp_stream=0;
123}
124
125int CGenNodeStateless::get_stream_id(){
126    if (m_state ==CGenNodeStateless::ss_FREE_RESUSE) {
127        return (-1); // not valid
128    }
129    assert(m_ref_stream_info);
130    return ((int)m_ref_stream_info->m_stream_id);
131}
132
133
134void CGenNodeStateless::DumpHeader(FILE *fd){
135    fprintf(fd," pkt_id, time, port , action , state, stream_id , stype , m-burst# , burst# \n");
136
137}
138void CGenNodeStateless::Dump(FILE *fd){
139    fprintf(fd," %2.4f, %3lu, %s,%s, %3d, %s, %3lu, %3lu  \n",
140            m_time,
141            (ulong)m_port_id,
142            "s-pkt", //action
143            get_stream_state_str(m_state ).c_str(),
144            get_stream_id(),   //stream_id
145            TrexStream::get_stream_type_str(m_stream_type).c_str(), //stype
146            (ulong)m_multi_bursts,
147            (ulong)m_single_burst
148            );
149}
150
151
152
153void CGenNodeStateless::refresh_vm_bss(){
154    if ( m_vm_flow_var ) {
155        StreamVmDp  * vm_s=m_ref_stream_info->m_vm_dp;
156        assert(vm_s);
157        memcpy(m_vm_flow_var,vm_s->get_bss(),vm_s->get_bss_size());
158
159        if ( vm_s->is_random_seed() ){
160            /* if we have random seed for this program */
161            if (m_ref_stream_info->m_random_seed) {
162                set_random_seed(m_ref_stream_info->m_random_seed);
163            }
164        }
165    }
166}
167
168
169/**
170 * this function called when stream restart after it was inactive
171 */
172void CGenNodeStateless::refresh(){
173
174    /* refill the stream info */
175    m_single_burst    = m_single_burst_refill;
176    m_multi_bursts    = m_ref_stream_info->m_num_bursts;
177    m_state           = CGenNodeStateless::ss_ACTIVE;
178
179    /* refresh init value */
180#if 0
181    /* TBD should add a JSON varible for that */
182    refresh_vm_bss();
183#endif
184}
185
186
187void CGenNodeCommand::free_command(){
188
189    assert(m_cmd);
190    m_cmd->on_node_remove();
191    delete m_cmd;
192}
193
194
195std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state){
196    std::string res;
197
198    switch (stream_state) {
199    case CGenNodeStateless::ss_FREE_RESUSE :
200         res="FREE    ";
201        break;
202    case CGenNodeStateless::ss_INACTIVE :
203        res="INACTIVE ";
204        break;
205    case CGenNodeStateless::ss_ACTIVE :
206        res="ACTIVE   ";
207        break;
208    default:
209        res="Unknow   ";
210    };
211    return(res);
212}
213
214/*
215 * Allocate mbuf for flow stat (and latency) info sending
216 * m - Original mbuf (can be complicated mbuf data structure)
217 * fsp_head - return pointer in which the flow stat info should be filled
218 * is_const - is the given mbuf const
219 * return new mbuf structure in which the fsp_head can be written. If needed, orginal mbuf is freed.
220 */
221rte_mbuf_t * CGenNodeStateless::alloc_flow_stat_mbuf(rte_mbuf_t *m, struct flow_stat_payload_header *&fsp_head
222                                                     , bool is_const) {
223    rte_mbuf_t *m_ret = NULL, *m_lat = NULL;
224    uint16_t fsp_head_size = sizeof(struct flow_stat_payload_header);
225
226    if (is_const) {
227        // const mbuf case
228        if (rte_pktmbuf_data_len(m) > 128) {
229            m_ret = CGlobalInfo::pktmbuf_alloc_small(get_socket_id());
230            assert(m_ret);
231            // alloc mbuf just for the latency header
232            m_lat = CGlobalInfo::pktmbuf_alloc( get_socket_id(), fsp_head_size);
233            assert(m_lat);
234            fsp_head = (struct flow_stat_payload_header *)rte_pktmbuf_append(m_lat, fsp_head_size);
235            rte_pktmbuf_attach(m_ret, m);
236            rte_pktmbuf_trim(m_ret, sizeof(struct flow_stat_payload_header));
237            utl_rte_pktmbuf_add_after2(m_ret, m_lat);
238            // ref count was updated when we took the (const) mbuf, and again in rte_pktmbuf_attach
239            // so need do decrease now, to avoid leak.
240            rte_pktmbuf_refcnt_update(m, -1);
241            return m_ret;
242        } else {
243            // Short packet. Just copy all bytes.
244            m_ret = CGlobalInfo::pktmbuf_alloc( get_socket_id(), rte_pktmbuf_data_len(m) );
245            assert(m_ret);
246            char *p = rte_pktmbuf_mtod(m, char*);
247            char *p_new = rte_pktmbuf_append(m_ret, rte_pktmbuf_data_len(m));
248            memcpy(p_new , p, rte_pktmbuf_data_len(m));
249            fsp_head = (struct flow_stat_payload_header *)(p_new + rte_pktmbuf_data_len(m) - fsp_head_size);
250            rte_pktmbuf_free(m);
251            return m_ret;
252        }
253    } else {
254        // Field engine (vm)
255        if (rte_pktmbuf_is_contiguous(m)) {
256            // one, r/w mbuf
257            char *p = rte_pktmbuf_mtod(m, char*);
258            fsp_head = (struct flow_stat_payload_header *)(p + rte_pktmbuf_data_len(m) - fsp_head_size);
259            return m;
260        } else {
261            // r/w --> read only. Should do something like:
262            // Alloc indirect,. make r/w->indirect point to read_only) -> new fsp_header
263            // for the mean time, just copy the entire packet.
264            m_ret = CGlobalInfo::pktmbuf_alloc( get_socket_id(), rte_pktmbuf_pkt_len(m) );
265            assert(m_ret);
266            char *p_new = rte_pktmbuf_append(m_ret, rte_pktmbuf_pkt_len(m));
267            rte_mbuf_t *m_free = m;
268            while (m != NULL) {
269                char *p = rte_pktmbuf_mtod(m, char*);
270                memcpy(p_new, p, m->data_len);
271                p_new += m->data_len;
272                m = m->next;
273            }
274            p_new = rte_pktmbuf_mtod(m_ret, char*);
275            fsp_head = (struct flow_stat_payload_header *)(p_new + rte_pktmbuf_data_len(m_ret) - fsp_head_size);
276            rte_pktmbuf_free(m_free);
277            return m_ret;
278        }
279    }
280}
281
282// test the const case of alloc_flow_stat_mbuf. The more complicated non const case is tested in the simulation.
283bool CGenNodeStateless::alloc_flow_stat_mbuf_test_const() {
284    rte_mbuf_t *m, *m_test;
285    uint16_t sizes[2] = {64, 500};
286    uint16_t size;
287    struct flow_stat_payload_header *fsp_head;
288    char *p;
289
290    set_socket_id(0);
291    for (int test_num = 0; test_num < sizeof(sizes)/sizeof(sizes[0]); test_num++) {
292        size = sizes[test_num];
293        m = CGlobalInfo::pktmbuf_alloc(get_socket_id(), size);
294        p = rte_pktmbuf_append(m, size);
295        for (int i = 0; i < size; i++) {
296            p[i] = (char)i;
297        }
298        m_test = alloc_flow_stat_mbuf(m, fsp_head, true);
299        p = rte_pktmbuf_mtod(m_test, char*);
300        assert(rte_pktmbuf_pkt_len(m_test) == size);
301        for (int i = 0; i < rte_pktmbuf_pkt_len(m_test) - sizeof(*fsp_head); i++) {
302            assert(p[i] == (char)i);
303        }
304        // verify fsp_head points correctly
305        if (size > 128) { // should match threshould in alloc_flow_stat_mbuf
306            assert(rte_pktmbuf_data_len(m_test) == size - sizeof(*fsp_head));
307            assert(rte_pktmbuf_data_len(m_test->next) == sizeof(*fsp_head));
308            assert((char *)fsp_head == rte_pktmbuf_mtod((m_test->next), char*));
309        } else {
310            assert(rte_pktmbuf_data_len(m_test) == size);
311            assert (((char *)fsp_head) + sizeof (*fsp_head) == p + rte_pktmbuf_data_len(m_test));
312        }
313        rte_pktmbuf_free(m_test);
314    }
315    return true;
316}
317
318rte_mbuf_t   * CGenNodeStateless::alloc_node_with_vm(){
319
320    rte_mbuf_t        * m;
321    /* alloc small packet buffer*/
322    uint16_t prefix_size = prefix_header_size();
323    m = CGlobalInfo::pktmbuf_alloc( get_socket_id(), prefix_size );
324    if (m==0) {
325        return (m);
326    }
327    /* TBD remove this, should handle cases of error */
328    assert(m);
329    char *p=rte_pktmbuf_append(m, prefix_size);
330    memcpy( p ,m_original_packet_data_prefix, prefix_size);
331
332
333    /* run the VM program */
334    StreamDPVmInstructionsRunner runner;
335
336    runner.run( (uint32_t*)m_vm_flow_var,
337                m_vm_program_size,
338                m_vm_program,
339                m_vm_flow_var,
340                (uint8_t*)p);
341
342    uint16_t pkt_new_size=runner.get_new_pkt_size();
343    if ( likely( pkt_new_size == 0) ) {
344        /* no packet size change */
345        rte_mbuf_t * m_const = get_const_mbuf();
346        if (  m_const != NULL) {
347            utl_rte_pktmbuf_add_after(m,m_const);
348        }
349        return (m);
350    }
351
352    /* packet size change there are a few changes */
353    rte_mbuf_t * m_const = get_const_mbuf();
354    if ( (m_const == 0 ) || (pkt_new_size<=prefix_size) ) {
355        /* one mbuf , just trim it */
356        m->data_len = pkt_new_size;
357        m->pkt_len  = pkt_new_size;
358        return (m);
359    }
360
361    rte_mbuf_t * mi= CGlobalInfo::pktmbuf_alloc_small(get_socket_id());
362    assert(mi);
363    rte_pktmbuf_attach(mi,m_const);
364    utl_rte_pktmbuf_add_after2(m,mi);
365
366    if ( pkt_new_size < m->pkt_len) {
367        /* need to trim it */
368        mi->data_len = (pkt_new_size - prefix_size);
369        m->pkt_len   = pkt_new_size;
370    }
371    return (m);
372}
373
374void CGenNodeStateless::free_stl_vm_buf(){
375        rte_mbuf_t * m ;
376         m=get_const_mbuf();
377         if (m) {
378             rte_pktmbuf_free(m); /* reduce the ref counter */
379             /* clear the const marker */
380             clear_const_mbuf();
381         }
382
383         free_prefix_header();
384
385         if (m_vm_flow_var) {
386             /* free flow var */
387             free(m_vm_flow_var);
388             m_vm_flow_var=0;
389         }
390}
391
392
393
394void CGenNodeStateless::free_stl_node(){
395
396    if ( is_cache_mbuf_array() ){
397        /* do we have cache of mbuf pre allocated */
398        cache_mbuf_array_free();
399    }else{
400        /* if we have cache mbuf free it */
401        rte_mbuf_t * m=get_cache_mbuf();
402        if (m) {
403                rte_pktmbuf_free(m);
404                m_cache_mbuf=0;
405        }
406    }
407    free_stl_vm_buf();
408}
409
410
411bool TrexStatelessDpPerPort::update_number_of_active_streams(uint32_t d){
412    m_active_streams-=d; /* reduce the number of streams */
413    if (m_active_streams == 0) {
414        return (true);
415    }
416    return (false);
417}
418
419bool TrexStatelessDpPerPort::resume_traffic(uint8_t port_id){
420
421    /* we are working with continues streams so we must be in transmit mode */
422    assert(m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE);
423
424    for (auto dp_stream : m_active_nodes) {
425        CGenNodeStateless * node =dp_stream.m_node;
426        assert(node->get_port_id() == port_id);
427        assert(node->is_pause() == true);
428        node->set_pause(false);
429    }
430    m_state = TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
431    return (true);
432}
433
434bool TrexStatelessDpPerPort::update_traffic(uint8_t port_id, double factor) {
435
436    assert( (m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING ||
437            (m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE)) );
438
439    for (auto dp_stream : m_active_nodes) {
440        CGenNodeStateless * node = dp_stream.m_node;
441        assert(node->get_port_id() == port_id);
442
443        node->update_rate(factor);
444    }
445
446    return (true);
447}
448
449bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){
450
451    /* we are working with continues streams so we must be in transmit mode */
452    assert(m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING);
453
454    for (auto dp_stream : m_active_nodes) {
455        CGenNodeStateless * node =dp_stream.m_node;
456        assert(node->get_port_id() == port_id);
457        assert(node->is_pause() == false);
458        node->set_pause(true);
459    }
460    m_state = TrexStatelessDpPerPort::ppSTATE_PAUSE;
461    return (true);
462}
463
464bool TrexStatelessDpPerPort::push_pcap(uint8_t port_id,
465                                       const std::string &pcap_filename,
466                                       double ipg_usec,
467                                       double speedup,
468                                       uint32_t count) {
469
470    /* push pcap can only happen on an idle port from the core prespective */
471    assert(m_state == TrexStatelessDpPerPort::ppSTATE_IDLE);
472
473    CGenNodePCAP *pcap_node = m_core->allocate_pcap_node();
474    if (!pcap_node) {
475        return (false);
476    }
477
478    pkt_dir_t dir          = m_core->m_node_gen.m_v_if->port_id_to_dir(port_id);
479    socket_id_t socket_id  = m_core->m_node_gen.m_socket_id;
480
481    uint8_t mac_addr[12];
482    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, mac_addr);
483
484    bool rc = pcap_node->create(port_id,
485                                dir,
486                                socket_id,
487                                mac_addr,
488                                pcap_filename,
489                                ipg_usec,
490                                speedup,
491                                count);
492    if (!rc) {
493        m_core->free_node((CGenNode *)pcap_node);
494        return (false);
495    }
496
497    /* schedule the node for now */
498    pcap_node->m_time = m_core->m_cur_time_sec;
499    m_core->m_node_gen.add_node((CGenNode *)pcap_node);
500
501    /* hold a pointer to the node */
502    assert(m_active_pcap_node == NULL);
503    m_active_pcap_node = pcap_node;
504
505    m_state = TrexStatelessDpPerPort::ppSTATE_PCAP_TX;
506    return (true);
507}
508
509
510bool TrexStatelessDpPerPort::stop_traffic(uint8_t  port_id,
511                                          bool     stop_on_id,
512                                          int      event_id){
513
514
515    if (m_state == TrexStatelessDpPerPort::ppSTATE_IDLE) {
516        assert(m_active_streams==0);
517        return false;
518    }
519
520    /* there could be race of stop after stop */
521    if ( stop_on_id ) {
522        if (event_id != m_event_id){
523            /* we can't stop it is an old message */
524            return false;
525        }
526    }
527
528    for (auto dp_stream : m_active_nodes) {
529        CGenNodeStateless * node =dp_stream.m_node;
530        assert(node->get_port_id() == port_id);
531        if ( node->get_state() == CGenNodeStateless::ss_ACTIVE) {
532            node->mark_for_free();
533            m_active_streams--;
534            dp_stream.DeleteOnlyStream();
535
536        }else{
537            dp_stream.Delete(m_core);
538        }
539    }
540
541    /* check for active PCAP node */
542    if (m_active_pcap_node) {
543        /* when got async stop from outside or duration */
544        if (m_active_pcap_node->is_active()) {
545            m_active_pcap_node->mark_for_free();
546        } else {
547            /* graceful stop - node was put out by the scheduler */
548            m_core->free_node( (CGenNode *)m_active_pcap_node);
549        }
550
551        m_active_pcap_node = NULL;
552    }
553
554    /* active stream should be zero */
555    assert(m_active_streams==0);
556    m_active_nodes.clear();
557    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
558    return (true);
559}
560
561
562void TrexStatelessDpPerPort::create(CFlowGenListPerThread   *  core){
563    m_core=core;
564    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
565    m_active_streams=0;
566    m_active_nodes.clear();
567    m_active_pcap_node = NULL;
568}
569
570
571
572void
573TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) {
574    m_thread_id = thread_id;
575    m_core = core;
576    m_local_port_offset = 2*core->getDualPortId();
577
578    CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();
579
580    m_ring_from_cp = cp_dp->getRingCpToDp(thread_id);
581    m_ring_to_cp   = cp_dp->getRingDpToCp(thread_id);
582
583    m_state = STATE_IDLE;
584
585    int i;
586    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
587        m_ports[i].create(core);
588    }
589}
590
591
592/* move to the next stream, old stream move to INACTIVE */
593bool TrexStatelessDpCore::set_stateless_next_node(CGenNodeStateless * cur_node,
594                                                  CGenNodeStateless * next_node){
595
596    assert(cur_node);
597    TrexStatelessDpPerPort * lp_port = get_port_db(cur_node->m_port_id);
598    bool schedule =false;
599
600    bool to_stop_port=false;
601
602    if (next_node == NULL) {
603        /* there is no next stream , reduce the number of active streams*/
604        to_stop_port = lp_port->update_number_of_active_streams(1);
605
606    }else{
607        uint8_t state=next_node->get_state();
608
609        /* can't be FREE_RESUSE */
610        assert(state != CGenNodeStateless::ss_FREE_RESUSE);
611        if (state == CGenNodeStateless::ss_INACTIVE ) {
612
613            if (cur_node->m_action_counter > 0) {
614                cur_node->m_action_counter--;
615                if (cur_node->m_action_counter==0) {
616                    to_stop_port = lp_port->update_number_of_active_streams(1);
617                }else{
618                    /* refill start info and scedule, no update in active streams  */
619                    next_node->refresh();
620                    schedule = true;
621                }
622            }else{
623                /* refill start info and scedule, no update in active streams  */
624                next_node->refresh();
625                schedule = true;
626            }
627
628        }else{
629            to_stop_port = lp_port->update_number_of_active_streams(1);
630        }
631    }
632
633    if ( to_stop_port ) {
634        /* call stop port explictly to move the state */
635        stop_traffic(cur_node->m_port_id,false,0);
636    }
637
638    return ( schedule );
639}
640
641
642
643/**
644 * in idle state loop, the processor most of the time sleeps
645 * and periodically checks for messages
646 *
647 * @author imarom (01-Nov-15)
648 */
649void
650TrexStatelessDpCore::idle_state_loop() {
651
652    const int SHORT_DELAY_MS    = 2;
653    const int LONG_DELAY_MS     = 50;
654    const int DEEP_SLEEP_LIMIT  = 2000;
655
656    int counter = 0;
657
658    while (m_state == STATE_IDLE) {
659        m_core->tickle();
660        m_core->m_node_gen.m_v_if->flush_dp_rx_queue();
661        bool had_msg = periodic_check_for_cp_messages();
662        if (had_msg) {
663            counter = 0;
664            continue;
665        }
666
667        /* enter deep sleep only if enough time had passed */
668        if (counter < DEEP_SLEEP_LIMIT) {
669            delay(SHORT_DELAY_MS);
670            counter++;
671        } else {
672            delay(LONG_DELAY_MS);
673        }
674
675    }
676}
677
678
679
680void TrexStatelessDpCore::quit_main_loop(){
681    m_core->set_terminate_mode(true); /* mark it as terminated */
682    m_state = STATE_TERMINATE;
683    add_global_duration(0.0001);
684}
685
686
687/**
688 * scehduler runs when traffic exists
689 * it will return when no more transmitting is done on this
690 * core
691 *
692 * @author imarom (01-Nov-15)
693 */
694void
695TrexStatelessDpCore::start_scheduler() {
696    /* creates a maintenace job using the scheduler */
697    CGenNode * node_sync = m_core->create_node() ;
698    node_sync->m_type = CGenNode::FLOW_SYNC;
699    node_sync->m_time = m_core->m_cur_time_sec + SYNC_TIME_OUT;
700    m_core->m_node_gen.add_node(node_sync);
701
702    double old_offset = 0.0;
703    m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset);
704    /* bail out in case of terminate */
705    if (m_state != TrexStatelessDpCore::STATE_TERMINATE) {
706        m_core->m_node_gen.close_file(m_core);
707        m_state = STATE_IDLE; /* we exit from all ports and we have nothing to do, we move to IDLE state */
708    }
709}
710
711
712void
713TrexStatelessDpCore::run_once(){
714
715    idle_state_loop();
716
717    if ( m_state == STATE_TERMINATE ){
718        return;
719    }
720
721    start_scheduler();
722}
723
724
725
726
727void
728TrexStatelessDpCore::start() {
729
730    while (true) {
731        run_once();
732
733        if ( m_core->is_terminated_by_master() ) {
734            break;
735        }
736    }
737}
738
739/* only if both port are idle we can exit */
740void
741TrexStatelessDpCore::schedule_exit(){
742
743    CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
744
745    node->m_type = CGenNode::COMMAND;
746
747    node->m_cmd = new TrexStatelessDpCanQuit();
748
749    /* make sure it will be scheduled after the current node */
750    node->m_time = m_core->m_cur_time_sec ;
751
752    m_core->m_node_gen.add_node((CGenNode *)node);
753}
754
755
756void
757TrexStatelessDpCore::add_global_duration(double duration){
758    if (duration > 0.0) {
759        CGenNode *node = m_core->create_node() ;
760
761        node->m_type = CGenNode::EXIT_SCHED;
762
763        /* make sure it will be scheduled after the current node */
764        node->m_time = m_core->m_cur_time_sec + duration ;
765
766        m_core->m_node_gen.add_node(node);
767    }
768}
769
770/* add per port exit */
771void
772TrexStatelessDpCore::add_port_duration(double duration,
773                                       uint8_t port_id,
774                                       int event_id){
775    if (duration > 0.0) {
776        CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
777
778        node->m_type = CGenNode::COMMAND;
779
780        /* make sure it will be scheduled after the current node */
781        node->m_time = m_core->m_cur_time_sec + duration ;
782
783        TrexStatelessDpStop * cmd=new TrexStatelessDpStop(port_id);
784
785
786        /* test this */
787        m_core->m_non_active_nodes++;
788        cmd->set_core_ptr(m_core);
789        cmd->set_event_id(event_id);
790        cmd->set_wait_for_event_id(true);
791
792        node->m_cmd = cmd;
793
794        m_core->m_node_gen.add_node((CGenNode *)node);
795    }
796}
797
798
799void TrexStatelessDpCore::update_mac_addr(TrexStream * stream,
800                                          CGenNodeStateless *node,
801                                          pkt_dir_t dir,
802                                          char *raw_pkt){
803    bool              ov_src = stream->get_override_src_mac_by_pkt_data();
804    TrexStream::stream_dst_mac_t  ov_dst = stream->get_override_dst_mac_mode();
805
806
807    if ( (ov_src == true) && (ov_dst == TrexStream::stPKT) ) {
808        /* nothing to do, take from the packet both */
809        return;
810    }
811
812        /* take from cfg_file */
813    if ( (ov_src == false) &&
814         (ov_dst == TrexStream::stCFG_FILE) ){
815
816          m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*)raw_pkt);
817          return;
818    }
819
820    /* save the pkt*/
821    char tmp_pkt[12];
822    memcpy(tmp_pkt,raw_pkt,12);
823
824    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*)raw_pkt);
825
826    if ((ov_src == true) && (ov_dst == TrexStream::stCFG_FILE)) {
827        memcpy(raw_pkt+6,tmp_pkt+6,6);
828    }
829
830    if ((ov_src == false) && (ov_dst == TrexStream::stPKT)) {
831        memcpy(raw_pkt,tmp_pkt,6);
832    }
833}
834
835
836void TrexStatelessDpCore::replay_vm_into_cache(TrexStream * stream,
837                                               CGenNodeStateless *node){
838
839    uint16_t      cache_size = stream->m_cache_size;
840    assert(cache_size>0);
841    rte_mbuf_t * m=0;
842
843    uint32_t buf_size = CGenNodeCacheMbuf::get_object_size(cache_size);
844    CGenNodeCacheMbuf * p = (CGenNodeCacheMbuf *)malloc(buf_size);
845    assert(p);
846    memset(p,0,buf_size);
847
848    int i;
849    for (i=0; i<cache_size; i++) {
850        p->m_array[i] =  node->alloc_node_with_vm();
851    }
852    /* save const */
853    m=node->get_const_mbuf();
854    if (m) {
855        p->m_mbuf_const=m;
856        rte_pktmbuf_refcnt_update(m,1);
857    }
858
859    /* free all VM and const mbuf */
860    node->free_stl_vm_buf();
861
862    /* copy to local node meory */
863    node->cache_mbuf_array_copy(p,cache_size);
864
865    /* free the memory */
866    free(p);
867}
868
869
870void
871TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port,
872                                TrexStream * stream,
873                                TrexStreamsCompiledObj *comp) {
874
875    CGenNodeStateless *node = m_core->create_node_sl();
876
877    node->cache_mbuf_array_init();
878    node->m_batch_size=0;
879
880    /* add periodic */
881    node->m_cache_mbuf=0;
882    node->m_type = CGenNode::STATELESS_PKT;
883
884    node->m_action_counter = stream->m_action_count;
885
886    /* clone the stream from control plane memory to DP memory */
887    node->m_ref_stream_info = stream->clone();
888    /* no need for this memory anymore on the control plane memory */
889    stream->release_dp_object();
890
891    node->m_next_stream=0; /* will be fixed later */
892
893    if ( stream->m_self_start ){
894        /* if self start it is in active mode */
895        node->m_state =CGenNodeStateless::ss_ACTIVE;
896        lp_port->m_active_streams++;
897    }else{
898        node->m_state =CGenNodeStateless::ss_INACTIVE;
899    }
900
901    node->m_time = m_core->m_cur_time_sec + stream->get_start_delay_sec();
902
903    pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(stream->m_port_id);
904    node->m_flags = 0;
905    node->m_src_port =0;
906    node->m_original_packet_data_prefix = 0;
907
908    if (stream->m_rx_check.m_enabled) {
909        node->set_stat_needed();
910        uint8_t hw_id = stream->m_rx_check.m_hw_id;
911        assert (hw_id < MAX_FLOW_STATS + MAX_FLOW_STATS_PAYLOAD);
912        node->set_stat_hw_id(hw_id);
913    }
914
915    /* set socket id */
916    node->set_socket_id(m_core->m_node_gen.m_socket_id);
917
918    /* build a mbuf from a packet */
919
920    uint16_t pkt_size = stream->m_pkt.len;
921    const uint8_t *stream_pkt = stream->m_pkt.binary;
922
923    node->m_pause =0;
924    node->m_stream_type = stream->m_type;
925    node->m_next_time_offset = 1.0 / stream->get_pps();
926    node->m_null_stream = (stream->m_null_stream ? 1 : 0);
927
928    /* stateless specific fields */
929    switch ( stream->m_type ) {
930
931    case TrexStream::stCONTINUOUS :
932        node->m_single_burst=0;
933        node->m_single_burst_refill=0;
934        node->m_multi_bursts=0;
935        break;
936
937    case TrexStream::stSINGLE_BURST :
938        node->m_stream_type             = TrexStream::stMULTI_BURST;
939        node->m_single_burst            = stream->m_burst_total_pkts;
940        node->m_single_burst_refill     = stream->m_burst_total_pkts;
941        node->m_multi_bursts            = 1;  /* single burst in multi burst of 1 */
942        break;
943
944    case TrexStream::stMULTI_BURST :
945        node->m_single_burst        = stream->m_burst_total_pkts;
946        node->m_single_burst_refill = stream->m_burst_total_pkts;
947        node->m_multi_bursts        = stream->m_num_bursts;
948        break;
949    default:
950
951        assert(0);
952    };
953
954    node->m_port_id = stream->m_port_id;
955
956    /* set dir 0 or 1 client or server */
957    node->set_mbuf_cache_dir(dir);
958
959
960    if (node->m_ref_stream_info->getDpVm() == NULL) {
961        /* no VM */
962
963        node->m_vm_flow_var =  NULL;
964        node->m_vm_program  =  NULL;
965        node->m_vm_program_size =0;
966
967                /* allocate const mbuf */
968        rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size);
969        assert(m);
970
971        char *p = rte_pktmbuf_append(m, pkt_size);
972        assert(p);
973        /* copy the packet */
974        memcpy(p,stream_pkt,pkt_size);
975
976        update_mac_addr(stream,node,dir,p);
977
978        /* set the packet as a readonly */
979        node->set_cache_mbuf(m);
980
981        node->m_original_packet_data_prefix =0;
982    }else{
983
984        /* set the program */
985        TrexStream * local_mem_stream = node->m_ref_stream_info;
986
987        StreamVmDp  * lpDpVm = local_mem_stream->getDpVm();
988
989        node->m_vm_flow_var      = lpDpVm->clone_bss(); /* clone the flow var */
990        node->m_vm_program       = lpDpVm->get_program(); /* same ref to the program */
991        node->m_vm_program_size  = lpDpVm->get_program_size();
992
993
994        /* set the random seed if was set */
995        if ( lpDpVm->is_random_seed() ){
996            /* if we have random seed for this program */
997            if (stream->m_random_seed) {
998                node->set_random_seed(stream->m_random_seed);
999            }
1000        }
1001
1002        /* we need to copy the object */
1003        if ( pkt_size > lpDpVm->get_prefix_size() ) {
1004            /* we need const packet */
1005            uint16_t const_pkt_size  = pkt_size - lpDpVm->get_prefix_size() ;
1006            rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), const_pkt_size );
1007            assert(m);
1008
1009            char *p = rte_pktmbuf_append(m, const_pkt_size);
1010            assert(p);
1011
1012            /* copy packet data */
1013            memcpy(p,(stream_pkt + lpDpVm->get_prefix_size()),const_pkt_size);
1014
1015            node->set_const_mbuf(m);
1016        }
1017
1018
1019        if ( lpDpVm->is_pkt_size_var() ) {
1020            // mark the node as varible size
1021            node->set_var_pkt_size();
1022        }
1023
1024
1025        if (lpDpVm->get_prefix_size() > pkt_size ) {
1026            lpDpVm->set_prefix_size(pkt_size);
1027        }
1028
1029        /* copy the headr */
1030        uint16_t header_size = lpDpVm->get_prefix_size();
1031        assert(header_size);
1032        node->alloc_prefix_header(header_size);
1033        uint8_t *p=node->m_original_packet_data_prefix;
1034        assert(p);
1035
1036        memcpy(p,stream_pkt , header_size);
1037
1038        update_mac_addr(stream,node,dir,(char *)p);
1039
1040        if (stream->m_cache_size > 0 ) {
1041            /* we need to create cache of objects */
1042            replay_vm_into_cache(stream, node);
1043        }
1044    }
1045
1046
1047    CDpOneStream one_stream;
1048
1049    one_stream.m_dp_stream = node->m_ref_stream_info;
1050    one_stream.m_node =node;
1051
1052    lp_port->m_active_nodes.push_back(one_stream);
1053
1054    /* schedule only if active */
1055    if (node->m_state == CGenNodeStateless::ss_ACTIVE) {
1056        m_core->m_node_gen.add_node((CGenNode *)node);
1057    }
1058}
1059
1060void
1061TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj,
1062                                   double duration,
1063                                   int event_id) {
1064
1065
1066    TrexStatelessDpPerPort * lp_port=get_port_db(obj->get_port_id());
1067    lp_port->m_active_streams = 0;
1068    lp_port->set_event_id(event_id);
1069
1070    /* update cur time */
1071    if ( CGlobalInfo::is_realtime()  ){
1072        m_core->m_cur_time_sec = now_sec() + SCHD_OFFSET_DTIME ;
1073    }
1074
1075    /* no nodes in the list */
1076    assert(lp_port->m_active_nodes.size()==0);
1077
1078    for (auto single_stream : obj->get_objects()) {
1079        /* all commands should be for the same port */
1080        assert(obj->get_port_id() == single_stream.m_stream->m_port_id);
1081        add_stream(lp_port,single_stream.m_stream,obj);
1082    }
1083
1084    uint32_t nodes = lp_port->m_active_nodes.size();
1085    /* find next stream */
1086    assert(nodes == obj->get_objects().size());
1087
1088    int cnt=0;
1089
1090    /* set the next_stream pointer  */
1091    for (auto single_stream : obj->get_objects()) {
1092
1093        if (single_stream.m_stream->is_dp_next_stream() ) {
1094            int stream_id = single_stream.m_stream->m_next_stream_id;
1095            assert(stream_id<nodes);
1096            /* point to the next stream , stream_id is fixed */
1097            lp_port->m_active_nodes[cnt].m_node->m_next_stream = lp_port->m_active_nodes[stream_id].m_node ;
1098        }
1099        cnt++;
1100    }
1101
1102    lp_port->m_state =TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
1103    m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
1104
1105
1106    if ( duration > 0.0 ){
1107        add_port_duration( duration ,obj->get_port_id(),event_id );
1108    }
1109
1110}
1111
1112
1113bool TrexStatelessDpCore::are_all_ports_idle(){
1114
1115    bool res=true;
1116    int i;
1117    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
1118        if ( m_ports[i].m_state != TrexStatelessDpPerPort::ppSTATE_IDLE ){
1119            res=false;
1120        }
1121    }
1122    return (res);
1123}
1124
1125
1126void
1127TrexStatelessDpCore::resume_traffic(uint8_t port_id){
1128
1129    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1130
1131    lp_port->resume_traffic(port_id);
1132}
1133
1134
1135void
1136TrexStatelessDpCore::pause_traffic(uint8_t port_id){
1137
1138    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1139
1140    lp_port->pause_traffic(port_id);
1141}
1142
1143void
1144TrexStatelessDpCore::push_pcap(uint8_t port_id,
1145                               int event_id,
1146                               const std::string &pcap_filename,
1147                               double ipg_usec,
1148                               double speedup,
1149                               uint32_t count,
1150                               double duration) {
1151
1152    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1153
1154    lp_port->set_event_id(event_id);
1155
1156    /* delegate the command to the port */
1157    bool rc = lp_port->push_pcap(port_id, pcap_filename, ipg_usec, speedup, count);
1158    if (!rc) {
1159        /* report back that we stopped */
1160        CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
1161        TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
1162                                                                       port_id,
1163                                                                       event_id,
1164                                                                       false);
1165        ring->Enqueue((CGenNode *)event_msg);
1166        return;
1167    }
1168
1169
1170    if (duration > 0.0) {
1171        add_port_duration(duration, port_id, event_id);
1172    }
1173
1174     m_state = TrexStatelessDpCore::STATE_PCAP_TX;
1175}
1176
1177void
1178TrexStatelessDpCore::update_traffic(uint8_t port_id, double factor) {
1179
1180    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1181
1182    lp_port->update_traffic(port_id, factor);
1183}
1184
1185
1186void
1187TrexStatelessDpCore::stop_traffic(uint8_t  port_id,
1188                                  bool     stop_on_id,
1189                                  int      event_id) {
1190    /* we cannot remove nodes not from the top of the queue so
1191       for every active node - make sure next time
1192       the scheduler invokes it, it will be free */
1193
1194    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1195    if ( lp_port->stop_traffic(port_id,stop_on_id,event_id) == false){
1196        return;
1197    }
1198
1199    /* flush the TX queue before sending done message to the CP */
1200    m_core->flush_tx_queue();
1201
1202    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
1203    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
1204                                                                   port_id,
1205                                                                   lp_port->get_event_id());
1206    ring->Enqueue((CGenNode *)event_msg);
1207
1208}
1209
1210/**
1211 * handle a message from CP to DP
1212 *
1213 */
1214void
1215TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) {
1216    msg->handle(this);
1217    delete msg;
1218}
1219
1220void
1221TrexStatelessDpCore::barrier(uint8_t port_id, int event_id) {
1222
1223    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
1224    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
1225                                                                   port_id,
1226                                                                   event_id);
1227    ring->Enqueue((CGenNode *)event_msg);
1228}
1229
1230
1231/**
1232 * PCAP node
1233 */
1234bool CGenNodePCAP::create(uint8_t port_id,
1235                          pkt_dir_t dir,
1236                          socket_id_t socket_id,
1237                          const uint8_t *mac_addr,
1238                          const std::string &pcap_filename,
1239                          double ipg_usec,
1240                          double speedup,
1241                          uint32_t count) {
1242    std::stringstream ss;
1243
1244    m_type       = CGenNode::PCAP_PKT;
1245    m_flags      = 0;
1246    m_src_port   = 0;
1247    m_port_id    = port_id;
1248    m_count      = count;
1249
1250    /* mark this node as slow path */
1251    set_slow_path(true);
1252
1253    if (ipg_usec != -1) {
1254        /* fixed IPG */
1255        m_ipg_sec = usec_to_sec(ipg_usec / speedup);
1256        m_speedup = 0;
1257    } else {
1258        /* packet IPG */
1259        m_ipg_sec = -1;
1260        m_speedup  = speedup;
1261    }
1262
1263    /* copy MAC addr info */
1264    memcpy(m_mac_addr, mac_addr, 12);
1265
1266    /* set the dir */
1267    set_mbuf_dir(dir);
1268    set_socket_id(socket_id);
1269
1270    /* create the PCAP reader */
1271    m_reader = CCapReaderFactory::CreateReader((char *)pcap_filename.c_str(), 0, ss);
1272    if (!m_reader) {
1273        return false;
1274    }
1275
1276    m_raw_packet = new CCapPktRaw();
1277    if ( m_reader->ReadPacket(m_raw_packet) == false ){
1278        /* handle error */
1279        delete m_reader;
1280        return (false);
1281    }
1282
1283    /* this is the reference time */
1284    //m_base_time = m_raw_packet->get_time();
1285    m_last_pkt_time = m_raw_packet->get_time();
1286
1287    /* ready */
1288    m_state = PCAP_ACTIVE;
1289
1290    return true;
1291}
1292
1293/**
1294 * cleanup for PCAP node
1295 *
1296 * @author imarom (08-May-16)
1297 */
1298void CGenNodePCAP::destroy() {
1299
1300    if (m_raw_packet) {
1301        delete m_raw_packet;
1302        m_raw_packet = NULL;
1303    }
1304
1305    if (m_reader) {
1306        delete m_reader;
1307        m_reader = NULL;
1308    }
1309
1310    m_state = PCAP_INVALID;
1311}
1312
1313