trex_stateless_dp_core.cpp revision 4f91be3f
1/*
2 Itay Marom
3 Hanoch Haim
4 Cisco Systems, Inc.
5*/
6
7/*
8Copyright (c) 2015-2016 Cisco Systems, Inc.
9
10Licensed under the Apache License, Version 2.0 (the "License");
11you may not use this file except in compliance with the License.
12You may obtain a copy of the License at
13
14    http://www.apache.org/licenses/LICENSE-2.0
15
16Unless required by applicable law or agreed to in writing, software
17distributed under the License is distributed on an "AS IS" BASIS,
18WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19See the License for the specific language governing permissions and
20limitations under the License.
21*/
22#include "bp_sim.h"
23#include "trex_stateless_dp_core.h"
24#include "trex_stateless_messaging.h"
25#include "trex_stream.h"
26#include "trex_stream_node.h"
27#include "trex_streams_compiler.h"
28#include "mbuf.h"
29
30
31
32
33void CGenNodeStateless::cache_mbuf_array_init(){
34    m_cache_size=0;
35    m_cache_array_cnt=0;
36}
37
38
39
40void CGenNodeStateless::cache_mbuf_array_copy(CGenNodeCacheMbuf *obj,
41                                              uint16_t size){
42
43    int i;
44    cache_mbuf_array_alloc(size);
45    for (i=0; i<size; i++) {
46        cache_mbuf_array_set(i,obj->m_array[i]);
47    }
48    cache_mbuf_array_set_const_mbuf(obj->m_mbuf_const);
49}
50
51
52rte_mbuf_t ** CGenNodeStateless::cache_mbuf_array_alloc(uint16_t size){
53
54    uint32_t buf_size = CGenNodeCacheMbuf::get_object_size(size);
55    /* TBD  replace with align, zero API */
56    m_cache_mbuf = (void *)malloc(buf_size);
57    assert(m_cache_mbuf);
58    memset(m_cache_mbuf,0,buf_size);
59
60    m_flags |= SL_NODE_CONST_MBUF_CACHE_ARRAY;
61    m_cache_size=size;
62    m_cache_array_cnt=0;
63    return ((rte_mbuf_t **)m_cache_mbuf);
64}
65
66void CGenNodeStateless::cache_mbuf_array_free(){
67
68    assert(m_cache_mbuf);
69    int i;
70    for (i=0; i<(int)m_cache_size; i++) {
71        rte_mbuf_t * m=cache_mbuf_array_get((uint16_t)i);
72        assert(m);
73        rte_pktmbuf_free(m);
74    }
75
76    /* free the const */
77    rte_mbuf_t * m=cache_mbuf_array_get_const_mbuf() ;
78    if (m) {
79        rte_pktmbuf_free(m);
80    }
81
82    free(m_cache_mbuf);
83    m_cache_mbuf=0;
84}
85
86
87rte_mbuf_t * CGenNodeStateless::cache_mbuf_array_get(uint16_t index){
88
89    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
90    return (p->m_array[index]);
91}
92
93void CGenNodeStateless::cache_mbuf_array_set_const_mbuf(rte_mbuf_t * m){
94    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
95    p->m_mbuf_const=m;
96}
97
98rte_mbuf_t * CGenNodeStateless::cache_mbuf_array_get_const_mbuf(){
99    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
100    return (p->m_mbuf_const);
101}
102
103
104void CGenNodeStateless::cache_mbuf_array_set(uint16_t index,
105                                             rte_mbuf_t * m){
106    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
107    p->m_array[index]=m;
108}
109
110
111void CDpOneStream::Delete(CFlowGenListPerThread   * core){
112    assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE);
113    core->free_node((CGenNode *)m_node);
114    delete m_dp_stream;
115    m_node=0;
116    m_dp_stream=0;
117}
118
119void CDpOneStream::DeleteOnlyStream(){
120    assert(m_dp_stream);
121    delete m_dp_stream;
122    m_dp_stream=0;
123}
124
125int CGenNodeStateless::get_stream_id(){
126    if (m_state ==CGenNodeStateless::ss_FREE_RESUSE) {
127        return (-1); // not valid
128    }
129    assert(m_ref_stream_info);
130    return ((int)m_ref_stream_info->m_stream_id);
131}
132
133
134void CGenNodeStateless::DumpHeader(FILE *fd){
135    fprintf(fd," pkt_id, time, port , action , state, stream_id , stype , m-burst# , burst# \n");
136
137}
138void CGenNodeStateless::Dump(FILE *fd){
139    fprintf(fd," %2.4f, %3lu, %s,%s, %3d, %s, %3lu, %3lu  \n",
140            m_time,
141            (ulong)m_port_id,
142            "s-pkt", //action
143            get_stream_state_str(m_state ).c_str(),
144            get_stream_id(),   //stream_id
145            TrexStream::get_stream_type_str(m_stream_type).c_str(), //stype
146            (ulong)m_multi_bursts,
147            (ulong)m_single_burst
148            );
149}
150
151
152void CGenNodeStateless::generate_random_seed() {
153    /* seed can be provided by the user */
154    uint32_t unique_seed;
155    if (m_ref_stream_info->m_random_seed) {
156        unique_seed = m_ref_stream_info->m_random_seed;
157    } else {
158        unsigned int tmp = (unsigned int)time(NULL);
159        unique_seed = rand_r(&tmp);
160    }
161
162    /* per thread divergence */
163    unique_seed = (unique_seed * ( (m_thread_id + 1) * 514229 ) ) & 0xFFFFFFFF;
164
165    /* set random */
166    set_random_seed(unique_seed);
167}
168
169
170void CGenNodeStateless::refresh_vm_bss() {
171    if ( m_vm_flow_var ) {
172        StreamVmDp  * vm_s=m_ref_stream_info->m_vm_dp;
173        assert(vm_s);
174        memcpy(m_vm_flow_var,vm_s->get_bss(),vm_s->get_bss_size());
175
176        if ( vm_s->is_random_seed() ) {
177            generate_random_seed();
178        }
179
180    }
181}
182
183
184
185/**
186 * this function called when stream restart after it was inactive
187 */
188void CGenNodeStateless::refresh(){
189
190    /* refill the stream info */
191    m_single_burst    = m_single_burst_refill;
192    m_multi_bursts    = m_ref_stream_info->m_num_bursts;
193    m_state           = CGenNodeStateless::ss_ACTIVE;
194
195    /* refresh init value */
196#if 0
197    /* TBD should add a JSON varible for that */
198    refresh_vm_bss();
199#endif
200}
201
202
203void CGenNodeCommand::free_command(){
204
205    assert(m_cmd);
206    m_cmd->on_node_remove();
207    delete m_cmd;
208}
209
210
211std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state){
212    std::string res;
213
214    switch (stream_state) {
215    case CGenNodeStateless::ss_FREE_RESUSE :
216         res="FREE    ";
217        break;
218    case CGenNodeStateless::ss_INACTIVE :
219        res="INACTIVE ";
220        break;
221    case CGenNodeStateless::ss_ACTIVE :
222        res="ACTIVE   ";
223        break;
224    default:
225        res="Unknow   ";
226    };
227    return(res);
228}
229
230/*
231 * Allocate mbuf for flow stat (and latency) info sending
232 * m - Original mbuf (can be complicated mbuf data structure)
233 * fsp_head - return pointer in which the flow stat info should be filled
234 * is_const - is the given mbuf const
235 * return new mbuf structure in which the fsp_head can be written. If needed, orginal mbuf is freed.
236 */
237rte_mbuf_t * CGenNodeStateless::alloc_flow_stat_mbuf(rte_mbuf_t *m, struct flow_stat_payload_header *&fsp_head
238                                                     , bool is_const) {
239    rte_mbuf_t *m_ret = NULL, *m_lat = NULL;
240    uint16_t fsp_head_size = sizeof(struct flow_stat_payload_header);
241
242    if (is_const) {
243        // const mbuf case
244        if (rte_pktmbuf_data_len(m) > 128) {
245            m_ret = CGlobalInfo::pktmbuf_alloc_small(get_socket_id());
246            assert(m_ret);
247            // alloc mbuf just for the latency header
248            m_lat = CGlobalInfo::pktmbuf_alloc( get_socket_id(), fsp_head_size);
249            assert(m_lat);
250            fsp_head = (struct flow_stat_payload_header *)rte_pktmbuf_append(m_lat, fsp_head_size);
251            rte_pktmbuf_attach(m_ret, m);
252            rte_pktmbuf_trim(m_ret, sizeof(struct flow_stat_payload_header));
253            utl_rte_pktmbuf_add_after2(m_ret, m_lat);
254            // ref count was updated when we took the (const) mbuf, and again in rte_pktmbuf_attach
255            // so need do decrease now, to avoid leak.
256            rte_pktmbuf_refcnt_update(m, -1);
257            return m_ret;
258        } else {
259            // Short packet. Just copy all bytes.
260            m_ret = CGlobalInfo::pktmbuf_alloc( get_socket_id(), rte_pktmbuf_data_len(m) );
261            assert(m_ret);
262            char *p = rte_pktmbuf_mtod(m, char*);
263            char *p_new = rte_pktmbuf_append(m_ret, rte_pktmbuf_data_len(m));
264            memcpy(p_new , p, rte_pktmbuf_data_len(m));
265            fsp_head = (struct flow_stat_payload_header *)(p_new + rte_pktmbuf_data_len(m) - fsp_head_size);
266            rte_pktmbuf_free(m);
267            return m_ret;
268        }
269    } else {
270        // Field engine (vm)
271        if (rte_pktmbuf_is_contiguous(m)) {
272            // one, r/w mbuf
273            char *p = rte_pktmbuf_mtod(m, char*);
274            fsp_head = (struct flow_stat_payload_header *)(p + rte_pktmbuf_data_len(m) - fsp_head_size);
275            return m;
276        } else {
277            // We have: r/w --> read only.
278            // Changing to:
279            // (original) r/w -> (new) indirect (direct is original read_only, after trimming last bytes) -> (new) latency info
280            rte_mbuf_t *m_read_only = m->next, *m_indirect;
281
282            m_indirect = CGlobalInfo::pktmbuf_alloc_small(get_socket_id());
283            assert(m_indirect);
284            // alloc mbuf just for the latency header
285            m_lat = CGlobalInfo::pktmbuf_alloc( get_socket_id(), fsp_head_size);
286            assert(m_lat);
287            fsp_head = (struct flow_stat_payload_header *)rte_pktmbuf_append(m_lat, fsp_head_size);
288            utl_rte_pktmbuf_chain_with_indirect(m, m_indirect, m_read_only, m_lat);
289            m_indirect->data_len = (uint16_t)(m_indirect->data_len - fsp_head_size);
290            return m;
291        }
292    }
293}
294
295// test the const case of alloc_flow_stat_mbuf. The more complicated non const case is tested in the simulation.
296bool CGenNodeStateless::alloc_flow_stat_mbuf_test_const() {
297    rte_mbuf_t *m, *m_test;
298    uint16_t sizes[2] = {64, 500};
299    uint16_t size;
300    struct flow_stat_payload_header *fsp_head;
301    char *p;
302
303    set_socket_id(0);
304    for (int test_num = 0; test_num < sizeof(sizes)/sizeof(sizes[0]); test_num++) {
305        size = sizes[test_num];
306        m = CGlobalInfo::pktmbuf_alloc(get_socket_id(), size);
307        p = rte_pktmbuf_append(m, size);
308        for (int i = 0; i < size; i++) {
309            p[i] = (char)i;
310        }
311        m_test = alloc_flow_stat_mbuf(m, fsp_head, true);
312        p = rte_pktmbuf_mtod(m_test, char*);
313        assert(rte_pktmbuf_pkt_len(m_test) == size);
314        for (int i = 0; i < rte_pktmbuf_pkt_len(m_test) - sizeof(*fsp_head); i++) {
315            assert(p[i] == (char)i);
316        }
317        // verify fsp_head points correctly
318        if (size > 128) { // should match threshould in alloc_flow_stat_mbuf
319            assert(rte_pktmbuf_data_len(m_test) == size - sizeof(*fsp_head));
320            assert(rte_pktmbuf_data_len(m_test->next) == sizeof(*fsp_head));
321            assert((char *)fsp_head == rte_pktmbuf_mtod((m_test->next), char*));
322        } else {
323            assert(rte_pktmbuf_data_len(m_test) == size);
324            assert (((char *)fsp_head) + sizeof (*fsp_head) == p + rte_pktmbuf_data_len(m_test));
325        }
326        rte_pktmbuf_free(m_test);
327    }
328    return true;
329}
330
331rte_mbuf_t   * CGenNodeStateless::alloc_node_with_vm(){
332
333    rte_mbuf_t        * m;
334    /* alloc small packet buffer*/
335    uint16_t prefix_size = prefix_header_size();
336    m = CGlobalInfo::pktmbuf_alloc( get_socket_id(), prefix_size );
337    if (m==0) {
338        return (m);
339    }
340    /* TBD remove this, should handle cases of error */
341    assert(m);
342    char *p=rte_pktmbuf_append(m, prefix_size);
343    memcpy( p ,m_original_packet_data_prefix, prefix_size);
344
345
346    /* run the VM program */
347    StreamDPVmInstructionsRunner runner;
348    runner.set_mbuf(m);
349
350    runner.run( (uint32_t*)m_vm_flow_var,
351                m_vm_program_size,
352                m_vm_program,
353                m_vm_flow_var,
354                (uint8_t*)p);
355
356    uint16_t pkt_new_size=runner.get_new_pkt_size();
357    if ( likely( pkt_new_size == 0) ) {
358        /* no packet size change */
359        rte_mbuf_t * m_const = get_const_mbuf();
360        if (  m_const != NULL) {
361            utl_rte_pktmbuf_add_after(m,m_const);
362        }
363        return (m);
364    }
365
366    /* packet size change there are a few changes */
367    rte_mbuf_t * m_const = get_const_mbuf();
368    if ( (m_const == 0 ) || (pkt_new_size<=prefix_size) ) {
369        /* one mbuf , just trim it */
370        m->data_len = pkt_new_size;
371        m->pkt_len  = pkt_new_size;
372        return (m);
373    }
374
375    rte_mbuf_t * mi= CGlobalInfo::pktmbuf_alloc_small(get_socket_id());
376    assert(mi);
377    rte_pktmbuf_attach(mi,m_const);
378    utl_rte_pktmbuf_add_after2(m,mi);
379
380    if ( pkt_new_size < m->pkt_len) {
381        /* need to trim it */
382        mi->data_len = (pkt_new_size - prefix_size);
383        m->pkt_len   = pkt_new_size;
384    }
385    return (m);
386}
387
388void CGenNodeStateless::free_stl_vm_buf(){
389        rte_mbuf_t * m ;
390         m=get_const_mbuf();
391         if (m) {
392             rte_pktmbuf_free(m); /* reduce the ref counter */
393             /* clear the const marker */
394             clear_const_mbuf();
395         }
396
397         free_prefix_header();
398
399         if (m_vm_flow_var) {
400             /* free flow var */
401             free(m_vm_flow_var);
402             m_vm_flow_var=0;
403         }
404}
405
406
407
408void CGenNodeStateless::free_stl_node(){
409
410    if ( is_cache_mbuf_array() ){
411        /* do we have cache of mbuf pre allocated */
412        cache_mbuf_array_free();
413    }else{
414        /* if we have cache mbuf free it */
415        rte_mbuf_t * m=get_cache_mbuf();
416        if (m) {
417                rte_pktmbuf_free(m);
418                m_cache_mbuf=0;
419        }
420    }
421    free_stl_vm_buf();
422}
423
424
425bool TrexStatelessDpPerPort::update_number_of_active_streams(uint32_t d){
426    m_active_streams-=d; /* reduce the number of streams */
427    if (m_active_streams == 0) {
428        return (true);
429    }
430    return (false);
431}
432
433bool TrexStatelessDpPerPort::resume_traffic(uint8_t port_id){
434
435    /* we are working with continues streams so we must be in transmit mode */
436    assert(m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE);
437
438    for (auto dp_stream : m_active_nodes) {
439        CGenNodeStateless * node =dp_stream.m_node;
440        assert(node->get_port_id() == port_id);
441        assert(node->is_pause() == true);
442        node->set_pause(false);
443    }
444    m_state = TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
445    return (true);
446}
447
448bool TrexStatelessDpPerPort::update_traffic(uint8_t port_id, double factor) {
449
450    assert( (m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING ||
451            (m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE)) );
452
453    for (auto dp_stream : m_active_nodes) {
454        CGenNodeStateless * node = dp_stream.m_node;
455        assert(node->get_port_id() == port_id);
456
457        node->update_rate(factor);
458    }
459
460    return (true);
461}
462
463bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){
464
465    /* we are working with continues streams so we must be in transmit mode */
466    assert(m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING);
467
468    for (auto dp_stream : m_active_nodes) {
469        CGenNodeStateless * node =dp_stream.m_node;
470        assert(node->get_port_id() == port_id);
471        assert(node->is_pause() == false);
472        node->set_pause(true);
473    }
474    m_state = TrexStatelessDpPerPort::ppSTATE_PAUSE;
475    return (true);
476}
477
478bool TrexStatelessDpPerPort::push_pcap(uint8_t port_id,
479                                       const std::string &pcap_filename,
480                                       double ipg_usec,
481                                       double speedup,
482                                       uint32_t count,
483                                       bool is_dual) {
484
485    /* push pcap can only happen on an idle port from the core prespective */
486    assert(m_state == TrexStatelessDpPerPort::ppSTATE_IDLE);
487
488    CGenNodePCAP *pcap_node = m_core->allocate_pcap_node();
489    if (!pcap_node) {
490        return (false);
491    }
492
493    pkt_dir_t dir          = m_core->m_node_gen.m_v_if->port_id_to_dir(port_id);
494    socket_id_t socket_id  = m_core->m_node_gen.m_socket_id;
495
496    /* main port */
497    uint8_t mac_addr[12];
498    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, mac_addr);
499
500    /* for dual */
501    uint8_t slave_mac_addr[12];
502    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir ^ 0x1, slave_mac_addr);
503
504    bool rc = pcap_node->create(port_id,
505                                dir,
506                                socket_id,
507                                mac_addr,
508                                slave_mac_addr,
509                                pcap_filename,
510                                ipg_usec,
511                                speedup,
512                                count,
513                                is_dual);
514    if (!rc) {
515        m_core->free_node((CGenNode *)pcap_node);
516        return (false);
517    }
518
519    /* schedule the node for now */
520    pcap_node->m_time = m_core->m_cur_time_sec;
521    m_core->m_node_gen.add_node((CGenNode *)pcap_node);
522
523    /* hold a pointer to the node */
524    assert(m_active_pcap_node == NULL);
525    m_active_pcap_node = pcap_node;
526
527    m_state = TrexStatelessDpPerPort::ppSTATE_PCAP_TX;
528    return (true);
529}
530
531
532bool TrexStatelessDpPerPort::stop_traffic(uint8_t  port_id,
533                                          bool     stop_on_id,
534                                          int      event_id){
535
536
537    if (m_state == TrexStatelessDpPerPort::ppSTATE_IDLE) {
538        assert(m_active_streams==0);
539        return false;
540    }
541
542    /* there could be race of stop after stop */
543    if ( stop_on_id ) {
544        if (event_id != m_event_id){
545            /* we can't stop it is an old message */
546            return false;
547        }
548    }
549
550    for (auto dp_stream : m_active_nodes) {
551        CGenNodeStateless * node =dp_stream.m_node;
552        assert(node->get_port_id() == port_id);
553        if ( node->get_state() == CGenNodeStateless::ss_ACTIVE) {
554            node->mark_for_free();
555            m_active_streams--;
556            dp_stream.DeleteOnlyStream();
557
558        }else{
559            dp_stream.Delete(m_core);
560        }
561    }
562
563    /* check for active PCAP node */
564    if (m_active_pcap_node) {
565        /* when got async stop from outside or duration */
566        if (m_active_pcap_node->is_active()) {
567            m_active_pcap_node->mark_for_free();
568        } else {
569            /* graceful stop - node was put out by the scheduler */
570            m_core->free_node( (CGenNode *)m_active_pcap_node);
571        }
572
573        m_active_pcap_node = NULL;
574    }
575
576    /* active stream should be zero */
577    assert(m_active_streams==0);
578    m_active_nodes.clear();
579    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
580    return (true);
581}
582
583
584void TrexStatelessDpPerPort::create(CFlowGenListPerThread   *  core){
585    m_core=core;
586    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
587    m_active_streams=0;
588    m_active_nodes.clear();
589    m_active_pcap_node = NULL;
590}
591
592
593
594void
595TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) {
596    m_thread_id = thread_id;
597    m_core = core;
598    m_local_port_offset = 2*core->getDualPortId();
599
600    CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();
601
602    m_ring_from_cp = cp_dp->getRingCpToDp(thread_id);
603    m_ring_to_cp   = cp_dp->getRingDpToCp(thread_id);
604
605    m_state = STATE_IDLE;
606
607    int i;
608    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
609        m_ports[i].create(core);
610    }
611}
612
613
614/* move to the next stream, old stream move to INACTIVE */
615bool TrexStatelessDpCore::set_stateless_next_node(CGenNodeStateless * cur_node,
616                                                  CGenNodeStateless * next_node){
617
618    assert(cur_node);
619    TrexStatelessDpPerPort * lp_port = get_port_db(cur_node->m_port_id);
620    bool schedule =false;
621
622    bool to_stop_port=false;
623
624    if (next_node == NULL) {
625        /* there is no next stream , reduce the number of active streams*/
626        to_stop_port = lp_port->update_number_of_active_streams(1);
627
628    }else{
629        uint8_t state=next_node->get_state();
630
631        /* can't be FREE_RESUSE */
632        assert(state != CGenNodeStateless::ss_FREE_RESUSE);
633        if (state == CGenNodeStateless::ss_INACTIVE ) {
634
635            if (cur_node->m_action_counter > 0) {
636                cur_node->m_action_counter--;
637                if (cur_node->m_action_counter==0) {
638                    to_stop_port = lp_port->update_number_of_active_streams(1);
639                }else{
640                    /* refill start info and scedule, no update in active streams  */
641                    next_node->refresh();
642                    schedule = true;
643                }
644            }else{
645                /* refill start info and scedule, no update in active streams  */
646                next_node->refresh();
647                schedule = true;
648            }
649
650        }else{
651            to_stop_port = lp_port->update_number_of_active_streams(1);
652        }
653    }
654
655    if ( to_stop_port ) {
656        /* call stop port explictly to move the state */
657        stop_traffic(cur_node->m_port_id,false,0);
658    }
659
660    return ( schedule );
661}
662
663
664
665/**
666 * in idle state loop, the processor most of the time sleeps
667 * and periodically checks for messages
668 *
669 * @author imarom (01-Nov-15)
670 */
671void
672TrexStatelessDpCore::idle_state_loop() {
673
674    const int SHORT_DELAY_MS    = 2;
675    const int LONG_DELAY_MS     = 50;
676    const int DEEP_SLEEP_LIMIT  = 2000;
677
678    int counter = 0;
679
680    while (m_state == STATE_IDLE) {
681        m_core->tickle();
682        m_core->m_node_gen.m_v_if->handle_rx_queue();
683        bool had_msg = periodic_check_for_cp_messages();
684        if (had_msg) {
685            counter = 0;
686            continue;
687        }
688
689        /* enter deep sleep only if enough time had passed */
690        if (counter < DEEP_SLEEP_LIMIT) {
691            delay(SHORT_DELAY_MS);
692            counter++;
693        } else {
694            delay(LONG_DELAY_MS);
695        }
696
697    }
698}
699
700
701
702void TrexStatelessDpCore::quit_main_loop(){
703    m_core->set_terminate_mode(true); /* mark it as terminated */
704    m_state = STATE_TERMINATE;
705    add_global_duration(0.0001);
706}
707
708
709/**
710 * scehduler runs when traffic exists
711 * it will return when no more transmitting is done on this
712 * core
713 *
714 * @author imarom (01-Nov-15)
715 */
716void
717TrexStatelessDpCore::start_scheduler() {
718    /* creates a maintenace job using the scheduler */
719    CGenNode * node_sync = m_core->create_node() ;
720    node_sync->m_type = CGenNode::FLOW_SYNC;
721    node_sync->m_time = m_core->m_cur_time_sec + SYNC_TIME_OUT;
722
723    m_core->m_node_gen.add_node(node_sync);
724
725    double old_offset = 0.0;
726    m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset);
727    /* bail out in case of terminate */
728    if (m_state != TrexStatelessDpCore::STATE_TERMINATE) {
729        m_core->m_node_gen.close_file(m_core);
730        m_state = STATE_IDLE; /* we exit from all ports and we have nothing to do, we move to IDLE state */
731    }
732}
733
734
735void
736TrexStatelessDpCore::run_once(){
737
738    idle_state_loop();
739
740    if ( m_state == STATE_TERMINATE ){
741        return;
742    }
743
744    start_scheduler();
745}
746
747
748
749
750void
751TrexStatelessDpCore::start() {
752
753    while (true) {
754        run_once();
755
756        if ( m_core->is_terminated_by_master() ) {
757            break;
758        }
759    }
760}
761
762/* only if both port are idle we can exit */
763void
764TrexStatelessDpCore::schedule_exit(){
765
766    CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
767
768    node->m_type = CGenNode::COMMAND;
769
770    node->m_cmd = new TrexStatelessDpCanQuit();
771
772    /* make sure it will be scheduled after the current node */
773    node->m_time = m_core->m_cur_time_sec ;
774
775    m_core->m_node_gen.add_node((CGenNode *)node);
776}
777
778
779void
780TrexStatelessDpCore::add_global_duration(double duration){
781    if (duration > 0.0) {
782        CGenNode *node = m_core->create_node() ;
783
784        node->m_type = CGenNode::EXIT_SCHED;
785
786        /* make sure it will be scheduled after the current node */
787        node->m_time = m_core->m_cur_time_sec + duration ;
788
789        m_core->m_node_gen.add_node(node);
790    }
791}
792
793/* add per port exit */
794void
795TrexStatelessDpCore::add_port_duration(double duration,
796                                       uint8_t port_id,
797                                       int event_id){
798    if (duration > 0.0) {
799        CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
800
801        node->m_type = CGenNode::COMMAND;
802
803        /* make sure it will be scheduled after the current node */
804        node->m_time = m_core->m_cur_time_sec + duration ;
805
806        TrexStatelessDpStop * cmd=new TrexStatelessDpStop(port_id);
807
808
809        /* test this */
810        m_core->m_non_active_nodes++;
811        cmd->set_core_ptr(m_core);
812        cmd->set_event_id(event_id);
813        cmd->set_wait_for_event_id(true);
814
815        node->m_cmd = cmd;
816
817        m_core->m_node_gen.add_node((CGenNode *)node);
818    }
819}
820
821
822void TrexStatelessDpCore::update_mac_addr(TrexStream * stream,
823                                          CGenNodeStateless *node,
824                                          pkt_dir_t dir,
825                                          char *raw_pkt){
826    bool              ov_src = stream->get_override_src_mac_by_pkt_data();
827    TrexStream::stream_dst_mac_t  ov_dst = stream->get_override_dst_mac_mode();
828
829
830    if ( (ov_src == true) && (ov_dst == TrexStream::stPKT) ) {
831        /* nothing to do, take from the packet both */
832        return;
833    }
834
835        /* take from cfg_file */
836    if ( (ov_src == false) &&
837         (ov_dst == TrexStream::stCFG_FILE) ){
838
839          m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*)raw_pkt);
840          return;
841    }
842
843    /* save the pkt*/
844    char tmp_pkt[12];
845    memcpy(tmp_pkt,raw_pkt,12);
846
847    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*)raw_pkt);
848
849    if ((ov_src == true) && (ov_dst == TrexStream::stCFG_FILE)) {
850        memcpy(raw_pkt+6,tmp_pkt+6,6);
851    }
852
853    if ((ov_src == false) && (ov_dst == TrexStream::stPKT)) {
854        memcpy(raw_pkt,tmp_pkt,6);
855    }
856}
857
858
859void TrexStatelessDpCore::replay_vm_into_cache(TrexStream * stream,
860                                               CGenNodeStateless *node){
861
862    uint16_t      cache_size = stream->m_cache_size;
863    assert(cache_size>0);
864    rte_mbuf_t * m=0;
865
866    uint32_t buf_size = CGenNodeCacheMbuf::get_object_size(cache_size);
867    CGenNodeCacheMbuf * p = (CGenNodeCacheMbuf *)malloc(buf_size);
868    assert(p);
869    memset(p,0,buf_size);
870
871    int i;
872    for (i=0; i<cache_size; i++) {
873        p->m_array[i] =  node->alloc_node_with_vm();
874    }
875    /* save const */
876    m=node->get_const_mbuf();
877    if (m) {
878        p->m_mbuf_const=m;
879        rte_pktmbuf_refcnt_update(m,1);
880    }
881
882    /* free all VM and const mbuf */
883    node->free_stl_vm_buf();
884
885    /* copy to local node meory */
886    node->cache_mbuf_array_copy(p,cache_size);
887
888    /* free the memory */
889    free(p);
890}
891
892
893void
894TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port,
895                                TrexStream * stream,
896                                TrexStreamsCompiledObj *comp) {
897
898    CGenNodeStateless *node = m_core->create_node_sl();
899
900    node->m_thread_id = m_thread_id;
901    node->cache_mbuf_array_init();
902    node->m_batch_size=0;
903
904    /* add periodic */
905    node->m_cache_mbuf=0;
906    node->m_type = CGenNode::STATELESS_PKT;
907
908    node->m_action_counter = stream->m_action_count;
909
910    /* clone the stream from control plane memory to DP memory */
911    node->m_ref_stream_info = stream->clone();
912    /* no need for this memory anymore on the control plane memory */
913    stream->release_dp_object();
914
915    node->m_next_stream=0; /* will be fixed later */
916
917    if ( stream->m_self_start ){
918        /* if self start it is in active mode */
919        node->m_state =CGenNodeStateless::ss_ACTIVE;
920        lp_port->m_active_streams++;
921    }else{
922        node->m_state =CGenNodeStateless::ss_INACTIVE;
923    }
924
925    node->m_time = m_core->m_cur_time_sec + stream->get_start_delay_sec();
926
927    pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(stream->m_port_id);
928    node->m_flags = 0;
929    node->m_src_port =0;
930    node->m_original_packet_data_prefix = 0;
931
932    if (stream->m_rx_check.m_enabled) {
933        node->set_stat_needed();
934        uint8_t hw_id = stream->m_rx_check.m_hw_id;
935        assert (hw_id < MAX_FLOW_STATS + MAX_FLOW_STATS_PAYLOAD);
936        node->set_stat_hw_id(hw_id);
937        // no support for cache with flow stat payload rules
938        if ((TrexPlatformApi::driver_stat_cap_e)stream->m_rx_check.m_rule_type == TrexPlatformApi::IF_STAT_PAYLOAD) {
939            stream->m_cache_size = 0;
940        }
941    }
942
943    /* set socket id */
944    node->set_socket_id(m_core->m_node_gen.m_socket_id);
945
946    /* build a mbuf from a packet */
947
948    uint16_t pkt_size = stream->m_pkt.len;
949    const uint8_t *stream_pkt = stream->m_pkt.binary;
950
951    node->m_pause =0;
952    node->m_stream_type = stream->m_type;
953    node->m_next_time_offset = 1.0 / stream->get_pps();
954    node->m_null_stream = (stream->m_null_stream ? 1 : 0);
955
956    /* stateless specific fields */
957    switch ( stream->m_type ) {
958
959    case TrexStream::stCONTINUOUS :
960        node->m_single_burst=0;
961        node->m_single_burst_refill=0;
962        node->m_multi_bursts=0;
963        break;
964
965    case TrexStream::stSINGLE_BURST :
966        node->m_stream_type             = TrexStream::stMULTI_BURST;
967        node->m_single_burst            = stream->m_burst_total_pkts;
968        node->m_single_burst_refill     = stream->m_burst_total_pkts;
969        node->m_multi_bursts            = 1;  /* single burst in multi burst of 1 */
970        break;
971
972    case TrexStream::stMULTI_BURST :
973        node->m_single_burst        = stream->m_burst_total_pkts;
974        node->m_single_burst_refill = stream->m_burst_total_pkts;
975        node->m_multi_bursts        = stream->m_num_bursts;
976        break;
977    default:
978
979        assert(0);
980    };
981
982    node->m_port_id = stream->m_port_id;
983
984    /* set dir 0 or 1 client or server */
985    node->set_mbuf_cache_dir(dir);
986
987
988    if (node->m_ref_stream_info->getDpVm() == NULL) {
989        /* no VM */
990
991        node->m_vm_flow_var =  NULL;
992        node->m_vm_program  =  NULL;
993        node->m_vm_program_size =0;
994
995                /* allocate const mbuf */
996        rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size);
997        assert(m);
998
999        char *p = rte_pktmbuf_append(m, pkt_size);
1000        assert(p);
1001        /* copy the packet */
1002        memcpy(p,stream_pkt,pkt_size);
1003
1004        update_mac_addr(stream,node,dir,p);
1005
1006        /* set the packet as a readonly */
1007        node->set_cache_mbuf(m);
1008
1009        node->m_original_packet_data_prefix =0;
1010    }else{
1011
1012        /* set the program */
1013        TrexStream * local_mem_stream = node->m_ref_stream_info;
1014
1015        StreamVmDp  * lpDpVm = local_mem_stream->getDpVm();
1016
1017        node->m_vm_flow_var      = lpDpVm->clone_bss(); /* clone the flow var */
1018        node->m_vm_program       = lpDpVm->get_program(); /* same ref to the program */
1019        node->m_vm_program_size  = lpDpVm->get_program_size();
1020
1021        /* generate random seed if needed*/
1022        if (lpDpVm->is_random_seed()) {
1023            node->generate_random_seed();
1024        }
1025
1026        /* we need to copy the object */
1027        if ( pkt_size > lpDpVm->get_prefix_size() ) {
1028            /* we need const packet */
1029            uint16_t const_pkt_size  = pkt_size - lpDpVm->get_prefix_size() ;
1030            rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), const_pkt_size );
1031            assert(m);
1032
1033            char *p = rte_pktmbuf_append(m, const_pkt_size);
1034            assert(p);
1035
1036            /* copy packet data */
1037            memcpy(p,(stream_pkt + lpDpVm->get_prefix_size()),const_pkt_size);
1038
1039            node->set_const_mbuf(m);
1040        }
1041
1042
1043        if ( lpDpVm->is_pkt_size_var() ) {
1044            // mark the node as varible size
1045            node->set_var_pkt_size();
1046        }
1047
1048
1049        if (lpDpVm->get_prefix_size() > pkt_size ) {
1050            lpDpVm->set_prefix_size(pkt_size);
1051        }
1052
1053        /* copy the headr */
1054        uint16_t header_size = lpDpVm->get_prefix_size();
1055        assert(header_size);
1056        node->alloc_prefix_header(header_size);
1057        uint8_t *p=node->m_original_packet_data_prefix;
1058        assert(p);
1059
1060        memcpy(p,stream_pkt , header_size);
1061
1062        update_mac_addr(stream,node,dir,(char *)p);
1063
1064        if (stream->m_cache_size > 0 ) {
1065            /* we need to create cache of objects */
1066            replay_vm_into_cache(stream, node);
1067        }
1068    }
1069
1070
1071    CDpOneStream one_stream;
1072
1073    one_stream.m_dp_stream = node->m_ref_stream_info;
1074    one_stream.m_node =node;
1075
1076    lp_port->m_active_nodes.push_back(one_stream);
1077
1078    /* schedule only if active */
1079    if (node->m_state == CGenNodeStateless::ss_ACTIVE) {
1080        m_core->m_node_gen.add_node((CGenNode *)node);
1081    }
1082}
1083
1084void
1085TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj,
1086                                   double duration,
1087                                   int event_id) {
1088
1089
1090    TrexStatelessDpPerPort * lp_port=get_port_db(obj->get_port_id());
1091    lp_port->m_active_streams = 0;
1092    lp_port->set_event_id(event_id);
1093
1094    /* update cur time */
1095    if ( CGlobalInfo::is_realtime()  ){
1096        m_core->m_cur_time_sec = now_sec() + SCHD_OFFSET_DTIME ;
1097    }
1098
1099    /* no nodes in the list */
1100    assert(lp_port->m_active_nodes.size()==0);
1101
1102    for (auto single_stream : obj->get_objects()) {
1103        /* all commands should be for the same port */
1104        assert(obj->get_port_id() == single_stream.m_stream->m_port_id);
1105        add_stream(lp_port,single_stream.m_stream,obj);
1106    }
1107
1108    uint32_t nodes = lp_port->m_active_nodes.size();
1109    /* find next stream */
1110    assert(nodes == obj->get_objects().size());
1111
1112    int cnt=0;
1113
1114    /* set the next_stream pointer  */
1115    for (auto single_stream : obj->get_objects()) {
1116
1117        if (single_stream.m_stream->is_dp_next_stream() ) {
1118            int stream_id = single_stream.m_stream->m_next_stream_id;
1119            assert(stream_id<nodes);
1120            /* point to the next stream , stream_id is fixed */
1121            lp_port->m_active_nodes[cnt].m_node->m_next_stream = lp_port->m_active_nodes[stream_id].m_node ;
1122        }
1123        cnt++;
1124    }
1125
1126    lp_port->m_state =TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
1127    m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
1128
1129
1130    if ( duration > 0.0 ){
1131        add_port_duration( duration ,obj->get_port_id(),event_id );
1132    }
1133
1134}
1135
1136
1137bool TrexStatelessDpCore::are_all_ports_idle(){
1138
1139    bool res=true;
1140    int i;
1141    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
1142        if ( m_ports[i].m_state != TrexStatelessDpPerPort::ppSTATE_IDLE ){
1143            res=false;
1144        }
1145    }
1146    return (res);
1147}
1148
1149
1150void
1151TrexStatelessDpCore::resume_traffic(uint8_t port_id){
1152
1153    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1154
1155    lp_port->resume_traffic(port_id);
1156}
1157
1158
1159void
1160TrexStatelessDpCore::pause_traffic(uint8_t port_id){
1161
1162    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1163
1164    lp_port->pause_traffic(port_id);
1165}
1166
1167void
1168TrexStatelessDpCore::push_pcap(uint8_t port_id,
1169                               int event_id,
1170                               const std::string &pcap_filename,
1171                               double ipg_usec,
1172                               double speedup,
1173                               uint32_t count,
1174                               double duration,
1175                               bool is_dual) {
1176
1177    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1178
1179    lp_port->set_event_id(event_id);
1180
1181    /* delegate the command to the port */
1182    bool rc = lp_port->push_pcap(port_id, pcap_filename, ipg_usec, speedup, count, is_dual);
1183    if (!rc) {
1184        /* report back that we stopped */
1185        CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
1186        TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
1187                                                                       port_id,
1188                                                                       event_id,
1189                                                                       false);
1190        ring->Enqueue((CGenNode *)event_msg);
1191        return;
1192    }
1193
1194
1195    if (duration > 0.0) {
1196        add_port_duration(duration, port_id, event_id);
1197    }
1198
1199     m_state = TrexStatelessDpCore::STATE_PCAP_TX;
1200}
1201
1202void
1203TrexStatelessDpCore::update_traffic(uint8_t port_id, double factor) {
1204
1205    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1206
1207    lp_port->update_traffic(port_id, factor);
1208}
1209
1210
1211void
1212TrexStatelessDpCore::stop_traffic(uint8_t  port_id,
1213                                  bool     stop_on_id,
1214                                  int      event_id) {
1215    /* we cannot remove nodes not from the top of the queue so
1216       for every active node - make sure next time
1217       the scheduler invokes it, it will be free */
1218
1219    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1220    if ( lp_port->stop_traffic(port_id,stop_on_id,event_id) == false){
1221        return;
1222    }
1223
1224    /* flush the TX queue before sending done message to the CP */
1225    m_core->flush_tx_queue();
1226
1227    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
1228    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
1229                                                                   port_id,
1230                                                                   lp_port->get_event_id());
1231    ring->Enqueue((CGenNode *)event_msg);
1232
1233}
1234
1235/**
1236 * handle a message from CP to DP
1237 *
1238 */
1239void
1240TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) {
1241    msg->handle(this);
1242    delete msg;
1243}
1244
1245void
1246TrexStatelessDpCore::barrier(uint8_t port_id, int event_id) {
1247
1248    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
1249    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
1250                                                                   port_id,
1251                                                                   event_id);
1252    ring->Enqueue((CGenNode *)event_msg);
1253}
1254
1255
1256/**
1257 * PCAP node
1258 */
1259bool CGenNodePCAP::create(uint8_t port_id,
1260                          pkt_dir_t dir,
1261                          socket_id_t socket_id,
1262                          const uint8_t *mac_addr,
1263                          const uint8_t *slave_mac_addr,
1264                          const std::string &pcap_filename,
1265                          double ipg_usec,
1266                          double speedup,
1267                          uint32_t count,
1268                          bool is_dual) {
1269    std::stringstream ss;
1270
1271    m_type       = CGenNode::PCAP_PKT;
1272    m_flags      = 0;
1273    m_src_port   = 0;
1274    m_port_id    = port_id;
1275    m_count      = count;
1276    m_is_dual    = is_dual;
1277    m_dir        = dir;
1278
1279    /* mark this node as slow path */
1280    set_slow_path(true);
1281
1282    if (ipg_usec != -1) {
1283        /* fixed IPG */
1284        m_ipg_sec = usec_to_sec(ipg_usec / speedup);
1285        m_speedup = 0;
1286    } else {
1287        /* packet IPG */
1288        m_ipg_sec = -1;
1289        m_speedup  = speedup;
1290    }
1291
1292    /* copy MAC addr info */
1293    memcpy(m_mac_addr, mac_addr, 12);
1294    memcpy(m_slave_mac_addr, slave_mac_addr, 12);
1295
1296
1297    set_socket_id(socket_id);
1298
1299    /* create the PCAP reader */
1300    m_reader = CCapReaderFactory::CreateReader((char *)pcap_filename.c_str(), 0, ss);
1301    if (!m_reader) {
1302        return false;
1303    }
1304
1305    m_raw_packet = new CCapPktRaw();
1306    if ( m_reader->ReadPacket(m_raw_packet) == false ){
1307        /* handle error */
1308        delete m_reader;
1309        return (false);
1310    }
1311
1312    /* set the dir */
1313    set_mbuf_dir(dir);
1314
1315    /* update the direction (for dual mode) */
1316    update_pkt_dir();
1317
1318    /* this is the reference time */
1319    m_last_pkt_time = m_raw_packet->get_time();
1320
1321    /* ready */
1322    m_state = PCAP_ACTIVE;
1323
1324    return true;
1325}
1326
1327/**
1328 * cleanup for PCAP node
1329 *
1330 * @author imarom (08-May-16)
1331 */
1332void CGenNodePCAP::destroy() {
1333
1334    if (m_raw_packet) {
1335        delete m_raw_packet;
1336        m_raw_packet = NULL;
1337    }
1338
1339    if (m_reader) {
1340        delete m_reader;
1341        m_reader = NULL;
1342    }
1343
1344    m_state = PCAP_INVALID;
1345}
1346
1347