trex_stateless_dp_core.cpp revision e4c8e44b
1/*
2 Itay Marom
3 Hanoch Haim
4 Cisco Systems, Inc.
5*/
6
7/*
8Copyright (c) 2015-2016 Cisco Systems, Inc.
9
10Licensed under the Apache License, Version 2.0 (the "License");
11you may not use this file except in compliance with the License.
12You may obtain a copy of the License at
13
14    http://www.apache.org/licenses/LICENSE-2.0
15
16Unless required by applicable law or agreed to in writing, software
17distributed under the License is distributed on an "AS IS" BASIS,
18WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19See the License for the specific language governing permissions and
20limitations under the License.
21*/
22#include "bp_sim.h"
23#include "trex_stateless_dp_core.h"
24#include "trex_stateless_messaging.h"
25#include "trex_stream.h"
26#include "trex_stream_node.h"
27#include "trex_streams_compiler.h"
28#include "mbuf.h"
29
30
31
32
33void CGenNodeStateless::cache_mbuf_array_init(){
34    m_cache_size=0;
35    m_cache_array_cnt=0;
36}
37
38
39
40void CGenNodeStateless::cache_mbuf_array_copy(CGenNodeCacheMbuf *obj,
41                                              uint16_t size){
42
43    int i;
44    cache_mbuf_array_alloc(size);
45    for (i=0; i<size; i++) {
46        cache_mbuf_array_set(i,obj->m_array[i]);
47    }
48    cache_mbuf_array_set_const_mbuf(obj->m_mbuf_const);
49}
50
51
52rte_mbuf_t ** CGenNodeStateless::cache_mbuf_array_alloc(uint16_t size){
53
54    uint32_t buf_size = CGenNodeCacheMbuf::get_object_size(size);
55    /* TBD  replace with align, zero API */
56    m_cache_mbuf = (void *)malloc(buf_size);
57    assert(m_cache_mbuf);
58    memset(m_cache_mbuf,0,buf_size);
59
60    m_flags |= SL_NODE_CONST_MBUF_CACHE_ARRAY;
61    m_cache_size=size;
62    m_cache_array_cnt=0;
63    return ((rte_mbuf_t **)m_cache_mbuf);
64}
65
66void CGenNodeStateless::cache_mbuf_array_free(){
67
68    assert(m_cache_mbuf);
69    int i;
70    for (i=0; i<(int)m_cache_size; i++) {
71        rte_mbuf_t * m=cache_mbuf_array_get((uint16_t)i);
72        assert(m);
73        rte_pktmbuf_free(m);
74    }
75
76    /* free the const */
77    rte_mbuf_t * m=cache_mbuf_array_get_const_mbuf() ;
78    if (m) {
79        rte_pktmbuf_free(m);
80    }
81
82    free(m_cache_mbuf);
83    m_cache_mbuf=0;
84}
85
86
87rte_mbuf_t * CGenNodeStateless::cache_mbuf_array_get(uint16_t index){
88
89    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
90    return (p->m_array[index]);
91}
92
93void CGenNodeStateless::cache_mbuf_array_set_const_mbuf(rte_mbuf_t * m){
94    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
95    p->m_mbuf_const=m;
96}
97
98rte_mbuf_t * CGenNodeStateless::cache_mbuf_array_get_const_mbuf(){
99    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
100    return (p->m_mbuf_const);
101}
102
103
104void CGenNodeStateless::cache_mbuf_array_set(uint16_t index,
105                                             rte_mbuf_t * m){
106    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
107    p->m_array[index]=m;
108}
109
110
111void CDpOneStream::Delete(CFlowGenListPerThread   * core){
112    assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE);
113    core->free_node((CGenNode *)m_node);
114    delete m_dp_stream;
115    m_node=0;
116    m_dp_stream=0;
117}
118
119void CDpOneStream::DeleteOnlyStream(){
120    assert(m_dp_stream);
121    delete m_dp_stream;
122    m_dp_stream=0;
123}
124
125int CGenNodeStateless::get_stream_id(){
126    if (m_state ==CGenNodeStateless::ss_FREE_RESUSE) {
127        return (-1); // not valid
128    }
129    assert(m_ref_stream_info);
130    return ((int)m_ref_stream_info->m_stream_id);
131}
132
133
134void CGenNodeStateless::DumpHeader(FILE *fd){
135    fprintf(fd," pkt_id, time, port , action , state, stream_id , stype , m-burst# , burst# \n");
136
137}
138void CGenNodeStateless::Dump(FILE *fd){
139    fprintf(fd," %2.4f, %3lu, %s,%s, %3d, %s, %3lu, %3lu  \n",
140            m_time,
141            (ulong)m_port_id,
142            "s-pkt", //action
143            get_stream_state_str(m_state ).c_str(),
144            get_stream_id(),   //stream_id
145            TrexStream::get_stream_type_str(m_stream_type).c_str(), //stype
146            (ulong)m_multi_bursts,
147            (ulong)m_single_burst
148            );
149}
150
151
152void CGenNodeStateless::generate_random_seed() {
153    /* seed can be provided by the user */
154    uint32_t unique_seed;
155    if (m_ref_stream_info->m_random_seed) {
156        unique_seed = m_ref_stream_info->m_random_seed;
157    } else {
158        unsigned int tmp = (unsigned int)time(NULL);
159        unique_seed = rand_r(&tmp);
160    }
161
162    /* per thread divergence */
163    unique_seed = (unique_seed * ( (m_thread_id + 1) * 514229 ) ) & 0xFFFFFFFF;
164
165    /* set random */
166    set_random_seed(unique_seed);
167}
168
169
170void CGenNodeStateless::refresh_vm_bss() {
171    if ( m_vm_flow_var ) {
172        StreamVmDp  * vm_s=m_ref_stream_info->m_vm_dp;
173        assert(vm_s);
174        memcpy(m_vm_flow_var,vm_s->get_bss(),vm_s->get_bss_size());
175
176        if ( vm_s->is_random_seed() ) {
177            generate_random_seed();
178        }
179
180    }
181}
182
183
184
185/**
186 * this function called when stream restart after it was inactive
187 */
188void CGenNodeStateless::refresh(){
189
190    /* refill the stream info */
191    m_single_burst    = m_single_burst_refill;
192    m_multi_bursts    = m_ref_stream_info->m_num_bursts;
193    m_state           = CGenNodeStateless::ss_ACTIVE;
194
195    /* refresh init value */
196#if 0
197    /* TBD should add a JSON varible for that */
198    refresh_vm_bss();
199#endif
200}
201
202
203void CGenNodeCommand::free_command(){
204
205    assert(m_cmd);
206    m_cmd->on_node_remove();
207    delete m_cmd;
208}
209
210
211std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state){
212    std::string res;
213
214    switch (stream_state) {
215    case CGenNodeStateless::ss_FREE_RESUSE :
216         res="FREE    ";
217        break;
218    case CGenNodeStateless::ss_INACTIVE :
219        res="INACTIVE ";
220        break;
221    case CGenNodeStateless::ss_ACTIVE :
222        res="ACTIVE   ";
223        break;
224    default:
225        res="Unknow   ";
226    };
227    return(res);
228}
229
230/*
231 * Allocate mbuf for flow stat (and latency) info sending
232 * m - Original mbuf (can be complicated mbuf data structure)
233 * fsp_head - return pointer in which the flow stat info should be filled
234 * is_const - is the given mbuf const
235 * return new mbuf structure in which the fsp_head can be written. If needed, orginal mbuf is freed.
236 */
237rte_mbuf_t * CGenNodeStateless::alloc_flow_stat_mbuf(rte_mbuf_t *m, struct flow_stat_payload_header *&fsp_head
238                                                     , bool is_const) {
239    rte_mbuf_t *m_ret = NULL, *m_lat = NULL;
240    uint16_t fsp_head_size = sizeof(struct flow_stat_payload_header);
241
242    if (is_const) {
243        // const mbuf case
244        if (rte_pktmbuf_data_len(m) > 128) {
245            m_ret = CGlobalInfo::pktmbuf_alloc_small(get_socket_id());
246            assert(m_ret);
247            // alloc mbuf just for the latency header
248            m_lat = CGlobalInfo::pktmbuf_alloc( get_socket_id(), fsp_head_size);
249            assert(m_lat);
250            fsp_head = (struct flow_stat_payload_header *)rte_pktmbuf_append(m_lat, fsp_head_size);
251            rte_pktmbuf_attach(m_ret, m);
252            rte_pktmbuf_trim(m_ret, sizeof(struct flow_stat_payload_header));
253            utl_rte_pktmbuf_add_after2(m_ret, m_lat);
254            // ref count was updated when we took the (const) mbuf, and again in rte_pktmbuf_attach
255            // so need do decrease now, to avoid leak.
256            rte_pktmbuf_refcnt_update(m, -1);
257            return m_ret;
258        } else {
259            // Short packet. Just copy all bytes.
260            m_ret = CGlobalInfo::pktmbuf_alloc( get_socket_id(), rte_pktmbuf_data_len(m) );
261            assert(m_ret);
262            char *p = rte_pktmbuf_mtod(m, char*);
263            char *p_new = rte_pktmbuf_append(m_ret, rte_pktmbuf_data_len(m));
264            memcpy(p_new , p, rte_pktmbuf_data_len(m));
265            fsp_head = (struct flow_stat_payload_header *)(p_new + rte_pktmbuf_data_len(m) - fsp_head_size);
266            rte_pktmbuf_free(m);
267            return m_ret;
268        }
269    } else {
270        // Field engine (vm)
271        if (rte_pktmbuf_is_contiguous(m)) {
272            // one, r/w mbuf
273            char *p = rte_pktmbuf_mtod(m, char*);
274            fsp_head = (struct flow_stat_payload_header *)(p + rte_pktmbuf_data_len(m) - fsp_head_size);
275            return m;
276        } else {
277            // We have: r/w --> read only.
278            // Changing to:
279            // (original) r/w -> (new) indirect (direct is original read_only, after trimming last bytes) -> (new) latency info
280            rte_mbuf_t *m_read_only = m->next, *m_indirect;
281
282            m_indirect = CGlobalInfo::pktmbuf_alloc_small(get_socket_id());
283            assert(m_indirect);
284            // alloc mbuf just for the latency header
285            m_lat = CGlobalInfo::pktmbuf_alloc( get_socket_id(), fsp_head_size);
286            assert(m_lat);
287            fsp_head = (struct flow_stat_payload_header *)rte_pktmbuf_append(m_lat, fsp_head_size);
288            utl_rte_pktmbuf_chain_with_indirect(m, m_indirect, m_read_only, m_lat);
289            m_indirect->data_len = (uint16_t)(m_indirect->data_len - fsp_head_size);
290            return m;
291        }
292    }
293}
294
295// test the const case of alloc_flow_stat_mbuf. The more complicated non const case is tested in the simulation.
296bool CGenNodeStateless::alloc_flow_stat_mbuf_test_const() {
297    rte_mbuf_t *m, *m_test;
298    uint16_t sizes[2] = {64, 500};
299    uint16_t size;
300    struct flow_stat_payload_header *fsp_head;
301    char *p;
302
303    set_socket_id(0);
304    for (int test_num = 0; test_num < sizeof(sizes)/sizeof(sizes[0]); test_num++) {
305        size = sizes[test_num];
306        m = CGlobalInfo::pktmbuf_alloc(get_socket_id(), size);
307        p = rte_pktmbuf_append(m, size);
308        for (int i = 0; i < size; i++) {
309            p[i] = (char)i;
310        }
311        m_test = alloc_flow_stat_mbuf(m, fsp_head, true);
312        p = rte_pktmbuf_mtod(m_test, char*);
313        assert(rte_pktmbuf_pkt_len(m_test) == size);
314        for (int i = 0; i < rte_pktmbuf_pkt_len(m_test) - sizeof(*fsp_head); i++) {
315            assert(p[i] == (char)i);
316        }
317        // verify fsp_head points correctly
318        if (size > 128) { // should match threshould in alloc_flow_stat_mbuf
319            assert(rte_pktmbuf_data_len(m_test) == size - sizeof(*fsp_head));
320            assert(rte_pktmbuf_data_len(m_test->next) == sizeof(*fsp_head));
321            assert((char *)fsp_head == rte_pktmbuf_mtod((m_test->next), char*));
322        } else {
323            assert(rte_pktmbuf_data_len(m_test) == size);
324            assert (((char *)fsp_head) + sizeof (*fsp_head) == p + rte_pktmbuf_data_len(m_test));
325        }
326        rte_pktmbuf_free(m_test);
327    }
328    return true;
329}
330
331rte_mbuf_t   * CGenNodeStateless::alloc_node_with_vm(){
332
333    rte_mbuf_t        * m;
334    /* alloc small packet buffer*/
335    uint16_t prefix_size = prefix_header_size();
336    m = CGlobalInfo::pktmbuf_alloc( get_socket_id(), prefix_size );
337    if (m==0) {
338        return (m);
339    }
340    /* TBD remove this, should handle cases of error */
341    assert(m);
342    char *p=rte_pktmbuf_append(m, prefix_size);
343    memcpy( p ,m_original_packet_data_prefix, prefix_size);
344
345
346    /* run the VM program */
347    StreamDPVmInstructionsRunner runner;
348    runner.set_mbuf(m);
349
350    runner.run( (uint32_t*)m_vm_flow_var,
351                m_vm_program_size,
352                m_vm_program,
353                m_vm_flow_var,
354                (uint8_t*)p);
355
356    uint16_t pkt_new_size=runner.get_new_pkt_size();
357    if ( likely( pkt_new_size == 0) ) {
358        /* no packet size change */
359        rte_mbuf_t * m_const = get_const_mbuf();
360        if (  m_const != NULL) {
361            utl_rte_pktmbuf_add_after(m,m_const);
362        }
363        return (m);
364    }
365
366    /* packet size change there are a few changes */
367    rte_mbuf_t * m_const = get_const_mbuf();
368    if ( (m_const == 0 ) || (pkt_new_size<=prefix_size) ) {
369        /* one mbuf , just trim it */
370        m->data_len = pkt_new_size;
371        m->pkt_len  = pkt_new_size;
372        return (m);
373    }
374
375    rte_mbuf_t * mi= CGlobalInfo::pktmbuf_alloc_small(get_socket_id());
376    assert(mi);
377    rte_pktmbuf_attach(mi,m_const);
378    utl_rte_pktmbuf_add_after2(m,mi);
379
380    if ( pkt_new_size < m->pkt_len) {
381        /* need to trim it */
382        mi->data_len = (pkt_new_size - prefix_size);
383        m->pkt_len   = pkt_new_size;
384    }
385    return (m);
386}
387
388void CGenNodeStateless::free_stl_vm_buf(){
389        rte_mbuf_t * m ;
390         m=get_const_mbuf();
391         if (m) {
392             rte_pktmbuf_free(m); /* reduce the ref counter */
393             /* clear the const marker */
394             clear_const_mbuf();
395         }
396
397         free_prefix_header();
398
399         if (m_vm_flow_var) {
400             /* free flow var */
401             free(m_vm_flow_var);
402             m_vm_flow_var=0;
403         }
404}
405
406
407
408void CGenNodeStateless::free_stl_node(){
409
410    if ( is_cache_mbuf_array() ){
411        /* do we have cache of mbuf pre allocated */
412        cache_mbuf_array_free();
413    }else{
414        /* if we have cache mbuf free it */
415        rte_mbuf_t * m=get_cache_mbuf();
416        if (m) {
417                rte_pktmbuf_free(m);
418                m_cache_mbuf=0;
419        }
420    }
421    free_stl_vm_buf();
422}
423
424
425bool TrexStatelessDpPerPort::update_number_of_active_streams(uint32_t d){
426    m_active_streams-=d; /* reduce the number of streams */
427    if (m_active_streams == 0) {
428        return (true);
429    }
430    return (false);
431}
432
433bool TrexStatelessDpPerPort::resume_traffic(uint8_t port_id){
434
435    /* we are working with continues streams so we must be in transmit mode */
436    assert(m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE);
437
438    for (auto dp_stream : m_active_nodes) {
439        CGenNodeStateless * node =dp_stream.m_node;
440        assert(node->get_port_id() == port_id);
441        assert(node->is_pause() == true);
442        node->set_pause(false);
443    }
444    m_state = TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
445    return (true);
446}
447
448bool TrexStatelessDpPerPort::update_traffic(uint8_t port_id, double factor) {
449
450    assert( (m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING ||
451            (m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE)) );
452
453    for (auto dp_stream : m_active_nodes) {
454        CGenNodeStateless * node = dp_stream.m_node;
455        assert(node->get_port_id() == port_id);
456
457        node->update_rate(factor);
458    }
459
460    return (true);
461}
462
463bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){
464
465    /* we are working with continues streams so we must be in transmit mode */
466    assert(m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING);
467
468    for (auto dp_stream : m_active_nodes) {
469        CGenNodeStateless * node =dp_stream.m_node;
470        assert(node->get_port_id() == port_id);
471        assert(node->is_pause() == false);
472        node->set_pause(true);
473    }
474    m_state = TrexStatelessDpPerPort::ppSTATE_PAUSE;
475    return (true);
476}
477
478bool TrexStatelessDpPerPort::push_pcap(uint8_t port_id,
479                                       const std::string &pcap_filename,
480                                       double ipg_usec,
481                                       double min_ipg_sec,
482                                       double speedup,
483                                       uint32_t count,
484                                       bool is_dual) {
485
486    /* push pcap can only happen on an idle port from the core prespective */
487    assert(m_state == TrexStatelessDpPerPort::ppSTATE_IDLE);
488
489    CGenNodePCAP *pcap_node = m_core->allocate_pcap_node();
490    if (!pcap_node) {
491        return (false);
492    }
493
494    pkt_dir_t dir          = m_core->m_node_gen.m_v_if->port_id_to_dir(port_id);
495    socket_id_t socket_id  = m_core->m_node_gen.m_socket_id;
496
497    /* main port */
498    uint8_t mac_addr[12];
499    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, mac_addr);
500
501    /* for dual */
502    uint8_t slave_mac_addr[12];
503    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir ^ 0x1, slave_mac_addr);
504
505    bool rc = pcap_node->create(port_id,
506                                dir,
507                                socket_id,
508                                mac_addr,
509                                slave_mac_addr,
510                                pcap_filename,
511                                ipg_usec,
512                                min_ipg_sec,
513                                speedup,
514                                count,
515                                is_dual);
516    if (!rc) {
517        m_core->free_node((CGenNode *)pcap_node);
518        return (false);
519    }
520
521    /* schedule the node for now */
522    pcap_node->m_time = m_core->m_cur_time_sec;
523    m_core->m_node_gen.add_node((CGenNode *)pcap_node);
524
525    /* hold a pointer to the node */
526    assert(m_active_pcap_node == NULL);
527    m_active_pcap_node = pcap_node;
528
529    m_state = TrexStatelessDpPerPort::ppSTATE_PCAP_TX;
530    return (true);
531}
532
533
534bool TrexStatelessDpPerPort::stop_traffic(uint8_t  port_id,
535                                          bool     stop_on_id,
536                                          int      event_id){
537
538
539    if (m_state == TrexStatelessDpPerPort::ppSTATE_IDLE) {
540        assert(m_active_streams==0);
541        return false;
542    }
543
544    /* there could be race of stop after stop */
545    if ( stop_on_id ) {
546        if (event_id != m_event_id){
547            /* we can't stop it is an old message */
548            return false;
549        }
550    }
551
552    for (auto dp_stream : m_active_nodes) {
553        CGenNodeStateless * node =dp_stream.m_node;
554        assert(node->get_port_id() == port_id);
555        if ( node->get_state() == CGenNodeStateless::ss_ACTIVE) {
556            node->mark_for_free();
557            m_active_streams--;
558            dp_stream.DeleteOnlyStream();
559
560        }else{
561            dp_stream.Delete(m_core);
562        }
563    }
564
565    /* check for active PCAP node */
566    if (m_active_pcap_node) {
567        /* when got async stop from outside or duration */
568        if (m_active_pcap_node->is_active()) {
569            m_active_pcap_node->mark_for_free();
570        } else {
571            /* graceful stop - node was put out by the scheduler */
572            m_core->free_node( (CGenNode *)m_active_pcap_node);
573        }
574
575        m_active_pcap_node = NULL;
576    }
577
578    /* active stream should be zero */
579    assert(m_active_streams==0);
580    m_active_nodes.clear();
581    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
582    return (true);
583}
584
585
586void TrexStatelessDpPerPort::create(CFlowGenListPerThread   *  core){
587    m_core=core;
588    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
589    m_active_streams=0;
590    m_active_nodes.clear();
591    m_active_pcap_node = NULL;
592}
593
594
595
596void
597TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) {
598    m_thread_id = thread_id;
599    m_core = core;
600    m_local_port_offset = 2*core->getDualPortId();
601
602    CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();
603
604    m_ring_from_cp = cp_dp->getRingCpToDp(thread_id);
605    m_ring_to_cp   = cp_dp->getRingDpToCp(thread_id);
606
607    m_state = STATE_IDLE;
608
609    int i;
610    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
611        m_ports[i].create(core);
612    }
613}
614
615
616/* move to the next stream, old stream move to INACTIVE */
617bool TrexStatelessDpCore::set_stateless_next_node(CGenNodeStateless * cur_node,
618                                                  CGenNodeStateless * next_node){
619
620    assert(cur_node);
621    TrexStatelessDpPerPort * lp_port = get_port_db(cur_node->m_port_id);
622    bool schedule =false;
623
624    bool to_stop_port=false;
625
626    if (next_node == NULL) {
627        /* there is no next stream , reduce the number of active streams*/
628        to_stop_port = lp_port->update_number_of_active_streams(1);
629
630    }else{
631        uint8_t state=next_node->get_state();
632
633        /* can't be FREE_RESUSE */
634        assert(state != CGenNodeStateless::ss_FREE_RESUSE);
635        if (state == CGenNodeStateless::ss_INACTIVE ) {
636
637            if (cur_node->m_action_counter > 0) {
638                cur_node->m_action_counter--;
639                if (cur_node->m_action_counter==0) {
640                    to_stop_port = lp_port->update_number_of_active_streams(1);
641                }else{
642                    /* refill start info and scedule, no update in active streams  */
643                    next_node->refresh();
644                    schedule = true;
645                }
646            }else{
647                /* refill start info and scedule, no update in active streams  */
648                next_node->refresh();
649                schedule = true;
650            }
651
652        }else{
653            to_stop_port = lp_port->update_number_of_active_streams(1);
654        }
655    }
656
657    if ( to_stop_port ) {
658        /* call stop port explictly to move the state */
659        stop_traffic(cur_node->m_port_id,false,0);
660    }
661
662    return ( schedule );
663}
664
665
666
667/**
668 * in idle state loop, the processor most of the time sleeps
669 * and periodically checks for messages
670 *
671 * @author imarom (01-Nov-15)
672 */
673void
674TrexStatelessDpCore::idle_state_loop() {
675
676    const int SHORT_DELAY_MS    = 2;
677    const int LONG_DELAY_MS     = 50;
678    const int DEEP_SLEEP_LIMIT  = 2000;
679
680    int counter = 0;
681
682    while (m_state == STATE_IDLE) {
683        m_core->tickle();
684        m_core->m_node_gen.m_v_if->handle_rx_queue();
685        bool had_msg = periodic_check_for_cp_messages();
686        if (had_msg) {
687            counter = 0;
688            continue;
689        }
690
691        /* enter deep sleep only if enough time had passed */
692        if (counter < DEEP_SLEEP_LIMIT) {
693            delay(SHORT_DELAY_MS);
694            counter++;
695        } else {
696            delay(LONG_DELAY_MS);
697        }
698
699    }
700}
701
702
703
704void TrexStatelessDpCore::quit_main_loop(){
705    m_core->set_terminate_mode(true); /* mark it as terminated */
706    m_state = STATE_TERMINATE;
707    add_global_duration(0.0001);
708}
709
710
711/**
712 * scehduler runs when traffic exists
713 * it will return when no more transmitting is done on this
714 * core
715 *
716 * @author imarom (01-Nov-15)
717 */
718void
719TrexStatelessDpCore::start_scheduler() {
720    /* creates a maintenace job using the scheduler */
721    CGenNode * node_sync = m_core->create_node() ;
722    node_sync->m_type = CGenNode::FLOW_SYNC;
723    node_sync->m_time = m_core->m_cur_time_sec + SYNC_TIME_OUT;
724
725    m_core->m_node_gen.add_node(node_sync);
726
727    double old_offset = 0.0;
728    m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset);
729    /* bail out in case of terminate */
730    if (m_state != TrexStatelessDpCore::STATE_TERMINATE) {
731        m_core->m_node_gen.close_file(m_core);
732        m_state = STATE_IDLE; /* we exit from all ports and we have nothing to do, we move to IDLE state */
733    }
734}
735
736
737void
738TrexStatelessDpCore::run_once(){
739
740    idle_state_loop();
741
742    if ( m_state == STATE_TERMINATE ){
743        return;
744    }
745
746    start_scheduler();
747}
748
749
750
751
752void
753TrexStatelessDpCore::start() {
754
755    while (true) {
756        run_once();
757
758        if ( m_core->is_terminated_by_master() ) {
759            break;
760        }
761    }
762}
763
764/* only if both port are idle we can exit */
765void
766TrexStatelessDpCore::schedule_exit(){
767
768    CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
769
770    node->m_type = CGenNode::COMMAND;
771
772    node->m_cmd = new TrexStatelessDpCanQuit();
773
774    /* make sure it will be scheduled after the current node */
775    node->m_time = m_core->m_cur_time_sec ;
776
777    m_core->m_node_gen.add_node((CGenNode *)node);
778}
779
780
781void
782TrexStatelessDpCore::add_global_duration(double duration){
783    if (duration > 0.0) {
784        CGenNode *node = m_core->create_node() ;
785
786        node->m_type = CGenNode::EXIT_SCHED;
787
788        /* make sure it will be scheduled after the current node */
789        node->m_time = m_core->m_cur_time_sec + duration ;
790
791        m_core->m_node_gen.add_node(node);
792    }
793}
794
795/* add per port exit */
796void
797TrexStatelessDpCore::add_port_duration(double duration,
798                                       uint8_t port_id,
799                                       int event_id){
800    if (duration > 0.0) {
801        CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
802
803        node->m_type = CGenNode::COMMAND;
804
805        /* make sure it will be scheduled after the current node */
806        node->m_time = m_core->m_cur_time_sec + duration ;
807
808        TrexStatelessDpStop * cmd=new TrexStatelessDpStop(port_id);
809
810
811        /* test this */
812        m_core->m_non_active_nodes++;
813        cmd->set_core_ptr(m_core);
814        cmd->set_event_id(event_id);
815        cmd->set_wait_for_event_id(true);
816
817        node->m_cmd = cmd;
818
819        m_core->m_node_gen.add_node((CGenNode *)node);
820    }
821}
822
823
824void TrexStatelessDpCore::update_mac_addr(TrexStream * stream,
825                                          CGenNodeStateless *node,
826                                          pkt_dir_t dir,
827                                          char *raw_pkt){
828    bool              ov_src = stream->get_override_src_mac_by_pkt_data();
829    TrexStream::stream_dst_mac_t  ov_dst = stream->get_override_dst_mac_mode();
830
831
832    if ( (ov_src == true) && (ov_dst == TrexStream::stPKT) ) {
833        /* nothing to do, take from the packet both */
834        return;
835    }
836
837        /* take from cfg_file */
838    if ( (ov_src == false) &&
839         (ov_dst == TrexStream::stCFG_FILE) ){
840
841          m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*)raw_pkt);
842          return;
843    }
844
845    /* save the pkt*/
846    char tmp_pkt[12];
847    memcpy(tmp_pkt,raw_pkt,12);
848
849    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*)raw_pkt);
850
851    if ((ov_src == true) && (ov_dst == TrexStream::stCFG_FILE)) {
852        memcpy(raw_pkt+6,tmp_pkt+6,6);
853    }
854
855    if ((ov_src == false) && (ov_dst == TrexStream::stPKT)) {
856        memcpy(raw_pkt,tmp_pkt,6);
857    }
858}
859
860
861void TrexStatelessDpCore::replay_vm_into_cache(TrexStream * stream,
862                                               CGenNodeStateless *node){
863
864    uint16_t      cache_size = stream->m_cache_size;
865    assert(cache_size>0);
866    rte_mbuf_t * m=0;
867
868    uint32_t buf_size = CGenNodeCacheMbuf::get_object_size(cache_size);
869    CGenNodeCacheMbuf * p = (CGenNodeCacheMbuf *)malloc(buf_size);
870    assert(p);
871    memset(p,0,buf_size);
872
873    int i;
874    for (i=0; i<cache_size; i++) {
875        p->m_array[i] =  node->alloc_node_with_vm();
876    }
877    /* save const */
878    m=node->get_const_mbuf();
879    if (m) {
880        p->m_mbuf_const=m;
881        rte_pktmbuf_refcnt_update(m,1);
882    }
883
884    /* free all VM and const mbuf */
885    node->free_stl_vm_buf();
886
887    /* copy to local node meory */
888    node->cache_mbuf_array_copy(p,cache_size);
889
890    /* free the memory */
891    free(p);
892}
893
894
895void
896TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port,
897                                TrexStream * stream,
898                                TrexStreamsCompiledObj *comp) {
899
900    CGenNodeStateless *node = m_core->create_node_sl();
901
902    node->m_thread_id = m_thread_id;
903    node->cache_mbuf_array_init();
904    node->m_batch_size=0;
905
906    /* add periodic */
907    node->m_cache_mbuf=0;
908    node->m_type = CGenNode::STATELESS_PKT;
909
910    node->m_action_counter = stream->m_action_count;
911
912    /* clone the stream from control plane memory to DP memory */
913    node->m_ref_stream_info = stream->clone();
914    /* no need for this memory anymore on the control plane memory */
915    stream->release_dp_object();
916
917    node->m_next_stream=0; /* will be fixed later */
918
919    if ( stream->m_self_start ){
920        /* if self start it is in active mode */
921        node->m_state =CGenNodeStateless::ss_ACTIVE;
922        lp_port->m_active_streams++;
923    }else{
924        node->m_state =CGenNodeStateless::ss_INACTIVE;
925    }
926
927    node->m_time = m_core->m_cur_time_sec + stream->get_start_delay_sec();
928
929    pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(stream->m_port_id);
930    node->m_flags = 0;
931    node->m_src_port =0;
932    node->m_original_packet_data_prefix = 0;
933
934    if (stream->m_rx_check.m_enabled) {
935        node->set_stat_needed();
936        uint8_t hw_id = stream->m_rx_check.m_hw_id;
937        assert (hw_id < MAX_FLOW_STATS + MAX_FLOW_STATS_PAYLOAD);
938        node->set_stat_hw_id(hw_id);
939        // no support for cache with flow stat payload rules
940        if ((TrexPlatformApi::driver_stat_cap_e)stream->m_rx_check.m_rule_type == TrexPlatformApi::IF_STAT_PAYLOAD) {
941            stream->m_cache_size = 0;
942        }
943    }
944
945    /* set socket id */
946    node->set_socket_id(m_core->m_node_gen.m_socket_id);
947
948    /* build a mbuf from a packet */
949
950    uint16_t pkt_size = stream->m_pkt.len;
951    const uint8_t *stream_pkt = stream->m_pkt.binary;
952
953    node->m_pause =0;
954    node->m_stream_type = stream->m_type;
955    node->m_next_time_offset = 1.0 / stream->get_pps();
956    node->m_null_stream = (stream->m_null_stream ? 1 : 0);
957
958    /* stateless specific fields */
959    switch ( stream->m_type ) {
960
961    case TrexStream::stCONTINUOUS :
962        node->m_single_burst=0;
963        node->m_single_burst_refill=0;
964        node->m_multi_bursts=0;
965        break;
966
967    case TrexStream::stSINGLE_BURST :
968        node->m_stream_type             = TrexStream::stMULTI_BURST;
969        node->m_single_burst            = stream->m_burst_total_pkts;
970        node->m_single_burst_refill     = stream->m_burst_total_pkts;
971        node->m_multi_bursts            = 1;  /* single burst in multi burst of 1 */
972        break;
973
974    case TrexStream::stMULTI_BURST :
975        node->m_single_burst        = stream->m_burst_total_pkts;
976        node->m_single_burst_refill = stream->m_burst_total_pkts;
977        node->m_multi_bursts        = stream->m_num_bursts;
978        break;
979    default:
980
981        assert(0);
982    };
983
984    node->m_port_id = stream->m_port_id;
985
986    /* set dir 0 or 1 client or server */
987    node->set_mbuf_cache_dir(dir);
988
989
990    if (node->m_ref_stream_info->getDpVm() == NULL) {
991        /* no VM */
992
993        node->m_vm_flow_var =  NULL;
994        node->m_vm_program  =  NULL;
995        node->m_vm_program_size =0;
996
997                /* allocate const mbuf */
998        rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size);
999        assert(m);
1000
1001        char *p = rte_pktmbuf_append(m, pkt_size);
1002        assert(p);
1003        /* copy the packet */
1004        memcpy(p,stream_pkt,pkt_size);
1005
1006        update_mac_addr(stream,node,dir,p);
1007
1008        /* set the packet as a readonly */
1009        node->set_cache_mbuf(m);
1010
1011        node->m_original_packet_data_prefix =0;
1012    }else{
1013
1014        /* set the program */
1015        TrexStream * local_mem_stream = node->m_ref_stream_info;
1016
1017        StreamVmDp  * lpDpVm = local_mem_stream->getDpVm();
1018
1019        node->m_vm_flow_var      = lpDpVm->clone_bss(); /* clone the flow var */
1020        node->m_vm_program       = lpDpVm->get_program(); /* same ref to the program */
1021        node->m_vm_program_size  = lpDpVm->get_program_size();
1022
1023        /* generate random seed if needed*/
1024        if (lpDpVm->is_random_seed()) {
1025            node->generate_random_seed();
1026        }
1027
1028        /* we need to copy the object */
1029        if ( pkt_size > lpDpVm->get_prefix_size() ) {
1030            /* we need const packet */
1031            uint16_t const_pkt_size  = pkt_size - lpDpVm->get_prefix_size() ;
1032            rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), const_pkt_size );
1033            assert(m);
1034
1035            char *p = rte_pktmbuf_append(m, const_pkt_size);
1036            assert(p);
1037
1038            /* copy packet data */
1039            memcpy(p,(stream_pkt + lpDpVm->get_prefix_size()),const_pkt_size);
1040
1041            node->set_const_mbuf(m);
1042        }
1043
1044
1045        if ( lpDpVm->is_pkt_size_var() ) {
1046            // mark the node as varible size
1047            node->set_var_pkt_size();
1048        }
1049
1050
1051        if (lpDpVm->get_prefix_size() > pkt_size ) {
1052            lpDpVm->set_prefix_size(pkt_size);
1053        }
1054
1055        /* copy the headr */
1056        uint16_t header_size = lpDpVm->get_prefix_size();
1057        assert(header_size);
1058        node->alloc_prefix_header(header_size);
1059        uint8_t *p=node->m_original_packet_data_prefix;
1060        assert(p);
1061
1062        memcpy(p,stream_pkt , header_size);
1063
1064        update_mac_addr(stream,node,dir,(char *)p);
1065
1066        if (stream->m_cache_size > 0 ) {
1067            /* we need to create cache of objects */
1068            replay_vm_into_cache(stream, node);
1069        }
1070    }
1071
1072
1073    CDpOneStream one_stream;
1074
1075    one_stream.m_dp_stream = node->m_ref_stream_info;
1076    one_stream.m_node =node;
1077
1078    lp_port->m_active_nodes.push_back(one_stream);
1079
1080    /* schedule only if active */
1081    if (node->m_state == CGenNodeStateless::ss_ACTIVE) {
1082        m_core->m_node_gen.add_node((CGenNode *)node);
1083    }
1084}
1085
1086void
1087TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj,
1088                                   double duration,
1089                                   int event_id) {
1090
1091
1092    TrexStatelessDpPerPort * lp_port=get_port_db(obj->get_port_id());
1093    lp_port->m_active_streams = 0;
1094    lp_port->set_event_id(event_id);
1095
1096    /* update cur time */
1097    if ( CGlobalInfo::is_realtime()  ){
1098        m_core->m_cur_time_sec = now_sec() + SCHD_OFFSET_DTIME ;
1099    }
1100
1101    /* no nodes in the list */
1102    assert(lp_port->m_active_nodes.size()==0);
1103
1104    for (auto single_stream : obj->get_objects()) {
1105        /* all commands should be for the same port */
1106        assert(obj->get_port_id() == single_stream.m_stream->m_port_id);
1107        add_stream(lp_port,single_stream.m_stream,obj);
1108    }
1109
1110    uint32_t nodes = lp_port->m_active_nodes.size();
1111    /* find next stream */
1112    assert(nodes == obj->get_objects().size());
1113
1114    int cnt=0;
1115
1116    /* set the next_stream pointer  */
1117    for (auto single_stream : obj->get_objects()) {
1118
1119        if (single_stream.m_stream->is_dp_next_stream() ) {
1120            int stream_id = single_stream.m_stream->m_next_stream_id;
1121            assert(stream_id<nodes);
1122            /* point to the next stream , stream_id is fixed */
1123            lp_port->m_active_nodes[cnt].m_node->m_next_stream = lp_port->m_active_nodes[stream_id].m_node ;
1124        }
1125        cnt++;
1126    }
1127
1128    lp_port->m_state =TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
1129    m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
1130
1131
1132    if ( duration > 0.0 ){
1133        add_port_duration( duration ,obj->get_port_id(),event_id );
1134    }
1135
1136}
1137
1138
1139bool TrexStatelessDpCore::are_all_ports_idle(){
1140
1141    bool res=true;
1142    int i;
1143    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
1144        if ( m_ports[i].m_state != TrexStatelessDpPerPort::ppSTATE_IDLE ){
1145            res=false;
1146        }
1147    }
1148    return (res);
1149}
1150
1151
1152void
1153TrexStatelessDpCore::resume_traffic(uint8_t port_id){
1154
1155    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1156
1157    lp_port->resume_traffic(port_id);
1158}
1159
1160
1161void
1162TrexStatelessDpCore::pause_traffic(uint8_t port_id){
1163
1164    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1165
1166    lp_port->pause_traffic(port_id);
1167}
1168
1169void
1170TrexStatelessDpCore::push_pcap(uint8_t port_id,
1171                               int event_id,
1172                               const std::string &pcap_filename,
1173                               double ipg_usec,
1174                               double m_min_ipg_sec,
1175                               double speedup,
1176                               uint32_t count,
1177                               double duration,
1178                               bool is_dual) {
1179
1180    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1181
1182    lp_port->set_event_id(event_id);
1183
1184    /* delegate the command to the port */
1185    bool rc = lp_port->push_pcap(port_id, pcap_filename, ipg_usec, m_min_ipg_sec, speedup, count, is_dual);
1186    if (!rc) {
1187        /* report back that we stopped */
1188        CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
1189        TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
1190                                                                       port_id,
1191                                                                       event_id,
1192                                                                       false);
1193        ring->Enqueue((CGenNode *)event_msg);
1194        return;
1195    }
1196
1197
1198    if (duration > 0.0) {
1199        add_port_duration(duration, port_id, event_id);
1200    }
1201
1202     m_state = TrexStatelessDpCore::STATE_PCAP_TX;
1203}
1204
1205void
1206TrexStatelessDpCore::update_traffic(uint8_t port_id, double factor) {
1207
1208    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1209
1210    lp_port->update_traffic(port_id, factor);
1211}
1212
1213
1214void
1215TrexStatelessDpCore::stop_traffic(uint8_t  port_id,
1216                                  bool     stop_on_id,
1217                                  int      event_id) {
1218    /* we cannot remove nodes not from the top of the queue so
1219       for every active node - make sure next time
1220       the scheduler invokes it, it will be free */
1221
1222    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1223    if ( lp_port->stop_traffic(port_id,stop_on_id,event_id) == false){
1224        return;
1225    }
1226
1227    /* flush the TX queue before sending done message to the CP */
1228    m_core->flush_tx_queue();
1229
1230    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
1231    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
1232                                                                   port_id,
1233                                                                   lp_port->get_event_id());
1234    ring->Enqueue((CGenNode *)event_msg);
1235
1236}
1237
1238/**
1239 * handle a message from CP to DP
1240 *
1241 */
1242void
1243TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) {
1244    msg->handle(this);
1245    delete msg;
1246}
1247
1248void
1249TrexStatelessDpCore::barrier(uint8_t port_id, int event_id) {
1250
1251    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
1252    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
1253                                                                   port_id,
1254                                                                   event_id);
1255    ring->Enqueue((CGenNode *)event_msg);
1256}
1257
1258
1259/**
1260 * PCAP node
1261 */
1262bool CGenNodePCAP::create(uint8_t port_id,
1263                          pkt_dir_t dir,
1264                          socket_id_t socket_id,
1265                          const uint8_t *mac_addr,
1266                          const uint8_t *slave_mac_addr,
1267                          const std::string &pcap_filename,
1268                          double ipg_usec,
1269                          double min_ipg_sec,
1270                          double speedup,
1271                          uint32_t count,
1272                          bool is_dual) {
1273    std::stringstream ss;
1274
1275    m_type       = CGenNode::PCAP_PKT;
1276    m_flags      = 0;
1277    m_src_port   = 0;
1278    m_port_id    = port_id;
1279    m_count      = count;
1280    m_is_dual    = is_dual;
1281    m_dir        = dir;
1282    m_min_ipg_sec    = min_ipg_sec;
1283
1284    /* mark this node as slow path */
1285    set_slow_path(true);
1286
1287    if (ipg_usec != -1) {
1288        /* fixed IPG */
1289        m_ipg_sec = std::max(min_ipg_sec, usec_to_sec(ipg_usec / speedup));
1290        m_speedup = 0;
1291    } else {
1292        /* packet IPG */
1293        m_ipg_sec = -1;
1294        m_speedup  = speedup;
1295    }
1296
1297    /* copy MAC addr info */
1298    memcpy(m_mac_addr, mac_addr, 12);
1299    memcpy(m_slave_mac_addr, slave_mac_addr, 12);
1300
1301
1302    set_socket_id(socket_id);
1303
1304    /* create the PCAP reader */
1305    m_reader = CCapReaderFactory::CreateReader((char *)pcap_filename.c_str(), 0, ss);
1306    if (!m_reader) {
1307        return false;
1308    }
1309
1310    m_raw_packet = new CCapPktRaw();
1311    if ( m_reader->ReadPacket(m_raw_packet) == false ){
1312        return false;
1313    }
1314
1315    /* set the dir */
1316    set_mbuf_dir(dir);
1317
1318    /* update the direction (for dual mode) */
1319    update_pkt_dir();
1320
1321    /* this is the reference time */
1322    m_last_pkt_time = m_raw_packet->get_time();
1323
1324    /* ready */
1325    m_state = PCAP_ACTIVE;
1326
1327    return true;
1328}
1329
1330/**
1331 * cleanup for PCAP node
1332 *
1333 * @author imarom (08-May-16)
1334 */
1335void CGenNodePCAP::destroy() {
1336
1337    if (m_raw_packet) {
1338        delete m_raw_packet;
1339        m_raw_packet = NULL;
1340    }
1341
1342    if (m_reader) {
1343        delete m_reader;
1344        m_reader = NULL;
1345    }
1346
1347    m_state = PCAP_INVALID;
1348}
1349
1350