trex_stateless_dp_core.cpp revision b87818b8
1/*
2 Itay Marom
3 Hanoch Haim
4 Cisco Systems, Inc.
5*/
6
7/*
8Copyright (c) 2015-2016 Cisco Systems, Inc.
9
10Licensed under the Apache License, Version 2.0 (the "License");
11you may not use this file except in compliance with the License.
12You may obtain a copy of the License at
13
14    http://www.apache.org/licenses/LICENSE-2.0
15
16Unless required by applicable law or agreed to in writing, software
17distributed under the License is distributed on an "AS IS" BASIS,
18WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19See the License for the specific language governing permissions and
20limitations under the License.
21*/
22#include "bp_sim.h"
23#include "trex_stateless_dp_core.h"
24#include "trex_stateless_messaging.h"
25#include "trex_stream.h"
26#include "trex_stream_node.h"
27#include "trex_streams_compiler.h"
28#include "mbuf.h"
29
30
31
32
33void CGenNodeStateless::cache_mbuf_array_init(){
34    m_cache_size=0;
35    m_cache_array_cnt=0;
36}
37
38
39
40void CGenNodeStateless::cache_mbuf_array_copy(CGenNodeCacheMbuf *obj,
41                                              uint16_t size){
42
43    int i;
44    cache_mbuf_array_alloc(size);
45    for (i=0; i<size; i++) {
46        cache_mbuf_array_set(i,obj->m_array[i]);
47    }
48    cache_mbuf_array_set_const_mbuf(obj->m_mbuf_const);
49}
50
51
52rte_mbuf_t ** CGenNodeStateless::cache_mbuf_array_alloc(uint16_t size){
53
54    uint32_t buf_size = CGenNodeCacheMbuf::get_object_size(size);
55    /* TBD  replace with align, zero API */
56    m_cache_mbuf = (void *)malloc(buf_size);
57    assert(m_cache_mbuf);
58    memset(m_cache_mbuf,0,buf_size);
59
60    m_flags |= SL_NODE_CONST_MBUF_CACHE_ARRAY;
61    m_cache_size=size;
62    m_cache_array_cnt=0;
63    return ((rte_mbuf_t **)m_cache_mbuf);
64}
65
66void CGenNodeStateless::cache_mbuf_array_free(){
67
68    assert(m_cache_mbuf);
69    int i;
70    for (i=0; i<(int)m_cache_size; i++) {
71        rte_mbuf_t * m=cache_mbuf_array_get((uint16_t)i);
72        assert(m);
73        rte_pktmbuf_free(m);
74    }
75
76    /* free the const */
77    rte_mbuf_t * m=cache_mbuf_array_get_const_mbuf() ;
78    if (m) {
79        rte_pktmbuf_free(m);
80    }
81
82    free(m_cache_mbuf);
83    m_cache_mbuf=0;
84}
85
86
87rte_mbuf_t * CGenNodeStateless::cache_mbuf_array_get(uint16_t index){
88
89    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
90    return (p->m_array[index]);
91}
92
93void CGenNodeStateless::cache_mbuf_array_set_const_mbuf(rte_mbuf_t * m){
94    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
95    p->m_mbuf_const=m;
96}
97
98rte_mbuf_t * CGenNodeStateless::cache_mbuf_array_get_const_mbuf(){
99    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
100    return (p->m_mbuf_const);
101}
102
103
104void CGenNodeStateless::cache_mbuf_array_set(uint16_t index,
105                                             rte_mbuf_t * m){
106    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
107    p->m_array[index]=m;
108}
109
110
111void CDpOneStream::Delete(CFlowGenListPerThread   * core){
112    assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE);
113    core->free_node((CGenNode *)m_node);
114    delete m_dp_stream;
115    m_node=0;
116    m_dp_stream=0;
117}
118
119void CDpOneStream::DeleteOnlyStream(){
120    assert(m_dp_stream);
121    delete m_dp_stream;
122    m_dp_stream=0;
123}
124
125int CGenNodeStateless::get_stream_id(){
126    if (m_state ==CGenNodeStateless::ss_FREE_RESUSE) {
127        return (-1); // not valid
128    }
129    assert(m_ref_stream_info);
130    return ((int)m_ref_stream_info->m_stream_id);
131}
132
133
134void CGenNodeStateless::DumpHeader(FILE *fd){
135    fprintf(fd," pkt_id, time, port , action , state, stream_id , stype , m-burst# , burst# \n");
136
137}
138void CGenNodeStateless::Dump(FILE *fd){
139    fprintf(fd," %2.4f, %3lu, %s,%s, %3d, %s, %3lu, %3lu  \n",
140            m_time,
141            (ulong)m_port_id,
142            "s-pkt", //action
143            get_stream_state_str(m_state ).c_str(),
144            get_stream_id(),   //stream_id
145            TrexStream::get_stream_type_str(m_stream_type).c_str(), //stype
146            (ulong)m_multi_bursts,
147            (ulong)m_single_burst
148            );
149}
150
151
152void CGenNodeStateless::generate_random_seed() {
153    /* seed can be provided by the user */
154    uint32_t unique_seed;
155    if (m_ref_stream_info->m_random_seed) {
156        unique_seed = m_ref_stream_info->m_random_seed;
157    } else {
158        unsigned int tmp = (unsigned int)time(NULL);
159        unique_seed = rand_r(&tmp);
160    }
161
162    /* per thread divergence */
163    unique_seed = (unique_seed * ( (m_thread_id + 1) * 514229 ) ) & 0xFFFFFFFF;
164
165    /* set random */
166    set_random_seed(unique_seed);
167}
168
169
170void CGenNodeStateless::refresh_vm_bss() {
171    if ( m_vm_flow_var ) {
172        StreamVmDp  * vm_s=m_ref_stream_info->m_vm_dp;
173        assert(vm_s);
174        memcpy(m_vm_flow_var,vm_s->get_bss(),vm_s->get_bss_size());
175
176        if ( vm_s->is_random_seed() ) {
177            generate_random_seed();
178        }
179
180    }
181}
182
183
184
185/**
186 * this function called when stream restart after it was inactive
187 */
188void CGenNodeStateless::refresh(){
189
190    /* refill the stream info */
191    m_single_burst    = m_single_burst_refill;
192    m_multi_bursts    = m_ref_stream_info->m_num_bursts;
193    m_state           = CGenNodeStateless::ss_ACTIVE;
194
195    /* refresh init value */
196#if 0
197    /* TBD should add a JSON varible for that */
198    refresh_vm_bss();
199#endif
200}
201
202
203void CGenNodeCommand::free_command(){
204
205    assert(m_cmd);
206    m_cmd->on_node_remove();
207    delete m_cmd;
208}
209
210
211std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state){
212    std::string res;
213
214    switch (stream_state) {
215    case CGenNodeStateless::ss_FREE_RESUSE :
216         res="FREE    ";
217        break;
218    case CGenNodeStateless::ss_INACTIVE :
219        res="INACTIVE ";
220        break;
221    case CGenNodeStateless::ss_ACTIVE :
222        res="ACTIVE   ";
223        break;
224    default:
225        res="Unknow   ";
226    };
227    return(res);
228}
229
230/*
231 * Allocate mbuf for flow stat (and latency) info sending
232 * m - Original mbuf (can be complicated mbuf data structure)
233 * fsp_head - return pointer in which the flow stat info should be filled
234 * is_const - is the given mbuf const
235 * return new mbuf structure in which the fsp_head can be written. If needed, orginal mbuf is freed.
236 */
237rte_mbuf_t * CGenNodeStateless::alloc_flow_stat_mbuf(rte_mbuf_t *m, struct flow_stat_payload_header *&fsp_head
238                                                     , bool is_const) {
239    rte_mbuf_t *m_ret = NULL, *m_lat = NULL;
240    uint16_t fsp_head_size = sizeof(struct flow_stat_payload_header);
241
242    if (is_const) {
243        // const mbuf case
244        if (rte_pktmbuf_data_len(m) > 128) {
245            m_ret = CGlobalInfo::pktmbuf_alloc_small(get_socket_id());
246            assert(m_ret);
247            // alloc mbuf just for the latency header
248            m_lat = CGlobalInfo::pktmbuf_alloc( get_socket_id(), fsp_head_size);
249            assert(m_lat);
250            fsp_head = (struct flow_stat_payload_header *)rte_pktmbuf_append(m_lat, fsp_head_size);
251            rte_pktmbuf_attach(m_ret, m);
252            rte_pktmbuf_trim(m_ret, sizeof(struct flow_stat_payload_header));
253            utl_rte_pktmbuf_add_after2(m_ret, m_lat);
254            // ref count was updated when we took the (const) mbuf, and again in rte_pktmbuf_attach
255            // so need do decrease now, to avoid leak.
256            rte_pktmbuf_refcnt_update(m, -1);
257            return m_ret;
258        } else {
259            // Short packet. Just copy all bytes.
260            m_ret = CGlobalInfo::pktmbuf_alloc( get_socket_id(), rte_pktmbuf_data_len(m) );
261            assert(m_ret);
262            char *p = rte_pktmbuf_mtod(m, char*);
263            char *p_new = rte_pktmbuf_append(m_ret, rte_pktmbuf_data_len(m));
264            memcpy(p_new , p, rte_pktmbuf_data_len(m));
265            fsp_head = (struct flow_stat_payload_header *)(p_new + rte_pktmbuf_data_len(m) - fsp_head_size);
266            rte_pktmbuf_free(m);
267            return m_ret;
268        }
269    } else {
270        // Field engine (vm)
271        if (rte_pktmbuf_is_contiguous(m)) {
272            // one, r/w mbuf
273            char *p = rte_pktmbuf_mtod(m, char*);
274            fsp_head = (struct flow_stat_payload_header *)(p + rte_pktmbuf_data_len(m) - fsp_head_size);
275            return m;
276        } else {
277            // We have: r/w --> read only.
278            // Changing to:
279            // (original) r/w -> (new) indirect (direct is original read_only, after trimming last bytes) -> (new) latency info
280            rte_mbuf_t *m_read_only = m->next, *m_indirect;
281
282            m_indirect = CGlobalInfo::pktmbuf_alloc_small(get_socket_id());
283            assert(m_indirect);
284            // alloc mbuf just for the latency header
285            m_lat = CGlobalInfo::pktmbuf_alloc( get_socket_id(), fsp_head_size);
286            assert(m_lat);
287            fsp_head = (struct flow_stat_payload_header *)rte_pktmbuf_append(m_lat, fsp_head_size);
288            utl_rte_pktmbuf_chain_with_indirect(m, m_indirect, m_read_only, m_lat);
289            m_indirect->data_len = (uint16_t)(m_indirect->data_len - fsp_head_size);
290            return m;
291        }
292    }
293}
294
295// test the const case of alloc_flow_stat_mbuf. The more complicated non const case is tested in the simulation.
296bool CGenNodeStateless::alloc_flow_stat_mbuf_test_const() {
297    rte_mbuf_t *m, *m_test;
298    uint16_t sizes[2] = {64, 500};
299    uint16_t size;
300    struct flow_stat_payload_header *fsp_head;
301    char *p;
302
303    set_socket_id(0);
304    for (int test_num = 0; test_num < sizeof(sizes)/sizeof(sizes[0]); test_num++) {
305        size = sizes[test_num];
306        m = CGlobalInfo::pktmbuf_alloc(get_socket_id(), size);
307        p = rte_pktmbuf_append(m, size);
308        for (int i = 0; i < size; i++) {
309            p[i] = (char)i;
310        }
311        m_test = alloc_flow_stat_mbuf(m, fsp_head, true);
312        p = rte_pktmbuf_mtod(m_test, char*);
313        assert(rte_pktmbuf_pkt_len(m_test) == size);
314        for (int i = 0; i < rte_pktmbuf_pkt_len(m_test) - sizeof(*fsp_head); i++) {
315            assert(p[i] == (char)i);
316        }
317        // verify fsp_head points correctly
318        if (size > 128) { // should match threshould in alloc_flow_stat_mbuf
319            assert(rte_pktmbuf_data_len(m_test) == size - sizeof(*fsp_head));
320            assert(rte_pktmbuf_data_len(m_test->next) == sizeof(*fsp_head));
321            assert((char *)fsp_head == rte_pktmbuf_mtod((m_test->next), char*));
322        } else {
323            assert(rte_pktmbuf_data_len(m_test) == size);
324            assert (((char *)fsp_head) + sizeof (*fsp_head) == p + rte_pktmbuf_data_len(m_test));
325        }
326        rte_pktmbuf_free(m_test);
327    }
328    return true;
329}
330
331rte_mbuf_t   * CGenNodeStateless::alloc_node_with_vm(){
332
333    rte_mbuf_t        * m;
334    /* alloc small packet buffer*/
335    uint16_t prefix_size = prefix_header_size();
336    m = CGlobalInfo::pktmbuf_alloc( get_socket_id(), prefix_size );
337    if (m==0) {
338        return (m);
339    }
340    /* TBD remove this, should handle cases of error */
341    assert(m);
342    char *p=rte_pktmbuf_append(m, prefix_size);
343    memcpy( p ,m_original_packet_data_prefix, prefix_size);
344
345
346    /* run the VM program */
347    StreamDPVmInstructionsRunner runner;
348
349    runner.run( (uint32_t*)m_vm_flow_var,
350                m_vm_program_size,
351                m_vm_program,
352                m_vm_flow_var,
353                (uint8_t*)p);
354
355    uint16_t pkt_new_size=runner.get_new_pkt_size();
356    if ( likely( pkt_new_size == 0) ) {
357        /* no packet size change */
358        rte_mbuf_t * m_const = get_const_mbuf();
359        if (  m_const != NULL) {
360            utl_rte_pktmbuf_add_after(m,m_const);
361        }
362        return (m);
363    }
364
365    /* packet size change there are a few changes */
366    rte_mbuf_t * m_const = get_const_mbuf();
367    if ( (m_const == 0 ) || (pkt_new_size<=prefix_size) ) {
368        /* one mbuf , just trim it */
369        m->data_len = pkt_new_size;
370        m->pkt_len  = pkt_new_size;
371        return (m);
372    }
373
374    rte_mbuf_t * mi= CGlobalInfo::pktmbuf_alloc_small(get_socket_id());
375    assert(mi);
376    rte_pktmbuf_attach(mi,m_const);
377    utl_rte_pktmbuf_add_after2(m,mi);
378
379    if ( pkt_new_size < m->pkt_len) {
380        /* need to trim it */
381        mi->data_len = (pkt_new_size - prefix_size);
382        m->pkt_len   = pkt_new_size;
383    }
384    return (m);
385}
386
387void CGenNodeStateless::free_stl_vm_buf(){
388        rte_mbuf_t * m ;
389         m=get_const_mbuf();
390         if (m) {
391             rte_pktmbuf_free(m); /* reduce the ref counter */
392             /* clear the const marker */
393             clear_const_mbuf();
394         }
395
396         free_prefix_header();
397
398         if (m_vm_flow_var) {
399             /* free flow var */
400             free(m_vm_flow_var);
401             m_vm_flow_var=0;
402         }
403}
404
405
406
407void CGenNodeStateless::free_stl_node(){
408
409    if ( is_cache_mbuf_array() ){
410        /* do we have cache of mbuf pre allocated */
411        cache_mbuf_array_free();
412    }else{
413        /* if we have cache mbuf free it */
414        rte_mbuf_t * m=get_cache_mbuf();
415        if (m) {
416                rte_pktmbuf_free(m);
417                m_cache_mbuf=0;
418        }
419    }
420    free_stl_vm_buf();
421}
422
423
424bool TrexStatelessDpPerPort::update_number_of_active_streams(uint32_t d){
425    m_active_streams-=d; /* reduce the number of streams */
426    if (m_active_streams == 0) {
427        return (true);
428    }
429    return (false);
430}
431
432bool TrexStatelessDpPerPort::resume_traffic(uint8_t port_id){
433
434    /* we are working with continues streams so we must be in transmit mode */
435    assert(m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE);
436
437    for (auto dp_stream : m_active_nodes) {
438        CGenNodeStateless * node =dp_stream.m_node;
439        assert(node->get_port_id() == port_id);
440        assert(node->is_pause() == true);
441        node->set_pause(false);
442    }
443    m_state = TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
444    return (true);
445}
446
447bool TrexStatelessDpPerPort::update_traffic(uint8_t port_id, double factor) {
448
449    assert( (m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING ||
450            (m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE)) );
451
452    for (auto dp_stream : m_active_nodes) {
453        CGenNodeStateless * node = dp_stream.m_node;
454        assert(node->get_port_id() == port_id);
455
456        node->update_rate(factor);
457    }
458
459    return (true);
460}
461
462bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){
463
464    /* we are working with continues streams so we must be in transmit mode */
465    assert(m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING);
466
467    for (auto dp_stream : m_active_nodes) {
468        CGenNodeStateless * node =dp_stream.m_node;
469        assert(node->get_port_id() == port_id);
470        assert(node->is_pause() == false);
471        node->set_pause(true);
472    }
473    m_state = TrexStatelessDpPerPort::ppSTATE_PAUSE;
474    return (true);
475}
476
477bool TrexStatelessDpPerPort::push_pcap(uint8_t port_id,
478                                       const std::string &pcap_filename,
479                                       double ipg_usec,
480                                       double speedup,
481                                       uint32_t count,
482                                       bool is_dual) {
483
484    /* push pcap can only happen on an idle port from the core prespective */
485    assert(m_state == TrexStatelessDpPerPort::ppSTATE_IDLE);
486
487    CGenNodePCAP *pcap_node = m_core->allocate_pcap_node();
488    if (!pcap_node) {
489        return (false);
490    }
491
492    pkt_dir_t dir          = m_core->m_node_gen.m_v_if->port_id_to_dir(port_id);
493    socket_id_t socket_id  = m_core->m_node_gen.m_socket_id;
494
495    /* main port */
496    uint8_t mac_addr[12];
497    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, mac_addr);
498
499    /* for dual */
500    uint8_t slave_mac_addr[12];
501    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir ^ 0x1, slave_mac_addr);
502
503    bool rc = pcap_node->create(port_id,
504                                dir,
505                                socket_id,
506                                mac_addr,
507                                slave_mac_addr,
508                                pcap_filename,
509                                ipg_usec,
510                                speedup,
511                                count,
512                                is_dual);
513    if (!rc) {
514        m_core->free_node((CGenNode *)pcap_node);
515        return (false);
516    }
517
518    /* schedule the node for now */
519    pcap_node->m_time = m_core->m_cur_time_sec;
520    m_core->m_node_gen.add_node((CGenNode *)pcap_node);
521
522    /* hold a pointer to the node */
523    assert(m_active_pcap_node == NULL);
524    m_active_pcap_node = pcap_node;
525
526    m_state = TrexStatelessDpPerPort::ppSTATE_PCAP_TX;
527    return (true);
528}
529
530
531bool TrexStatelessDpPerPort::stop_traffic(uint8_t  port_id,
532                                          bool     stop_on_id,
533                                          int      event_id){
534
535
536    if (m_state == TrexStatelessDpPerPort::ppSTATE_IDLE) {
537        assert(m_active_streams==0);
538        return false;
539    }
540
541    /* there could be race of stop after stop */
542    if ( stop_on_id ) {
543        if (event_id != m_event_id){
544            /* we can't stop it is an old message */
545            return false;
546        }
547    }
548
549    for (auto dp_stream : m_active_nodes) {
550        CGenNodeStateless * node =dp_stream.m_node;
551        assert(node->get_port_id() == port_id);
552        if ( node->get_state() == CGenNodeStateless::ss_ACTIVE) {
553            node->mark_for_free();
554            m_active_streams--;
555            dp_stream.DeleteOnlyStream();
556
557        }else{
558            dp_stream.Delete(m_core);
559        }
560    }
561
562    /* check for active PCAP node */
563    if (m_active_pcap_node) {
564        /* when got async stop from outside or duration */
565        if (m_active_pcap_node->is_active()) {
566            m_active_pcap_node->mark_for_free();
567        } else {
568            /* graceful stop - node was put out by the scheduler */
569            m_core->free_node( (CGenNode *)m_active_pcap_node);
570        }
571
572        m_active_pcap_node = NULL;
573    }
574
575    /* active stream should be zero */
576    assert(m_active_streams==0);
577    m_active_nodes.clear();
578    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
579    return (true);
580}
581
582
583void TrexStatelessDpPerPort::create(CFlowGenListPerThread   *  core){
584    m_core=core;
585    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
586    m_active_streams=0;
587    m_active_nodes.clear();
588    m_active_pcap_node = NULL;
589}
590
591
592
593void
594TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) {
595    m_thread_id = thread_id;
596    m_core = core;
597    m_local_port_offset = 2*core->getDualPortId();
598
599    CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();
600
601    m_ring_from_cp = cp_dp->getRingCpToDp(thread_id);
602    m_ring_to_cp   = cp_dp->getRingDpToCp(thread_id);
603
604    m_state = STATE_IDLE;
605
606    int i;
607    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
608        m_ports[i].create(core);
609    }
610}
611
612
613/* move to the next stream, old stream move to INACTIVE */
614bool TrexStatelessDpCore::set_stateless_next_node(CGenNodeStateless * cur_node,
615                                                  CGenNodeStateless * next_node){
616
617    assert(cur_node);
618    TrexStatelessDpPerPort * lp_port = get_port_db(cur_node->m_port_id);
619    bool schedule =false;
620
621    bool to_stop_port=false;
622
623    if (next_node == NULL) {
624        /* there is no next stream , reduce the number of active streams*/
625        to_stop_port = lp_port->update_number_of_active_streams(1);
626
627    }else{
628        uint8_t state=next_node->get_state();
629
630        /* can't be FREE_RESUSE */
631        assert(state != CGenNodeStateless::ss_FREE_RESUSE);
632        if (state == CGenNodeStateless::ss_INACTIVE ) {
633
634            if (cur_node->m_action_counter > 0) {
635                cur_node->m_action_counter--;
636                if (cur_node->m_action_counter==0) {
637                    to_stop_port = lp_port->update_number_of_active_streams(1);
638                }else{
639                    /* refill start info and scedule, no update in active streams  */
640                    next_node->refresh();
641                    schedule = true;
642                }
643            }else{
644                /* refill start info and scedule, no update in active streams  */
645                next_node->refresh();
646                schedule = true;
647            }
648
649        }else{
650            to_stop_port = lp_port->update_number_of_active_streams(1);
651        }
652    }
653
654    if ( to_stop_port ) {
655        /* call stop port explictly to move the state */
656        stop_traffic(cur_node->m_port_id,false,0);
657    }
658
659    return ( schedule );
660}
661
662
663
664/**
665 * in idle state loop, the processor most of the time sleeps
666 * and periodically checks for messages
667 *
668 * @author imarom (01-Nov-15)
669 */
670void
671TrexStatelessDpCore::idle_state_loop() {
672
673    const int SHORT_DELAY_MS    = 2;
674    const int LONG_DELAY_MS     = 50;
675    const int DEEP_SLEEP_LIMIT  = 2000;
676
677    int counter = 0;
678
679    while (m_state == STATE_IDLE) {
680        m_core->tickle();
681        m_core->m_node_gen.m_v_if->handle_rx_queue();
682        bool had_msg = periodic_check_for_cp_messages();
683        if (had_msg) {
684            counter = 0;
685            continue;
686        }
687
688        /* enter deep sleep only if enough time had passed */
689        if (counter < DEEP_SLEEP_LIMIT) {
690            delay(SHORT_DELAY_MS);
691            counter++;
692        } else {
693            delay(LONG_DELAY_MS);
694        }
695
696    }
697}
698
699
700
701void TrexStatelessDpCore::quit_main_loop(){
702    m_core->set_terminate_mode(true); /* mark it as terminated */
703    m_state = STATE_TERMINATE;
704    add_global_duration(0.0001);
705}
706
707
708/**
709 * scehduler runs when traffic exists
710 * it will return when no more transmitting is done on this
711 * core
712 *
713 * @author imarom (01-Nov-15)
714 */
715void
716TrexStatelessDpCore::start_scheduler() {
717    /* creates a maintenace job using the scheduler */
718    CGenNode * node_sync = m_core->create_node() ;
719    node_sync->m_type = CGenNode::FLOW_SYNC;
720    node_sync->m_time = m_core->m_cur_time_sec + SYNC_TIME_OUT;
721
722    m_core->m_node_gen.add_node(node_sync);
723
724    double old_offset = 0.0;
725    m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset);
726    /* bail out in case of terminate */
727    if (m_state != TrexStatelessDpCore::STATE_TERMINATE) {
728        m_core->m_node_gen.close_file(m_core);
729        m_state = STATE_IDLE; /* we exit from all ports and we have nothing to do, we move to IDLE state */
730    }
731}
732
733
734void
735TrexStatelessDpCore::run_once(){
736
737    idle_state_loop();
738
739    if ( m_state == STATE_TERMINATE ){
740        return;
741    }
742
743    start_scheduler();
744}
745
746
747
748
749void
750TrexStatelessDpCore::start() {
751
752    while (true) {
753        run_once();
754
755        if ( m_core->is_terminated_by_master() ) {
756            break;
757        }
758    }
759}
760
761/* only if both port are idle we can exit */
762void
763TrexStatelessDpCore::schedule_exit(){
764
765    CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
766
767    node->m_type = CGenNode::COMMAND;
768
769    node->m_cmd = new TrexStatelessDpCanQuit();
770
771    /* make sure it will be scheduled after the current node */
772    node->m_time = m_core->m_cur_time_sec ;
773
774    m_core->m_node_gen.add_node((CGenNode *)node);
775}
776
777
778void
779TrexStatelessDpCore::add_global_duration(double duration){
780    if (duration > 0.0) {
781        CGenNode *node = m_core->create_node() ;
782
783        node->m_type = CGenNode::EXIT_SCHED;
784
785        /* make sure it will be scheduled after the current node */
786        node->m_time = m_core->m_cur_time_sec + duration ;
787
788        m_core->m_node_gen.add_node(node);
789    }
790}
791
792/* add per port exit */
793void
794TrexStatelessDpCore::add_port_duration(double duration,
795                                       uint8_t port_id,
796                                       int event_id){
797    if (duration > 0.0) {
798        CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
799
800        node->m_type = CGenNode::COMMAND;
801
802        /* make sure it will be scheduled after the current node */
803        node->m_time = m_core->m_cur_time_sec + duration ;
804
805        TrexStatelessDpStop * cmd=new TrexStatelessDpStop(port_id);
806
807
808        /* test this */
809        m_core->m_non_active_nodes++;
810        cmd->set_core_ptr(m_core);
811        cmd->set_event_id(event_id);
812        cmd->set_wait_for_event_id(true);
813
814        node->m_cmd = cmd;
815
816        m_core->m_node_gen.add_node((CGenNode *)node);
817    }
818}
819
820
821void TrexStatelessDpCore::update_mac_addr(TrexStream * stream,
822                                          CGenNodeStateless *node,
823                                          pkt_dir_t dir,
824                                          char *raw_pkt){
825    bool              ov_src = stream->get_override_src_mac_by_pkt_data();
826    TrexStream::stream_dst_mac_t  ov_dst = stream->get_override_dst_mac_mode();
827
828
829    if ( (ov_src == true) && (ov_dst == TrexStream::stPKT) ) {
830        /* nothing to do, take from the packet both */
831        return;
832    }
833
834        /* take from cfg_file */
835    if ( (ov_src == false) &&
836         (ov_dst == TrexStream::stCFG_FILE) ){
837
838          m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*)raw_pkt);
839          return;
840    }
841
842    /* save the pkt*/
843    char tmp_pkt[12];
844    memcpy(tmp_pkt,raw_pkt,12);
845
846    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*)raw_pkt);
847
848    if ((ov_src == true) && (ov_dst == TrexStream::stCFG_FILE)) {
849        memcpy(raw_pkt+6,tmp_pkt+6,6);
850    }
851
852    if ((ov_src == false) && (ov_dst == TrexStream::stPKT)) {
853        memcpy(raw_pkt,tmp_pkt,6);
854    }
855}
856
857
858void TrexStatelessDpCore::replay_vm_into_cache(TrexStream * stream,
859                                               CGenNodeStateless *node){
860
861    uint16_t      cache_size = stream->m_cache_size;
862    assert(cache_size>0);
863    rte_mbuf_t * m=0;
864
865    uint32_t buf_size = CGenNodeCacheMbuf::get_object_size(cache_size);
866    CGenNodeCacheMbuf * p = (CGenNodeCacheMbuf *)malloc(buf_size);
867    assert(p);
868    memset(p,0,buf_size);
869
870    int i;
871    for (i=0; i<cache_size; i++) {
872        p->m_array[i] =  node->alloc_node_with_vm();
873    }
874    /* save const */
875    m=node->get_const_mbuf();
876    if (m) {
877        p->m_mbuf_const=m;
878        rte_pktmbuf_refcnt_update(m,1);
879    }
880
881    /* free all VM and const mbuf */
882    node->free_stl_vm_buf();
883
884    /* copy to local node meory */
885    node->cache_mbuf_array_copy(p,cache_size);
886
887    /* free the memory */
888    free(p);
889}
890
891
892void
893TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port,
894                                TrexStream * stream,
895                                TrexStreamsCompiledObj *comp) {
896
897    CGenNodeStateless *node = m_core->create_node_sl();
898
899    node->m_thread_id = m_thread_id;
900    node->cache_mbuf_array_init();
901    node->m_batch_size=0;
902
903    /* add periodic */
904    node->m_cache_mbuf=0;
905    node->m_type = CGenNode::STATELESS_PKT;
906
907    node->m_action_counter = stream->m_action_count;
908
909    /* clone the stream from control plane memory to DP memory */
910    node->m_ref_stream_info = stream->clone();
911    /* no need for this memory anymore on the control plane memory */
912    stream->release_dp_object();
913
914    node->m_next_stream=0; /* will be fixed later */
915
916    if ( stream->m_self_start ){
917        /* if self start it is in active mode */
918        node->m_state =CGenNodeStateless::ss_ACTIVE;
919        lp_port->m_active_streams++;
920    }else{
921        node->m_state =CGenNodeStateless::ss_INACTIVE;
922    }
923
924    node->m_time = m_core->m_cur_time_sec + stream->get_start_delay_sec();
925
926    pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(stream->m_port_id);
927    node->m_flags = 0;
928    node->m_src_port =0;
929    node->m_original_packet_data_prefix = 0;
930
931    if (stream->m_rx_check.m_enabled) {
932        node->set_stat_needed();
933        uint8_t hw_id = stream->m_rx_check.m_hw_id;
934        assert (hw_id < MAX_FLOW_STATS + MAX_FLOW_STATS_PAYLOAD);
935        node->set_stat_hw_id(hw_id);
936        // no support for cache with flow stat payload rules
937        if ((TrexPlatformApi::driver_stat_cap_e)stream->m_rx_check.m_rule_type == TrexPlatformApi::IF_STAT_PAYLOAD) {
938            stream->m_cache_size = 0;
939        }
940    }
941
942    /* set socket id */
943    node->set_socket_id(m_core->m_node_gen.m_socket_id);
944
945    /* build a mbuf from a packet */
946
947    uint16_t pkt_size = stream->m_pkt.len;
948    const uint8_t *stream_pkt = stream->m_pkt.binary;
949
950    node->m_pause =0;
951    node->m_stream_type = stream->m_type;
952    node->m_next_time_offset = 1.0 / stream->get_pps();
953    node->m_null_stream = (stream->m_null_stream ? 1 : 0);
954
955    /* stateless specific fields */
956    switch ( stream->m_type ) {
957
958    case TrexStream::stCONTINUOUS :
959        node->m_single_burst=0;
960        node->m_single_burst_refill=0;
961        node->m_multi_bursts=0;
962        break;
963
964    case TrexStream::stSINGLE_BURST :
965        node->m_stream_type             = TrexStream::stMULTI_BURST;
966        node->m_single_burst            = stream->m_burst_total_pkts;
967        node->m_single_burst_refill     = stream->m_burst_total_pkts;
968        node->m_multi_bursts            = 1;  /* single burst in multi burst of 1 */
969        break;
970
971    case TrexStream::stMULTI_BURST :
972        node->m_single_burst        = stream->m_burst_total_pkts;
973        node->m_single_burst_refill = stream->m_burst_total_pkts;
974        node->m_multi_bursts        = stream->m_num_bursts;
975        break;
976    default:
977
978        assert(0);
979    };
980
981    node->m_port_id = stream->m_port_id;
982
983    /* set dir 0 or 1 client or server */
984    node->set_mbuf_cache_dir(dir);
985
986
987    if (node->m_ref_stream_info->getDpVm() == NULL) {
988        /* no VM */
989
990        node->m_vm_flow_var =  NULL;
991        node->m_vm_program  =  NULL;
992        node->m_vm_program_size =0;
993
994                /* allocate const mbuf */
995        rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size);
996        assert(m);
997
998        char *p = rte_pktmbuf_append(m, pkt_size);
999        assert(p);
1000        /* copy the packet */
1001        memcpy(p,stream_pkt,pkt_size);
1002
1003        update_mac_addr(stream,node,dir,p);
1004
1005        /* set the packet as a readonly */
1006        node->set_cache_mbuf(m);
1007
1008        node->m_original_packet_data_prefix =0;
1009    }else{
1010
1011        /* set the program */
1012        TrexStream * local_mem_stream = node->m_ref_stream_info;
1013
1014        StreamVmDp  * lpDpVm = local_mem_stream->getDpVm();
1015
1016        node->m_vm_flow_var      = lpDpVm->clone_bss(); /* clone the flow var */
1017        node->m_vm_program       = lpDpVm->get_program(); /* same ref to the program */
1018        node->m_vm_program_size  = lpDpVm->get_program_size();
1019
1020        /* generate random seed if needed*/
1021        if (lpDpVm->is_random_seed()) {
1022            node->generate_random_seed();
1023        }
1024
1025        /* we need to copy the object */
1026        if ( pkt_size > lpDpVm->get_prefix_size() ) {
1027            /* we need const packet */
1028            uint16_t const_pkt_size  = pkt_size - lpDpVm->get_prefix_size() ;
1029            rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), const_pkt_size );
1030            assert(m);
1031
1032            char *p = rte_pktmbuf_append(m, const_pkt_size);
1033            assert(p);
1034
1035            /* copy packet data */
1036            memcpy(p,(stream_pkt + lpDpVm->get_prefix_size()),const_pkt_size);
1037
1038            node->set_const_mbuf(m);
1039        }
1040
1041
1042        if ( lpDpVm->is_pkt_size_var() ) {
1043            // mark the node as varible size
1044            node->set_var_pkt_size();
1045        }
1046
1047
1048        if (lpDpVm->get_prefix_size() > pkt_size ) {
1049            lpDpVm->set_prefix_size(pkt_size);
1050        }
1051
1052        /* copy the headr */
1053        uint16_t header_size = lpDpVm->get_prefix_size();
1054        assert(header_size);
1055        node->alloc_prefix_header(header_size);
1056        uint8_t *p=node->m_original_packet_data_prefix;
1057        assert(p);
1058
1059        memcpy(p,stream_pkt , header_size);
1060
1061        update_mac_addr(stream,node,dir,(char *)p);
1062
1063        if (stream->m_cache_size > 0 ) {
1064            /* we need to create cache of objects */
1065            replay_vm_into_cache(stream, node);
1066        }
1067    }
1068
1069
1070    CDpOneStream one_stream;
1071
1072    one_stream.m_dp_stream = node->m_ref_stream_info;
1073    one_stream.m_node =node;
1074
1075    lp_port->m_active_nodes.push_back(one_stream);
1076
1077    /* schedule only if active */
1078    if (node->m_state == CGenNodeStateless::ss_ACTIVE) {
1079        m_core->m_node_gen.add_node((CGenNode *)node);
1080    }
1081}
1082
1083void
1084TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj,
1085                                   double duration,
1086                                   int event_id) {
1087
1088
1089    TrexStatelessDpPerPort * lp_port=get_port_db(obj->get_port_id());
1090    lp_port->m_active_streams = 0;
1091    lp_port->set_event_id(event_id);
1092
1093    /* update cur time */
1094    if ( CGlobalInfo::is_realtime()  ){
1095        m_core->m_cur_time_sec = now_sec() + SCHD_OFFSET_DTIME ;
1096    }
1097
1098    /* no nodes in the list */
1099    assert(lp_port->m_active_nodes.size()==0);
1100
1101    for (auto single_stream : obj->get_objects()) {
1102        /* all commands should be for the same port */
1103        assert(obj->get_port_id() == single_stream.m_stream->m_port_id);
1104        add_stream(lp_port,single_stream.m_stream,obj);
1105    }
1106
1107    uint32_t nodes = lp_port->m_active_nodes.size();
1108    /* find next stream */
1109    assert(nodes == obj->get_objects().size());
1110
1111    int cnt=0;
1112
1113    /* set the next_stream pointer  */
1114    for (auto single_stream : obj->get_objects()) {
1115
1116        if (single_stream.m_stream->is_dp_next_stream() ) {
1117            int stream_id = single_stream.m_stream->m_next_stream_id;
1118            assert(stream_id<nodes);
1119            /* point to the next stream , stream_id is fixed */
1120            lp_port->m_active_nodes[cnt].m_node->m_next_stream = lp_port->m_active_nodes[stream_id].m_node ;
1121        }
1122        cnt++;
1123    }
1124
1125    lp_port->m_state =TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
1126    m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
1127
1128
1129    if ( duration > 0.0 ){
1130        add_port_duration( duration ,obj->get_port_id(),event_id );
1131    }
1132
1133}
1134
1135
1136bool TrexStatelessDpCore::are_all_ports_idle(){
1137
1138    bool res=true;
1139    int i;
1140    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
1141        if ( m_ports[i].m_state != TrexStatelessDpPerPort::ppSTATE_IDLE ){
1142            res=false;
1143        }
1144    }
1145    return (res);
1146}
1147
1148
1149void
1150TrexStatelessDpCore::resume_traffic(uint8_t port_id){
1151
1152    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1153
1154    lp_port->resume_traffic(port_id);
1155}
1156
1157
1158void
1159TrexStatelessDpCore::pause_traffic(uint8_t port_id){
1160
1161    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1162
1163    lp_port->pause_traffic(port_id);
1164}
1165
1166void
1167TrexStatelessDpCore::push_pcap(uint8_t port_id,
1168                               int event_id,
1169                               const std::string &pcap_filename,
1170                               double ipg_usec,
1171                               double speedup,
1172                               uint32_t count,
1173                               double duration,
1174                               bool is_dual) {
1175
1176    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1177
1178    lp_port->set_event_id(event_id);
1179
1180    /* delegate the command to the port */
1181    bool rc = lp_port->push_pcap(port_id, pcap_filename, ipg_usec, speedup, count, is_dual);
1182    if (!rc) {
1183        /* report back that we stopped */
1184        CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
1185        TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
1186                                                                       port_id,
1187                                                                       event_id,
1188                                                                       false);
1189        ring->Enqueue((CGenNode *)event_msg);
1190        return;
1191    }
1192
1193
1194    if (duration > 0.0) {
1195        add_port_duration(duration, port_id, event_id);
1196    }
1197
1198     m_state = TrexStatelessDpCore::STATE_PCAP_TX;
1199}
1200
1201void
1202TrexStatelessDpCore::update_traffic(uint8_t port_id, double factor) {
1203
1204    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1205
1206    lp_port->update_traffic(port_id, factor);
1207}
1208
1209
1210void
1211TrexStatelessDpCore::stop_traffic(uint8_t  port_id,
1212                                  bool     stop_on_id,
1213                                  int      event_id) {
1214    /* we cannot remove nodes not from the top of the queue so
1215       for every active node - make sure next time
1216       the scheduler invokes it, it will be free */
1217
1218    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1219    if ( lp_port->stop_traffic(port_id,stop_on_id,event_id) == false){
1220        return;
1221    }
1222
1223    /* flush the TX queue before sending done message to the CP */
1224    m_core->flush_tx_queue();
1225
1226    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
1227    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
1228                                                                   port_id,
1229                                                                   lp_port->get_event_id());
1230    ring->Enqueue((CGenNode *)event_msg);
1231
1232}
1233
1234/**
1235 * handle a message from CP to DP
1236 *
1237 */
1238void
1239TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) {
1240    msg->handle(this);
1241    delete msg;
1242}
1243
1244void
1245TrexStatelessDpCore::barrier(uint8_t port_id, int event_id) {
1246
1247    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
1248    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
1249                                                                   port_id,
1250                                                                   event_id);
1251    ring->Enqueue((CGenNode *)event_msg);
1252}
1253
1254
1255/**
1256 * PCAP node
1257 */
1258bool CGenNodePCAP::create(uint8_t port_id,
1259                          pkt_dir_t dir,
1260                          socket_id_t socket_id,
1261                          const uint8_t *mac_addr,
1262                          const uint8_t *slave_mac_addr,
1263                          const std::string &pcap_filename,
1264                          double ipg_usec,
1265                          double speedup,
1266                          uint32_t count,
1267                          bool is_dual) {
1268    std::stringstream ss;
1269
1270    m_type       = CGenNode::PCAP_PKT;
1271    m_flags      = 0;
1272    m_src_port   = 0;
1273    m_port_id    = port_id;
1274    m_count      = count;
1275    m_is_dual    = is_dual;
1276    m_dir        = dir;
1277
1278    /* mark this node as slow path */
1279    set_slow_path(true);
1280
1281    if (ipg_usec != -1) {
1282        /* fixed IPG */
1283        m_ipg_sec = usec_to_sec(ipg_usec / speedup);
1284        m_speedup = 0;
1285    } else {
1286        /* packet IPG */
1287        m_ipg_sec = -1;
1288        m_speedup  = speedup;
1289    }
1290
1291    /* copy MAC addr info */
1292    memcpy(m_mac_addr, mac_addr, 12);
1293    memcpy(m_slave_mac_addr, slave_mac_addr, 12);
1294
1295
1296    set_socket_id(socket_id);
1297
1298    /* create the PCAP reader */
1299    m_reader = CCapReaderFactory::CreateReader((char *)pcap_filename.c_str(), 0, ss);
1300    if (!m_reader) {
1301        return false;
1302    }
1303
1304    m_raw_packet = new CCapPktRaw();
1305    if ( m_reader->ReadPacket(m_raw_packet) == false ){
1306        /* handle error */
1307        delete m_reader;
1308        return (false);
1309    }
1310
1311    /* set the dir */
1312    set_mbuf_dir(dir);
1313
1314    /* update the direction (for dual mode) */
1315    update_pkt_dir();
1316
1317    /* this is the reference time */
1318    m_last_pkt_time = m_raw_packet->get_time();
1319
1320    /* ready */
1321    m_state = PCAP_ACTIVE;
1322
1323    return true;
1324}
1325
1326/**
1327 * cleanup for PCAP node
1328 *
1329 * @author imarom (08-May-16)
1330 */
1331void CGenNodePCAP::destroy() {
1332
1333    if (m_raw_packet) {
1334        delete m_raw_packet;
1335        m_raw_packet = NULL;
1336    }
1337
1338    if (m_reader) {
1339        delete m_reader;
1340        m_reader = NULL;
1341    }
1342
1343    m_state = PCAP_INVALID;
1344}
1345
1346