trex_stateless_dp_core.cpp revision 558ce764
1/*
2 Itay Marom
3 Hanoch Haim
4 Cisco Systems, Inc.
5*/
6
7/*
8Copyright (c) 2015-2016 Cisco Systems, Inc.
9
10Licensed under the Apache License, Version 2.0 (the "License");
11you may not use this file except in compliance with the License.
12You may obtain a copy of the License at
13
14    http://www.apache.org/licenses/LICENSE-2.0
15
16Unless required by applicable law or agreed to in writing, software
17distributed under the License is distributed on an "AS IS" BASIS,
18WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19See the License for the specific language governing permissions and
20limitations under the License.
21*/
22#include "bp_sim.h"
23#include "trex_stateless_dp_core.h"
24#include "trex_stateless_messaging.h"
25#include "trex_stream.h"
26#include "trex_stream_node.h"
27#include "trex_streams_compiler.h"
28#include "mbuf.h"
29#include "trex_stateless.h"
30
31
32
33void CGenNodeStateless::cache_mbuf_array_init(){
34    m_cache_size=0;
35    m_cache_array_cnt=0;
36}
37
38
39
40void CGenNodeStateless::cache_mbuf_array_copy(CGenNodeCacheMbuf *obj,
41                                              uint16_t size){
42
43    int i;
44    cache_mbuf_array_alloc(size);
45    for (i=0; i<size; i++) {
46        cache_mbuf_array_set(i,obj->m_array[i]);
47    }
48    cache_mbuf_array_set_const_mbuf(obj->m_mbuf_const);
49}
50
51
52rte_mbuf_t ** CGenNodeStateless::cache_mbuf_array_alloc(uint16_t size){
53
54    uint32_t buf_size = CGenNodeCacheMbuf::get_object_size(size);
55    /* TBD  replace with align, zero API */
56    m_cache_mbuf = (void *)malloc(buf_size);
57    assert(m_cache_mbuf);
58    memset(m_cache_mbuf,0,buf_size);
59
60    m_flags |= SL_NODE_CONST_MBUF_CACHE_ARRAY;
61    m_cache_size=size;
62    m_cache_array_cnt=0;
63    return ((rte_mbuf_t **)m_cache_mbuf);
64}
65
66void CGenNodeStateless::cache_mbuf_array_free(){
67
68    assert(m_cache_mbuf);
69    int i;
70    for (i=0; i<(int)m_cache_size; i++) {
71        rte_mbuf_t * m=cache_mbuf_array_get((uint16_t)i);
72        assert(m);
73        rte_pktmbuf_free(m);
74    }
75
76    /* free the const */
77    rte_mbuf_t * m=cache_mbuf_array_get_const_mbuf() ;
78    if (m) {
79        rte_pktmbuf_free(m);
80    }
81
82    free(m_cache_mbuf);
83    m_cache_mbuf=0;
84}
85
86
87rte_mbuf_t * CGenNodeStateless::cache_mbuf_array_get(uint16_t index){
88
89    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
90    return (p->m_array[index]);
91}
92
93void CGenNodeStateless::cache_mbuf_array_set_const_mbuf(rte_mbuf_t * m){
94    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
95    p->m_mbuf_const=m;
96}
97
98rte_mbuf_t * CGenNodeStateless::cache_mbuf_array_get_const_mbuf(){
99    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
100    return (p->m_mbuf_const);
101}
102
103
104void CGenNodeStateless::cache_mbuf_array_set(uint16_t index,
105                                             rte_mbuf_t * m){
106    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
107    p->m_array[index]=m;
108}
109
110
111void CDpOneStream::Delete(CFlowGenListPerThread   * core){
112    assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE);
113    core->free_node((CGenNode *)m_node);
114    delete m_dp_stream;
115    m_node=0;
116    m_dp_stream=0;
117}
118
119void CDpOneStream::DeleteOnlyStream(){
120    assert(m_dp_stream);
121    delete m_dp_stream;
122    m_dp_stream=0;
123}
124
125int CGenNodeStateless::get_stream_id(){
126    if (m_state ==CGenNodeStateless::ss_FREE_RESUSE) {
127        return (-1); // not valid
128    }
129    assert(m_ref_stream_info);
130    return ((int)m_ref_stream_info->m_stream_id);
131}
132
133
134void CGenNodeStateless::DumpHeader(FILE *fd){
135    fprintf(fd," pkt_id, time, port , action , state, stream_id , stype , m-burst# , burst# \n");
136
137}
138void CGenNodeStateless::Dump(FILE *fd){
139    fprintf(fd," %2.4f, %3lu, %s,%s, %3d, %s, %3lu, %3lu  \n",
140            m_time,
141            (ulong)m_port_id,
142            "s-pkt", //action
143            get_stream_state_str(m_state ).c_str(),
144            get_stream_id(),   //stream_id
145            TrexStream::get_stream_type_str(m_stream_type).c_str(), //stype
146            (ulong)m_multi_bursts,
147            (ulong)m_single_burst
148            );
149}
150
151
152void CGenNodeStateless::generate_random_seed() {
153    /* seed can be provided by the user */
154    uint32_t unique_seed;
155    if (m_ref_stream_info->m_random_seed) {
156        unique_seed = m_ref_stream_info->m_random_seed;
157    } else {
158        unsigned int tmp = (unsigned int)time(NULL);
159        unique_seed = rand_r(&tmp);
160    }
161
162    /* per thread divergence */
163    unique_seed = (unique_seed * ( (m_thread_id + 1) * 514229 ) ) & 0xFFFFFFFF;
164
165    /* set random */
166    set_random_seed(unique_seed);
167}
168
169
170void CGenNodeStateless::refresh_vm_bss() {
171    if ( m_vm_flow_var ) {
172        StreamVmDp  * vm_s=m_ref_stream_info->m_vm_dp;
173        assert(vm_s);
174        memcpy(m_vm_flow_var,vm_s->get_bss(),vm_s->get_bss_size());
175
176        if ( vm_s->is_random_seed() ) {
177            generate_random_seed();
178        }
179
180    }
181}
182
183
184
185/**
186 * this function called when stream restart after it was inactive
187 */
188void CGenNodeStateless::refresh(){
189
190    /* refill the stream info */
191    m_single_burst    = m_single_burst_refill;
192    m_multi_bursts    = m_ref_stream_info->m_num_bursts;
193    m_state           = CGenNodeStateless::ss_ACTIVE;
194
195    /* refresh init value */
196#if 0
197    /* TBD should add a JSON varible for that */
198    refresh_vm_bss();
199#endif
200}
201
202
203void CGenNodeCommand::free_command(){
204
205    assert(m_cmd);
206    m_cmd->on_node_remove();
207    delete m_cmd;
208}
209
210
211std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state){
212    std::string res;
213
214    switch (stream_state) {
215    case CGenNodeStateless::ss_FREE_RESUSE :
216         res="FREE    ";
217        break;
218    case CGenNodeStateless::ss_INACTIVE :
219        res="INACTIVE ";
220        break;
221    case CGenNodeStateless::ss_ACTIVE :
222        res="ACTIVE   ";
223        break;
224    default:
225        res="Unknow   ";
226    };
227    return(res);
228}
229
230/*
231 * Allocate mbuf for flow stat (and latency) info sending
232 * m - Original mbuf (can be complicated mbuf data structure)
233 * fsp_head - return pointer in which the flow stat info should be filled
234 * is_const - is the given mbuf const
235 * return new mbuf structure in which the fsp_head can be written. If needed, orginal mbuf is freed.
236 */
237rte_mbuf_t * CGenNodeStateless::alloc_flow_stat_mbuf(rte_mbuf_t *m, struct flow_stat_payload_header *&fsp_head
238                                                     , bool is_const) {
239    rte_mbuf_t *m_ret = NULL, *m_lat = NULL;
240    uint16_t fsp_head_size = sizeof(struct flow_stat_payload_header);
241
242    if (is_const) {
243        // const mbuf case
244        if (rte_pktmbuf_data_len(m) > 128) {
245            m_ret = CGlobalInfo::pktmbuf_alloc_small(get_socket_id());
246            assert(m_ret);
247            // alloc mbuf just for the latency header
248            m_lat = CGlobalInfo::pktmbuf_alloc( get_socket_id(), fsp_head_size);
249            assert(m_lat);
250            fsp_head = (struct flow_stat_payload_header *)rte_pktmbuf_append(m_lat, fsp_head_size);
251            rte_pktmbuf_attach(m_ret, m);
252            rte_pktmbuf_trim(m_ret, sizeof(struct flow_stat_payload_header));
253            utl_rte_pktmbuf_add_after2(m_ret, m_lat);
254            // ref count was updated when we took the (const) mbuf, and again in rte_pktmbuf_attach
255            // so need do decrease now, to avoid leak.
256            rte_pktmbuf_refcnt_update(m, -1);
257            return m_ret;
258        } else {
259            // Short packet. Just copy all bytes.
260            m_ret = CGlobalInfo::pktmbuf_alloc( get_socket_id(), rte_pktmbuf_data_len(m) );
261            assert(m_ret);
262            char *p = rte_pktmbuf_mtod(m, char*);
263            char *p_new = rte_pktmbuf_append(m_ret, rte_pktmbuf_data_len(m));
264            memcpy(p_new , p, rte_pktmbuf_data_len(m));
265            fsp_head = (struct flow_stat_payload_header *)(p_new + rte_pktmbuf_data_len(m) - fsp_head_size);
266            rte_pktmbuf_free(m);
267            return m_ret;
268        }
269    } else {
270        // Field engine (vm)
271        if (rte_pktmbuf_is_contiguous(m)) {
272            // one, r/w mbuf
273            char *p = rte_pktmbuf_mtod(m, char*);
274            fsp_head = (struct flow_stat_payload_header *)(p + rte_pktmbuf_data_len(m) - fsp_head_size);
275            return m;
276        } else {
277            // We have: r/w --> read only.
278            // Changing to:
279            // (original) r/w -> (new) indirect (direct is original read_only, after trimming last bytes) -> (new) latency info
280            rte_mbuf_t *m_read_only = m->next, *m_indirect;
281
282            m_indirect = CGlobalInfo::pktmbuf_alloc_small(get_socket_id());
283            assert(m_indirect);
284            // alloc mbuf just for the latency header
285            m_lat = CGlobalInfo::pktmbuf_alloc( get_socket_id(), fsp_head_size);
286            assert(m_lat);
287            fsp_head = (struct flow_stat_payload_header *)rte_pktmbuf_append(m_lat, fsp_head_size);
288            utl_rte_pktmbuf_chain_with_indirect(m, m_indirect, m_read_only, m_lat);
289            m_indirect->data_len = (uint16_t)(m_indirect->data_len - fsp_head_size);
290            return m;
291        }
292    }
293}
294
295// test the const case of alloc_flow_stat_mbuf. The more complicated non const case is tested in the simulation.
296bool CGenNodeStateless::alloc_flow_stat_mbuf_test_const() {
297    rte_mbuf_t *m, *m_test;
298    uint16_t sizes[2] = {64, 500};
299    uint16_t size;
300    struct flow_stat_payload_header *fsp_head;
301    char *p;
302
303    set_socket_id(0);
304    for (int test_num = 0; test_num < sizeof(sizes)/sizeof(sizes[0]); test_num++) {
305        size = sizes[test_num];
306        m = CGlobalInfo::pktmbuf_alloc(get_socket_id(), size);
307        p = rte_pktmbuf_append(m, size);
308        for (int i = 0; i < size; i++) {
309            p[i] = (char)i;
310        }
311        m_test = alloc_flow_stat_mbuf(m, fsp_head, true);
312        p = rte_pktmbuf_mtod(m_test, char*);
313        assert(rte_pktmbuf_pkt_len(m_test) == size);
314        for (int i = 0; i < rte_pktmbuf_pkt_len(m_test) - sizeof(*fsp_head); i++) {
315            assert(p[i] == (char)i);
316        }
317        // verify fsp_head points correctly
318        if (size > 128) { // should match threshould in alloc_flow_stat_mbuf
319            assert(rte_pktmbuf_data_len(m_test) == size - sizeof(*fsp_head));
320            assert(rte_pktmbuf_data_len(m_test->next) == sizeof(*fsp_head));
321            assert((char *)fsp_head == rte_pktmbuf_mtod((m_test->next), char*));
322        } else {
323            assert(rte_pktmbuf_data_len(m_test) == size);
324            assert (((char *)fsp_head) + sizeof (*fsp_head) == p + rte_pktmbuf_data_len(m_test));
325        }
326        rte_pktmbuf_free(m_test);
327    }
328    return true;
329}
330
331rte_mbuf_t   * CGenNodeStateless::alloc_node_with_vm(){
332
333    rte_mbuf_t        * m;
334    /* alloc small packet buffer*/
335    uint16_t prefix_size = prefix_header_size();
336    m = CGlobalInfo::pktmbuf_alloc( get_socket_id(), prefix_size );
337    if (m==0) {
338        return (m);
339    }
340    /* TBD remove this, should handle cases of error */
341    assert(m);
342    char *p=rte_pktmbuf_append(m, prefix_size);
343    memcpy( p ,m_original_packet_data_prefix, prefix_size);
344
345
346    /* run the VM program */
347    StreamDPVmInstructionsRunner runner;
348    runner.set_mbuf(m);
349
350    runner.run( (uint32_t*)m_vm_flow_var,
351                m_vm_program_size,
352                m_vm_program,
353                m_vm_flow_var,
354                (uint8_t*)p);
355
356    uint16_t pkt_new_size=runner.get_new_pkt_size();
357    if ( likely( pkt_new_size == 0) ) {
358        /* no packet size change */
359        rte_mbuf_t * m_const = get_const_mbuf();
360        if (  m_const != NULL) {
361            utl_rte_pktmbuf_add_after(m,m_const);
362        }
363        return (m);
364    }
365
366    /* packet size change there are a few changes */
367    rte_mbuf_t * m_const = get_const_mbuf();
368    if ( (m_const == 0 ) || (pkt_new_size<=prefix_size) ) {
369        /* one mbuf , just trim it */
370        m->data_len = pkt_new_size;
371        m->pkt_len  = pkt_new_size;
372        return (m);
373    }
374
375    rte_mbuf_t * mi= CGlobalInfo::pktmbuf_alloc_small(get_socket_id());
376    assert(mi);
377    rte_pktmbuf_attach(mi,m_const);
378    utl_rte_pktmbuf_add_after2(m,mi);
379
380    if ( pkt_new_size < m->pkt_len) {
381        /* need to trim it */
382        mi->data_len = (pkt_new_size - prefix_size);
383        m->pkt_len   = pkt_new_size;
384    }
385    return (m);
386}
387
388void CGenNodeStateless::free_stl_vm_buf(){
389        rte_mbuf_t * m ;
390         m=get_const_mbuf();
391         if (m) {
392             rte_pktmbuf_free(m); /* reduce the ref counter */
393             /* clear the const marker */
394             clear_const_mbuf();
395         }
396
397         free_prefix_header();
398
399         if (m_vm_flow_var) {
400             /* free flow var */
401             free(m_vm_flow_var);
402             m_vm_flow_var=0;
403         }
404}
405
406
407
408void CGenNodeStateless::free_stl_node(){
409
410    if ( is_cache_mbuf_array() ){
411        /* do we have cache of mbuf pre allocated */
412        cache_mbuf_array_free();
413    }else{
414        /* if we have cache mbuf free it */
415        rte_mbuf_t * m=get_cache_mbuf();
416        if (m) {
417                rte_pktmbuf_free(m);
418                m_cache_mbuf=0;
419        }
420    }
421    free_stl_vm_buf();
422}
423
424
425bool TrexStatelessDpPerPort::update_number_of_active_streams(uint32_t d){
426    m_active_streams-=d; /* reduce the number of streams */
427    if (m_active_streams == 0) {
428        return (true);
429    }
430    return (false);
431}
432
433bool TrexStatelessDpPerPort::resume_traffic(uint8_t port_id){
434
435    /* we are working with continues streams so we must be in transmit mode */
436    assert(m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE);
437
438    for (auto dp_stream : m_active_nodes) {
439        CGenNodeStateless * node =dp_stream.m_node;
440        assert(node->get_port_id() == port_id);
441        assert(node->is_pause() == true);
442        node->set_pause(false);
443    }
444    m_state = TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
445    return (true);
446}
447
448bool TrexStatelessDpPerPort::update_traffic(uint8_t port_id, double factor) {
449
450    assert( (m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING ||
451            (m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE)) );
452
453    for (auto dp_stream : m_active_nodes) {
454        CGenNodeStateless * node = dp_stream.m_node;
455        assert(node->get_port_id() == port_id);
456
457        node->update_rate(factor);
458    }
459
460    return (true);
461}
462
463bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){
464
465    /* we are working with continues streams so we must be in transmit mode */
466    assert(m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING);
467
468    for (auto dp_stream : m_active_nodes) {
469        CGenNodeStateless * node =dp_stream.m_node;
470        assert(node->get_port_id() == port_id);
471        assert(node->is_pause() == false);
472        node->set_pause(true);
473    }
474    m_state = TrexStatelessDpPerPort::ppSTATE_PAUSE;
475    return (true);
476}
477
478bool TrexStatelessDpPerPort::push_pcap(uint8_t port_id,
479                                       const std::string &pcap_filename,
480                                       double ipg_usec,
481                                       double speedup,
482                                       uint32_t count,
483                                       bool is_dual) {
484
485    /* push pcap can only happen on an idle port from the core prespective */
486    assert(m_state == TrexStatelessDpPerPort::ppSTATE_IDLE);
487
488    CGenNodePCAP *pcap_node = m_core->allocate_pcap_node();
489    if (!pcap_node) {
490        return (false);
491    }
492
493    pkt_dir_t dir          = m_core->m_node_gen.m_v_if->port_id_to_dir(port_id);
494    socket_id_t socket_id  = m_core->m_node_gen.m_socket_id;
495
496    /* main port */
497    uint8_t mac_addr[12];
498    TRexPortAttr *master_port_attr = get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id);
499    master_port_attr->update_src_dst_mac(mac_addr);
500
501    /* for dual */
502    uint8_t slave_mac_addr[12];
503    TRexPortAttr *slave_port_attr = get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id ^ 0x1);
504    slave_port_attr->update_src_dst_mac(slave_mac_addr);
505
506    bool rc = pcap_node->create(port_id,
507                                dir,
508                                socket_id,
509                                mac_addr,
510                                slave_mac_addr,
511                                pcap_filename,
512                                ipg_usec,
513                                speedup,
514                                count,
515                                is_dual);
516    if (!rc) {
517        m_core->free_node((CGenNode *)pcap_node);
518        return (false);
519    }
520
521    /* schedule the node for now */
522    pcap_node->m_time = m_core->m_cur_time_sec;
523    m_core->m_node_gen.add_node((CGenNode *)pcap_node);
524
525    /* hold a pointer to the node */
526    assert(m_active_pcap_node == NULL);
527    m_active_pcap_node = pcap_node;
528
529    m_state = TrexStatelessDpPerPort::ppSTATE_PCAP_TX;
530    return (true);
531}
532
533
534bool TrexStatelessDpPerPort::stop_traffic(uint8_t  port_id,
535                                          bool     stop_on_id,
536                                          int      event_id){
537
538
539    if (m_state == TrexStatelessDpPerPort::ppSTATE_IDLE) {
540        assert(m_active_streams==0);
541        return false;
542    }
543
544    /* there could be race of stop after stop */
545    if ( stop_on_id ) {
546        if (event_id != m_event_id){
547            /* we can't stop it is an old message */
548            return false;
549        }
550    }
551
552    for (auto dp_stream : m_active_nodes) {
553        CGenNodeStateless * node =dp_stream.m_node;
554        assert(node->get_port_id() == port_id);
555        if ( node->get_state() == CGenNodeStateless::ss_ACTIVE) {
556            node->mark_for_free();
557            m_active_streams--;
558            dp_stream.DeleteOnlyStream();
559
560        }else{
561            dp_stream.Delete(m_core);
562        }
563    }
564
565    /* check for active PCAP node */
566    if (m_active_pcap_node) {
567        /* when got async stop from outside or duration */
568        if (m_active_pcap_node->is_active()) {
569            m_active_pcap_node->mark_for_free();
570        } else {
571            /* graceful stop - node was put out by the scheduler */
572            m_core->free_node( (CGenNode *)m_active_pcap_node);
573        }
574
575        m_active_pcap_node = NULL;
576    }
577
578    /* active stream should be zero */
579    assert(m_active_streams==0);
580    m_active_nodes.clear();
581    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
582    return (true);
583}
584
585
586void TrexStatelessDpPerPort::create(CFlowGenListPerThread   *  core){
587    m_core=core;
588    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
589    m_active_streams=0;
590    m_active_nodes.clear();
591    m_active_pcap_node = NULL;
592}
593
594
595
596void
597TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) {
598    m_thread_id = thread_id;
599    m_core = core;
600    m_local_port_offset = 2*core->getDualPortId();
601
602    CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();
603
604    m_ring_from_cp = cp_dp->getRingCpToDp(thread_id);
605    m_ring_to_cp   = cp_dp->getRingDpToCp(thread_id);
606
607    m_state = STATE_IDLE;
608
609    int i;
610    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
611        m_ports[i].create(core);
612    }
613}
614
615
616/* move to the next stream, old stream move to INACTIVE */
617bool TrexStatelessDpCore::set_stateless_next_node(CGenNodeStateless * cur_node,
618                                                  CGenNodeStateless * next_node){
619
620    assert(cur_node);
621    TrexStatelessDpPerPort * lp_port = get_port_db(cur_node->m_port_id);
622    bool schedule =false;
623
624    bool to_stop_port=false;
625
626    if (next_node == NULL) {
627        /* there is no next stream , reduce the number of active streams*/
628        to_stop_port = lp_port->update_number_of_active_streams(1);
629
630    }else{
631        uint8_t state=next_node->get_state();
632
633        /* can't be FREE_RESUSE */
634        assert(state != CGenNodeStateless::ss_FREE_RESUSE);
635        if (state == CGenNodeStateless::ss_INACTIVE ) {
636
637            if (cur_node->m_action_counter > 0) {
638                cur_node->m_action_counter--;
639                if (cur_node->m_action_counter==0) {
640                    to_stop_port = lp_port->update_number_of_active_streams(1);
641                }else{
642                    /* refill start info and scedule, no update in active streams  */
643                    next_node->refresh();
644                    schedule = true;
645                }
646            }else{
647                /* refill start info and scedule, no update in active streams  */
648                next_node->refresh();
649                schedule = true;
650            }
651
652        }else{
653            to_stop_port = lp_port->update_number_of_active_streams(1);
654        }
655    }
656
657    if ( to_stop_port ) {
658        /* call stop port explictly to move the state */
659        stop_traffic(cur_node->m_port_id,false,0);
660    }
661
662    return ( schedule );
663}
664
665
666
667/**
668 * in idle state loop, the processor most of the time sleeps
669 * and periodically checks for messages
670 *
671 * @author imarom (01-Nov-15)
672 */
673void
674TrexStatelessDpCore::idle_state_loop() {
675
676    const int SHORT_DELAY_MS    = 2;
677    const int LONG_DELAY_MS     = 50;
678    const int DEEP_SLEEP_LIMIT  = 2000;
679
680    int counter = 0;
681
682    while (m_state == STATE_IDLE) {
683        m_core->tickle();
684        m_core->m_node_gen.m_v_if->handle_rx_queue();
685        bool had_msg = periodic_check_for_cp_messages();
686        if (had_msg) {
687            counter = 0;
688            continue;
689        }
690
691        /* enter deep sleep only if enough time had passed */
692        if (counter < DEEP_SLEEP_LIMIT) {
693            delay(SHORT_DELAY_MS);
694            counter++;
695        } else {
696            delay(LONG_DELAY_MS);
697        }
698
699    }
700}
701
702
703
704void TrexStatelessDpCore::quit_main_loop(){
705    m_core->set_terminate_mode(true); /* mark it as terminated */
706    m_state = STATE_TERMINATE;
707    add_global_duration(0.0001);
708}
709
710
711/**
712 * scehduler runs when traffic exists
713 * it will return when no more transmitting is done on this
714 * core
715 *
716 * @author imarom (01-Nov-15)
717 */
718void
719TrexStatelessDpCore::start_scheduler() {
720    /* creates a maintenace job using the scheduler */
721    CGenNode * node_sync = m_core->create_node() ;
722    node_sync->m_type = CGenNode::FLOW_SYNC;
723    node_sync->m_time = m_core->m_cur_time_sec + SYNC_TIME_OUT;
724
725    m_core->m_node_gen.add_node(node_sync);
726
727    double old_offset = 0.0;
728    m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset);
729    /* bail out in case of terminate */
730    if (m_state != TrexStatelessDpCore::STATE_TERMINATE) {
731        m_core->m_node_gen.close_file(m_core);
732        m_state = STATE_IDLE; /* we exit from all ports and we have nothing to do, we move to IDLE state */
733    }
734}
735
736
737void
738TrexStatelessDpCore::run_once(){
739
740    idle_state_loop();
741
742    if ( m_state == STATE_TERMINATE ){
743        return;
744    }
745
746    start_scheduler();
747}
748
749
750
751
752void
753TrexStatelessDpCore::start() {
754
755    while (true) {
756        run_once();
757
758        if ( m_core->is_terminated_by_master() ) {
759            break;
760        }
761    }
762}
763
764/* only if both port are idle we can exit */
765void
766TrexStatelessDpCore::schedule_exit(){
767
768    CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
769
770    node->m_type = CGenNode::COMMAND;
771
772    node->m_cmd = new TrexStatelessDpCanQuit();
773
774    /* make sure it will be scheduled after the current node */
775    node->m_time = m_core->m_cur_time_sec ;
776
777    m_core->m_node_gen.add_node((CGenNode *)node);
778}
779
780
781void
782TrexStatelessDpCore::add_global_duration(double duration){
783    if (duration > 0.0) {
784        CGenNode *node = m_core->create_node() ;
785
786        node->m_type = CGenNode::EXIT_SCHED;
787
788        /* make sure it will be scheduled after the current node */
789        node->m_time = m_core->m_cur_time_sec + duration ;
790
791        m_core->m_node_gen.add_node(node);
792    }
793}
794
795/* add per port exit */
796void
797TrexStatelessDpCore::add_port_duration(double duration,
798                                       uint8_t port_id,
799                                       int event_id){
800    if (duration > 0.0) {
801        CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
802
803        node->m_type = CGenNode::COMMAND;
804
805        /* make sure it will be scheduled after the current node */
806        node->m_time = m_core->m_cur_time_sec + duration ;
807
808        TrexStatelessDpStop * cmd=new TrexStatelessDpStop(port_id);
809
810
811        /* test this */
812        m_core->m_non_active_nodes++;
813        cmd->set_core_ptr(m_core);
814        cmd->set_event_id(event_id);
815        cmd->set_wait_for_event_id(true);
816
817        node->m_cmd = cmd;
818
819        m_core->m_node_gen.add_node((CGenNode *)node);
820    }
821}
822
823
824void TrexStatelessDpCore::update_mac_addr(TrexStream * stream,
825                                          CGenNodeStateless *node,
826                                          pkt_dir_t dir,
827                                          char *raw_pkt){
828
829    bool ov_src = stream->get_override_src_mac_by_pkt_data();
830    TrexStream::stream_dst_mac_t ov_dst = stream->get_override_dst_mac_mode();
831
832
833    if ( (ov_src == true) && (ov_dst == TrexStream::stPKT) ) {
834        /* nothing to do, take from the packet both */
835        return;
836    }
837
838    TRexPortAttr *port_attr = get_stateless_obj()->get_platform_api()->getPortAttrObj(node->get_port_id());
839
840    /* take from cfg_file */
841    if ( (ov_src == false) &&
842         (ov_dst == TrexStream::stCFG_FILE) ){
843
844          port_attr->update_src_dst_mac((uint8_t *)raw_pkt);
845          return;
846    }
847
848    /* save the pkt*/
849    char tmp_pkt[12];
850    memcpy(tmp_pkt,raw_pkt,12);
851
852    port_attr->update_src_dst_mac((uint8_t *)raw_pkt);
853
854    if ((ov_src == true) && (ov_dst == TrexStream::stCFG_FILE)) {
855        memcpy(raw_pkt+6,tmp_pkt+6,6);
856    }
857
858    if ((ov_src == false) && (ov_dst == TrexStream::stPKT)) {
859        memcpy(raw_pkt,tmp_pkt,6);
860    }
861}
862
863
864void TrexStatelessDpCore::replay_vm_into_cache(TrexStream * stream,
865                                               CGenNodeStateless *node){
866
867    uint16_t      cache_size = stream->m_cache_size;
868    assert(cache_size>0);
869    rte_mbuf_t * m=0;
870
871    uint32_t buf_size = CGenNodeCacheMbuf::get_object_size(cache_size);
872    CGenNodeCacheMbuf * p = (CGenNodeCacheMbuf *)malloc(buf_size);
873    assert(p);
874    memset(p,0,buf_size);
875
876    int i;
877    for (i=0; i<cache_size; i++) {
878        p->m_array[i] =  node->alloc_node_with_vm();
879    }
880    /* save const */
881    m=node->get_const_mbuf();
882    if (m) {
883        p->m_mbuf_const=m;
884        rte_pktmbuf_refcnt_update(m,1);
885    }
886
887    /* free all VM and const mbuf */
888    node->free_stl_vm_buf();
889
890    /* copy to local node meory */
891    node->cache_mbuf_array_copy(p,cache_size);
892
893    /* free the memory */
894    free(p);
895}
896
897
898void
899TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port,
900                                TrexStream * stream,
901                                TrexStreamsCompiledObj *comp) {
902
903    CGenNodeStateless *node = m_core->create_node_sl();
904
905    node->m_thread_id = m_thread_id;
906    node->cache_mbuf_array_init();
907    node->m_batch_size=0;
908
909    /* add periodic */
910    node->m_cache_mbuf=0;
911    node->m_type = CGenNode::STATELESS_PKT;
912
913    node->m_action_counter = stream->m_action_count;
914
915    /* clone the stream from control plane memory to DP memory */
916    node->m_ref_stream_info = stream->clone();
917    /* no need for this memory anymore on the control plane memory */
918    stream->release_dp_object();
919
920    node->m_next_stream=0; /* will be fixed later */
921
922    if ( stream->m_self_start ){
923        /* if self start it is in active mode */
924        node->m_state =CGenNodeStateless::ss_ACTIVE;
925        lp_port->m_active_streams++;
926    }else{
927        node->m_state =CGenNodeStateless::ss_INACTIVE;
928    }
929
930    node->m_time = m_core->m_cur_time_sec + stream->get_start_delay_sec();
931
932    pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(stream->m_port_id);
933    node->m_flags = 0;
934    node->m_src_port =0;
935    node->m_original_packet_data_prefix = 0;
936
937    if (stream->m_rx_check.m_enabled) {
938        node->set_stat_needed();
939        uint8_t hw_id = stream->m_rx_check.m_hw_id;
940        assert (hw_id < MAX_FLOW_STATS + MAX_FLOW_STATS_PAYLOAD);
941        node->set_stat_hw_id(hw_id);
942        // no support for cache with flow stat payload rules
943        if ((TrexPlatformApi::driver_stat_cap_e)stream->m_rx_check.m_rule_type == TrexPlatformApi::IF_STAT_PAYLOAD) {
944            stream->m_cache_size = 0;
945        }
946    }
947
948    /* set socket id */
949    node->set_socket_id(m_core->m_node_gen.m_socket_id);
950
951    /* build a mbuf from a packet */
952
953    uint16_t pkt_size = stream->m_pkt.len;
954    const uint8_t *stream_pkt = stream->m_pkt.binary;
955
956    node->m_pause =0;
957    node->m_stream_type = stream->m_type;
958    node->m_next_time_offset = 1.0 / stream->get_pps();
959    node->m_null_stream = (stream->m_null_stream ? 1 : 0);
960
961    /* stateless specific fields */
962    switch ( stream->m_type ) {
963
964    case TrexStream::stCONTINUOUS :
965        node->m_single_burst=0;
966        node->m_single_burst_refill=0;
967        node->m_multi_bursts=0;
968        break;
969
970    case TrexStream::stSINGLE_BURST :
971        node->m_stream_type             = TrexStream::stMULTI_BURST;
972        node->m_single_burst            = stream->m_burst_total_pkts;
973        node->m_single_burst_refill     = stream->m_burst_total_pkts;
974        node->m_multi_bursts            = 1;  /* single burst in multi burst of 1 */
975        break;
976
977    case TrexStream::stMULTI_BURST :
978        node->m_single_burst        = stream->m_burst_total_pkts;
979        node->m_single_burst_refill = stream->m_burst_total_pkts;
980        node->m_multi_bursts        = stream->m_num_bursts;
981        break;
982    default:
983
984        assert(0);
985    };
986
987    node->m_port_id = stream->m_port_id;
988
989    /* set dir 0 or 1 client or server */
990    node->set_mbuf_cache_dir(dir);
991
992
993    if (node->m_ref_stream_info->getDpVm() == NULL) {
994        /* no VM */
995
996        node->m_vm_flow_var =  NULL;
997        node->m_vm_program  =  NULL;
998        node->m_vm_program_size =0;
999
1000                /* allocate const mbuf */
1001        rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size);
1002        assert(m);
1003
1004        char *p = rte_pktmbuf_append(m, pkt_size);
1005        assert(p);
1006        /* copy the packet */
1007        memcpy(p,stream_pkt,pkt_size);
1008
1009        update_mac_addr(stream,node,dir,p);
1010
1011        /* set the packet as a readonly */
1012        node->set_cache_mbuf(m);
1013
1014        node->m_original_packet_data_prefix =0;
1015    }else{
1016
1017        /* set the program */
1018        TrexStream * local_mem_stream = node->m_ref_stream_info;
1019
1020        StreamVmDp  * lpDpVm = local_mem_stream->getDpVm();
1021
1022        node->m_vm_flow_var      = lpDpVm->clone_bss(); /* clone the flow var */
1023        node->m_vm_program       = lpDpVm->get_program(); /* same ref to the program */
1024        node->m_vm_program_size  = lpDpVm->get_program_size();
1025
1026        /* generate random seed if needed*/
1027        if (lpDpVm->is_random_seed()) {
1028            node->generate_random_seed();
1029        }
1030
1031        /* we need to copy the object */
1032        if ( pkt_size > lpDpVm->get_prefix_size() ) {
1033            /* we need const packet */
1034            uint16_t const_pkt_size  = pkt_size - lpDpVm->get_prefix_size() ;
1035            rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), const_pkt_size );
1036            assert(m);
1037
1038            char *p = rte_pktmbuf_append(m, const_pkt_size);
1039            assert(p);
1040
1041            /* copy packet data */
1042            memcpy(p,(stream_pkt + lpDpVm->get_prefix_size()),const_pkt_size);
1043
1044            node->set_const_mbuf(m);
1045        }
1046
1047
1048        if ( lpDpVm->is_pkt_size_var() ) {
1049            // mark the node as varible size
1050            node->set_var_pkt_size();
1051        }
1052
1053
1054        if (lpDpVm->get_prefix_size() > pkt_size ) {
1055            lpDpVm->set_prefix_size(pkt_size);
1056        }
1057
1058        /* copy the headr */
1059        uint16_t header_size = lpDpVm->get_prefix_size();
1060        assert(header_size);
1061        node->alloc_prefix_header(header_size);
1062        uint8_t *p=node->m_original_packet_data_prefix;
1063        assert(p);
1064
1065        memcpy(p,stream_pkt , header_size);
1066
1067        update_mac_addr(stream,node,dir,(char *)p);
1068
1069        if (stream->m_cache_size > 0 ) {
1070            /* we need to create cache of objects */
1071            replay_vm_into_cache(stream, node);
1072        }
1073    }
1074
1075
1076    CDpOneStream one_stream;
1077
1078    one_stream.m_dp_stream = node->m_ref_stream_info;
1079    one_stream.m_node =node;
1080
1081    lp_port->m_active_nodes.push_back(one_stream);
1082
1083    /* schedule only if active */
1084    if (node->m_state == CGenNodeStateless::ss_ACTIVE) {
1085        m_core->m_node_gen.add_node((CGenNode *)node);
1086    }
1087}
1088
1089void
1090TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj,
1091                                   double duration,
1092                                   int event_id) {
1093
1094
1095    TrexStatelessDpPerPort * lp_port=get_port_db(obj->get_port_id());
1096    lp_port->m_active_streams = 0;
1097    lp_port->set_event_id(event_id);
1098
1099    /* update cur time */
1100    if ( CGlobalInfo::is_realtime()  ){
1101        m_core->m_cur_time_sec = now_sec() + SCHD_OFFSET_DTIME ;
1102    }
1103
1104    /* no nodes in the list */
1105    assert(lp_port->m_active_nodes.size()==0);
1106
1107    for (auto single_stream : obj->get_objects()) {
1108        /* all commands should be for the same port */
1109        assert(obj->get_port_id() == single_stream.m_stream->m_port_id);
1110        add_stream(lp_port,single_stream.m_stream,obj);
1111    }
1112
1113    uint32_t nodes = lp_port->m_active_nodes.size();
1114    /* find next stream */
1115    assert(nodes == obj->get_objects().size());
1116
1117    int cnt=0;
1118
1119    /* set the next_stream pointer  */
1120    for (auto single_stream : obj->get_objects()) {
1121
1122        if (single_stream.m_stream->is_dp_next_stream() ) {
1123            int stream_id = single_stream.m_stream->m_next_stream_id;
1124            assert(stream_id<nodes);
1125            /* point to the next stream , stream_id is fixed */
1126            lp_port->m_active_nodes[cnt].m_node->m_next_stream = lp_port->m_active_nodes[stream_id].m_node ;
1127        }
1128        cnt++;
1129    }
1130
1131    lp_port->m_state =TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
1132    m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
1133
1134
1135    if ( duration > 0.0 ){
1136        add_port_duration( duration ,obj->get_port_id(),event_id );
1137    }
1138
1139}
1140
1141
1142bool TrexStatelessDpCore::are_all_ports_idle(){
1143
1144    bool res=true;
1145    int i;
1146    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
1147        if ( m_ports[i].m_state != TrexStatelessDpPerPort::ppSTATE_IDLE ){
1148            res=false;
1149        }
1150    }
1151    return (res);
1152}
1153
1154
1155void
1156TrexStatelessDpCore::resume_traffic(uint8_t port_id){
1157
1158    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1159
1160    lp_port->resume_traffic(port_id);
1161}
1162
1163
1164void
1165TrexStatelessDpCore::pause_traffic(uint8_t port_id){
1166
1167    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1168
1169    lp_port->pause_traffic(port_id);
1170}
1171
1172void
1173TrexStatelessDpCore::push_pcap(uint8_t port_id,
1174                               int event_id,
1175                               const std::string &pcap_filename,
1176                               double ipg_usec,
1177                               double speedup,
1178                               uint32_t count,
1179                               double duration,
1180                               bool is_dual) {
1181
1182    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1183
1184    lp_port->set_event_id(event_id);
1185
1186    /* delegate the command to the port */
1187    bool rc = lp_port->push_pcap(port_id, pcap_filename, ipg_usec, speedup, count, is_dual);
1188    if (!rc) {
1189        /* report back that we stopped */
1190        CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
1191        TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
1192                                                                       port_id,
1193                                                                       event_id,
1194                                                                       false);
1195        ring->Enqueue((CGenNode *)event_msg);
1196        return;
1197    }
1198
1199
1200    if (duration > 0.0) {
1201        add_port_duration(duration, port_id, event_id);
1202    }
1203
1204     m_state = TrexStatelessDpCore::STATE_PCAP_TX;
1205}
1206
1207void
1208TrexStatelessDpCore::update_traffic(uint8_t port_id, double factor) {
1209
1210    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1211
1212    lp_port->update_traffic(port_id, factor);
1213}
1214
1215
1216void
1217TrexStatelessDpCore::stop_traffic(uint8_t  port_id,
1218                                  bool     stop_on_id,
1219                                  int      event_id) {
1220    /* we cannot remove nodes not from the top of the queue so
1221       for every active node - make sure next time
1222       the scheduler invokes it, it will be free */
1223
1224    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1225    if ( lp_port->stop_traffic(port_id,stop_on_id,event_id) == false){
1226        return;
1227    }
1228
1229    /* flush the TX queue before sending done message to the CP */
1230    m_core->flush_tx_queue();
1231
1232    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
1233    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
1234                                                                   port_id,
1235                                                                   lp_port->get_event_id());
1236    ring->Enqueue((CGenNode *)event_msg);
1237
1238}
1239
1240/**
1241 * handle a message from CP to DP
1242 *
1243 */
1244void
1245TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) {
1246    msg->handle(this);
1247    delete msg;
1248}
1249
1250void
1251TrexStatelessDpCore::barrier(uint8_t port_id, int event_id) {
1252
1253    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
1254    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
1255                                                                   port_id,
1256                                                                   event_id);
1257    ring->Enqueue((CGenNode *)event_msg);
1258}
1259
1260
1261/**
1262 * PCAP node
1263 */
1264bool CGenNodePCAP::create(uint8_t port_id,
1265                          pkt_dir_t dir,
1266                          socket_id_t socket_id,
1267                          const uint8_t *mac_addr,
1268                          const uint8_t *slave_mac_addr,
1269                          const std::string &pcap_filename,
1270                          double ipg_usec,
1271                          double speedup,
1272                          uint32_t count,
1273                          bool is_dual) {
1274    std::stringstream ss;
1275
1276    m_type       = CGenNode::PCAP_PKT;
1277    m_flags      = 0;
1278    m_src_port   = 0;
1279    m_port_id    = port_id;
1280    m_count      = count;
1281    m_is_dual    = is_dual;
1282    m_dir        = dir;
1283
1284    /* mark this node as slow path */
1285    set_slow_path(true);
1286
1287    if (ipg_usec != -1) {
1288        /* fixed IPG */
1289        m_ipg_sec = usec_to_sec(ipg_usec / speedup);
1290        m_speedup = 0;
1291    } else {
1292        /* packet IPG */
1293        m_ipg_sec = -1;
1294        m_speedup  = speedup;
1295    }
1296
1297    /* copy MAC addr info */
1298    memcpy(m_mac_addr, mac_addr, 12);
1299    memcpy(m_slave_mac_addr, slave_mac_addr, 12);
1300
1301
1302    set_socket_id(socket_id);
1303
1304    /* create the PCAP reader */
1305    m_reader = CCapReaderFactory::CreateReader((char *)pcap_filename.c_str(), 0, ss);
1306    if (!m_reader) {
1307        return false;
1308    }
1309
1310    m_raw_packet = new CCapPktRaw();
1311    if ( m_reader->ReadPacket(m_raw_packet) == false ){
1312        /* handle error */
1313        delete m_reader;
1314        return (false);
1315    }
1316
1317    /* set the dir */
1318    set_mbuf_dir(dir);
1319
1320    /* update the direction (for dual mode) */
1321    update_pkt_dir();
1322
1323    /* this is the reference time */
1324    m_last_pkt_time = m_raw_packet->get_time();
1325
1326    /* ready */
1327    m_state = PCAP_ACTIVE;
1328
1329    return true;
1330}
1331
1332/**
1333 * cleanup for PCAP node
1334 *
1335 * @author imarom (08-May-16)
1336 */
1337void CGenNodePCAP::destroy() {
1338
1339    if (m_raw_packet) {
1340        delete m_raw_packet;
1341        m_raw_packet = NULL;
1342    }
1343
1344    if (m_reader) {
1345        delete m_reader;
1346        m_reader = NULL;
1347    }
1348
1349    m_state = PCAP_INVALID;
1350}
1351
1352