trex_stateless_dp_core.cpp revision 04eae221
/*
 Itay Marom
 Hanoch Haim
 Cisco Systems, Inc.
*/

/*
Copyright (c) 2015-2016 Cisco Systems, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
22#include "bp_sim.h"
23#include "trex_stateless_dp_core.h"
24#include "trex_stateless_messaging.h"
25#include "trex_stream.h"
26#include "trex_stream_node.h"
27#include "trex_streams_compiler.h"
28#include "mbuf.h"
29
30
31
32
33void CGenNodeStateless::cache_mbuf_array_init(){
34    m_cache_size=0;
35    m_cache_array_cnt=0;
36}
37
38
39
40void CGenNodeStateless::cache_mbuf_array_copy(CGenNodeCacheMbuf *obj,
41                                              uint16_t size){
42
43    int i;
44    cache_mbuf_array_alloc(size);
45    for (i=0; i<size; i++) {
46        cache_mbuf_array_set(i,obj->m_array[i]);
47    }
48    cache_mbuf_array_set_const_mbuf(obj->m_mbuf_const);
49}
50
51
52rte_mbuf_t ** CGenNodeStateless::cache_mbuf_array_alloc(uint16_t size){
53
54    uint32_t buf_size = CGenNodeCacheMbuf::get_object_size(size);
55    /* TBD  replace with align, zero API */
56    m_cache_mbuf = (void *)malloc(buf_size);
57    assert(m_cache_mbuf);
58    memset(m_cache_mbuf,0,buf_size);
59
60    m_flags |= SL_NODE_CONST_MBUF_CACHE_ARRAY;
61    m_cache_size=size;
62    m_cache_array_cnt=0;
63    return ((rte_mbuf_t **)m_cache_mbuf);
64}
65
66void CGenNodeStateless::cache_mbuf_array_free(){
67
68    assert(m_cache_mbuf);
69    int i;
70    for (i=0; i<(int)m_cache_size; i++) {
71        rte_mbuf_t * m=cache_mbuf_array_get((uint16_t)i);
72        assert(m);
73        rte_pktmbuf_free(m);
74    }
75
76    /* free the const */
77    rte_mbuf_t * m=cache_mbuf_array_get_const_mbuf() ;
78    if (m) {
79        rte_pktmbuf_free(m);
80    }
81
82    free(m_cache_mbuf);
83    m_cache_mbuf=0;
84}
85
86
87rte_mbuf_t * CGenNodeStateless::cache_mbuf_array_get(uint16_t index){
88
89    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
90    return (p->m_array[index]);
91}
92
93void CGenNodeStateless::cache_mbuf_array_set_const_mbuf(rte_mbuf_t * m){
94    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
95    p->m_mbuf_const=m;
96}
97
98rte_mbuf_t * CGenNodeStateless::cache_mbuf_array_get_const_mbuf(){
99    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
100    return (p->m_mbuf_const);
101}
102
103
104void CGenNodeStateless::cache_mbuf_array_set(uint16_t index,
105                                             rte_mbuf_t * m){
106    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
107    p->m_array[index]=m;
108}
109
110
111void CDpOneStream::Delete(CFlowGenListPerThread   * core){
112    assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE);
113    core->free_node((CGenNode *)m_node);
114    delete m_dp_stream;
115    m_node=0;
116    m_dp_stream=0;
117}
118
119void CDpOneStream::DeleteOnlyStream(){
120    assert(m_dp_stream);
121    delete m_dp_stream;
122    m_dp_stream=0;
123}
124
125int CGenNodeStateless::get_stream_id(){
126    if (m_state ==CGenNodeStateless::ss_FREE_RESUSE) {
127        return (-1); // not valid
128    }
129    assert(m_ref_stream_info);
130    return ((int)m_ref_stream_info->m_stream_id);
131}
132
133
134void CGenNodeStateless::DumpHeader(FILE *fd){
135    fprintf(fd," pkt_id, time, port , action , state, stream_id , stype , m-burst# , burst# \n");
136
137}
138void CGenNodeStateless::Dump(FILE *fd){
139    fprintf(fd," %2.4f, %3lu, %s,%s, %3d, %s, %3lu, %3lu  \n",
140            m_time,
141            (ulong)m_port_id,
142            "s-pkt", //action
143            get_stream_state_str(m_state ).c_str(),
144            get_stream_id(),   //stream_id
145            TrexStream::get_stream_type_str(m_stream_type).c_str(), //stype
146            (ulong)m_multi_bursts,
147            (ulong)m_single_burst
148            );
149}
150
151
152void CGenNodeStateless::generate_random_seed() {
153    /* seed can be provided by the user */
154    uint32_t unique_seed;
155    if (m_ref_stream_info->m_random_seed) {
156        unique_seed = m_ref_stream_info->m_random_seed;
157    } else {
158        unsigned int tmp = (unsigned int)time(NULL);
159        unique_seed = rand_r(&tmp);
160    }
161
162    /* per thread divergence */
163    unique_seed = (unique_seed * ( (m_thread_id + 1) * 514229 ) ) & 0xFFFFFFFF;
164
165    /* set random */
166    set_random_seed(unique_seed);
167}
168
169
170void CGenNodeStateless::refresh_vm_bss() {
171    if ( m_vm_flow_var ) {
172        StreamVmDp  * vm_s=m_ref_stream_info->m_vm_dp;
173        assert(vm_s);
174        memcpy(m_vm_flow_var,vm_s->get_bss(),vm_s->get_bss_size());
175
176        if ( vm_s->is_random_seed() ) {
177            generate_random_seed();
178        }
179
180    }
181}
182
183
184
185/**
186 * this function called when stream restart after it was inactive
187 */
188void CGenNodeStateless::refresh(){
189
190    /* refill the stream info */
191    m_single_burst    = m_single_burst_refill;
192    m_multi_bursts    = m_ref_stream_info->m_num_bursts;
193    m_state           = CGenNodeStateless::ss_ACTIVE;
194
195    /* refresh init value */
196#if 0
197    /* TBD should add a JSON varible for that */
198    refresh_vm_bss();
199#endif
200}
201
202
203void CGenNodeCommand::free_command(){
204
205    assert(m_cmd);
206    m_cmd->on_node_remove();
207    delete m_cmd;
208}
209
210
211std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state){
212    std::string res;
213
214    switch (stream_state) {
215    case CGenNodeStateless::ss_FREE_RESUSE :
216         res="FREE    ";
217        break;
218    case CGenNodeStateless::ss_INACTIVE :
219        res="INACTIVE ";
220        break;
221    case CGenNodeStateless::ss_ACTIVE :
222        res="ACTIVE   ";
223        break;
224    default:
225        res="Unknow   ";
226    };
227    return(res);
228}
229
/*
 * Allocate mbuf for flow stat (and latency) info sending
 * m - Original mbuf (can be complicated mbuf data structure)
 * fsp_head - output: set to the location where the flow stat payload header
 *            should be written (the last sizeof(flow_stat_payload_header)
 *            bytes of the returned packet)
 * is_const - is the given mbuf const
 * return the mbuf structure in which fsp_head can be written. This is a new
 * mbuf in the const case and the original 'm' in the non-const case.
 * In the short const case the original mbuf is freed.
 */
rte_mbuf_t * CGenNodeStateless::alloc_flow_stat_mbuf(rte_mbuf_t *m, struct flow_stat_payload_header *&fsp_head
                                                     , bool is_const) {
    rte_mbuf_t *m_ret = NULL, *m_lat = NULL;
    uint16_t fsp_head_size = sizeof(struct flow_stat_payload_header);

    if (is_const) {
        // const mbuf case
        if (rte_pktmbuf_data_len(m) > 128) {
            // Long packet: do not copy. Build  indirect(m, trimmed) -> latency mbuf
            m_ret = CGlobalInfo::pktmbuf_alloc_small(get_socket_id());
            assert(m_ret);
            // alloc mbuf just for the latency header
            m_lat = CGlobalInfo::pktmbuf_alloc( get_socket_id(), fsp_head_size);
            assert(m_lat);
            fsp_head = (struct flow_stat_payload_header *)rte_pktmbuf_append(m_lat, fsp_head_size);
            rte_pktmbuf_attach(m_ret, m);
            // drop the last header-sized bytes of the attached data; they are supplied by m_lat instead
            rte_pktmbuf_trim(m_ret, sizeof(struct flow_stat_payload_header));
            utl_rte_pktmbuf_add_after2(m_ret, m_lat);
            // ref count was updated when we took the (const) mbuf, and again in rte_pktmbuf_attach
            // so we need to decrease it now, to avoid a leak.
            rte_pktmbuf_refcnt_update(m, -1);
            return m_ret;
        } else {
            // Short packet. Just copy all bytes.
            m_ret = CGlobalInfo::pktmbuf_alloc( get_socket_id(), rte_pktmbuf_data_len(m) );
            assert(m_ret);
            char *p = rte_pktmbuf_mtod(m, char*);
            char *p_new = rte_pktmbuf_append(m_ret, rte_pktmbuf_data_len(m));
            memcpy(p_new , p, rte_pktmbuf_data_len(m));
            // header occupies the tail of the copy
            // NOTE(review): assumes data_len(m) >= fsp_head_size - confirm with callers
            fsp_head = (struct flow_stat_payload_header *)(p_new + rte_pktmbuf_data_len(m) - fsp_head_size);
            // the copy replaces the original, release our hold on it
            rte_pktmbuf_free(m);
            return m_ret;
        }
    } else {
        // Field engine (vm)
        if (rte_pktmbuf_is_contiguous(m)) {
            // one, r/w mbuf: the header is written in place at the tail of its data
            char *p = rte_pktmbuf_mtod(m, char*);
            fsp_head = (struct flow_stat_payload_header *)(p + rte_pktmbuf_data_len(m) - fsp_head_size);
            return m;
        } else {
            // We have: r/w --> read only.
            // Changing to:
            // (original) r/w -> (new) indirect (direct is original read_only, after trimming last bytes) -> (new) latency info
            rte_mbuf_t *m_read_only = m->next, *m_indirect;

            m_indirect = CGlobalInfo::pktmbuf_alloc_small(get_socket_id());
            assert(m_indirect);
            // alloc mbuf just for the latency header
            m_lat = CGlobalInfo::pktmbuf_alloc( get_socket_id(), fsp_head_size);
            assert(m_lat);
            fsp_head = (struct flow_stat_payload_header *)rte_pktmbuf_append(m_lat, fsp_head_size);
            utl_rte_pktmbuf_chain_with_indirect(m, m_indirect, m_read_only, m_lat);
            // shorten the indirect segment by the header size (the header now lives in m_lat)
            m_indirect->data_len = (uint16_t)(m_indirect->data_len - fsp_head_size);
            return m;
        }
    }
}
294
295// test the const case of alloc_flow_stat_mbuf. The more complicated non const case is tested in the simulation.
296bool CGenNodeStateless::alloc_flow_stat_mbuf_test_const() {
297    rte_mbuf_t *m, *m_test;
298    uint16_t sizes[2] = {64, 500};
299    uint16_t size;
300    struct flow_stat_payload_header *fsp_head;
301    char *p;
302
303    set_socket_id(0);
304    for (int test_num = 0; test_num < sizeof(sizes)/sizeof(sizes[0]); test_num++) {
305        size = sizes[test_num];
306        m = CGlobalInfo::pktmbuf_alloc(get_socket_id(), size);
307        p = rte_pktmbuf_append(m, size);
308        for (int i = 0; i < size; i++) {
309            p[i] = (char)i;
310        }
311        m_test = alloc_flow_stat_mbuf(m, fsp_head, true);
312        p = rte_pktmbuf_mtod(m_test, char*);
313        assert(rte_pktmbuf_pkt_len(m_test) == size);
314        for (int i = 0; i < rte_pktmbuf_pkt_len(m_test) - sizeof(*fsp_head); i++) {
315            assert(p[i] == (char)i);
316        }
317        // verify fsp_head points correctly
318        if (size > 128) { // should match threshould in alloc_flow_stat_mbuf
319            assert(rte_pktmbuf_data_len(m_test) == size - sizeof(*fsp_head));
320            assert(rte_pktmbuf_data_len(m_test->next) == sizeof(*fsp_head));
321            assert((char *)fsp_head == rte_pktmbuf_mtod((m_test->next), char*));
322        } else {
323            assert(rte_pktmbuf_data_len(m_test) == size);
324            assert (((char *)fsp_head) + sizeof (*fsp_head) == p + rte_pktmbuf_data_len(m_test));
325        }
326        rte_pktmbuf_free(m_test);
327    }
328    return true;
329}
330
331rte_mbuf_t   * CGenNodeStateless::alloc_node_with_vm(){
332
333    rte_mbuf_t        * m;
334    /* alloc small packet buffer*/
335    uint16_t prefix_size = prefix_header_size();
336    m = CGlobalInfo::pktmbuf_alloc( get_socket_id(), prefix_size );
337    if (m==0) {
338        return (m);
339    }
340    /* TBD remove this, should handle cases of error */
341    assert(m);
342    char *p=rte_pktmbuf_append(m, prefix_size);
343    memcpy( p ,m_original_packet_data_prefix, prefix_size);
344
345
346    /* run the VM program */
347    StreamDPVmInstructionsRunner runner;
348
349    runner.run( (uint32_t*)m_vm_flow_var,
350                m_vm_program_size,
351                m_vm_program,
352                m_vm_flow_var,
353                (uint8_t*)p);
354
355    uint16_t pkt_new_size=runner.get_new_pkt_size();
356    if ( likely( pkt_new_size == 0) ) {
357        /* no packet size change */
358        rte_mbuf_t * m_const = get_const_mbuf();
359        if (  m_const != NULL) {
360            utl_rte_pktmbuf_add_after(m,m_const);
361        }
362        return (m);
363    }
364
365    /* packet size change there are a few changes */
366    rte_mbuf_t * m_const = get_const_mbuf();
367    if ( (m_const == 0 ) || (pkt_new_size<=prefix_size) ) {
368        /* one mbuf , just trim it */
369        m->data_len = pkt_new_size;
370        m->pkt_len  = pkt_new_size;
371        return (m);
372    }
373
374    rte_mbuf_t * mi= CGlobalInfo::pktmbuf_alloc_small(get_socket_id());
375    assert(mi);
376    rte_pktmbuf_attach(mi,m_const);
377    utl_rte_pktmbuf_add_after2(m,mi);
378
379    if ( pkt_new_size < m->pkt_len) {
380        /* need to trim it */
381        mi->data_len = (pkt_new_size - prefix_size);
382        m->pkt_len   = pkt_new_size;
383    }
384    return (m);
385}
386
387void CGenNodeStateless::free_stl_vm_buf(){
388        rte_mbuf_t * m ;
389         m=get_const_mbuf();
390         if (m) {
391             rte_pktmbuf_free(m); /* reduce the ref counter */
392             /* clear the const marker */
393             clear_const_mbuf();
394         }
395
396         free_prefix_header();
397
398         if (m_vm_flow_var) {
399             /* free flow var */
400             free(m_vm_flow_var);
401             m_vm_flow_var=0;
402         }
403}
404
405
406
407void CGenNodeStateless::free_stl_node(){
408
409    if ( is_cache_mbuf_array() ){
410        /* do we have cache of mbuf pre allocated */
411        cache_mbuf_array_free();
412    }else{
413        /* if we have cache mbuf free it */
414        rte_mbuf_t * m=get_cache_mbuf();
415        if (m) {
416                rte_pktmbuf_free(m);
417                m_cache_mbuf=0;
418        }
419    }
420    free_stl_vm_buf();
421}
422
423
424bool TrexStatelessDpPerPort::update_number_of_active_streams(uint32_t d){
425    m_active_streams-=d; /* reduce the number of streams */
426    if (m_active_streams == 0) {
427        return (true);
428    }
429    return (false);
430}
431
432bool TrexStatelessDpPerPort::resume_traffic(uint8_t port_id){
433
434    /* we are working with continues streams so we must be in transmit mode */
435    assert(m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE);
436
437    for (auto dp_stream : m_active_nodes) {
438        CGenNodeStateless * node =dp_stream.m_node;
439        assert(node->get_port_id() == port_id);
440        assert(node->is_pause() == true);
441        node->set_pause(false);
442    }
443    m_state = TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
444    return (true);
445}
446
447bool TrexStatelessDpPerPort::update_traffic(uint8_t port_id, double factor) {
448
449    assert( (m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING ||
450            (m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE)) );
451
452    for (auto dp_stream : m_active_nodes) {
453        CGenNodeStateless * node = dp_stream.m_node;
454        assert(node->get_port_id() == port_id);
455
456        node->update_rate(factor);
457    }
458
459    return (true);
460}
461
462bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){
463
464    /* we are working with continues streams so we must be in transmit mode */
465    assert(m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING);
466
467    for (auto dp_stream : m_active_nodes) {
468        CGenNodeStateless * node =dp_stream.m_node;
469        assert(node->get_port_id() == port_id);
470        assert(node->is_pause() == false);
471        node->set_pause(true);
472    }
473    m_state = TrexStatelessDpPerPort::ppSTATE_PAUSE;
474    return (true);
475}
476
477bool TrexStatelessDpPerPort::push_pcap(uint8_t port_id,
478                                       const std::string &pcap_filename,
479                                       double ipg_usec,
480                                       double speedup,
481                                       uint32_t count,
482                                       bool is_dual) {
483
484    /* push pcap can only happen on an idle port from the core prespective */
485    assert(m_state == TrexStatelessDpPerPort::ppSTATE_IDLE);
486
487    CGenNodePCAP *pcap_node = m_core->allocate_pcap_node();
488    if (!pcap_node) {
489        return (false);
490    }
491
492    pkt_dir_t dir          = m_core->m_node_gen.m_v_if->port_id_to_dir(port_id);
493    socket_id_t socket_id  = m_core->m_node_gen.m_socket_id;
494
495    uint8_t mac_addr[12];
496    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, mac_addr);
497
498    bool rc = pcap_node->create(port_id,
499                                dir,
500                                socket_id,
501                                mac_addr,
502                                pcap_filename,
503                                ipg_usec,
504                                speedup,
505                                count,
506                                is_dual);
507    if (!rc) {
508        m_core->free_node((CGenNode *)pcap_node);
509        return (false);
510    }
511
512    /* schedule the node for now */
513    pcap_node->m_time = m_core->m_cur_time_sec;
514    m_core->m_node_gen.add_node((CGenNode *)pcap_node);
515
516    /* hold a pointer to the node */
517    assert(m_active_pcap_node == NULL);
518    m_active_pcap_node = pcap_node;
519
520    m_state = TrexStatelessDpPerPort::ppSTATE_PCAP_TX;
521    return (true);
522}
523
524
525bool TrexStatelessDpPerPort::stop_traffic(uint8_t  port_id,
526                                          bool     stop_on_id,
527                                          int      event_id){
528
529
530    if (m_state == TrexStatelessDpPerPort::ppSTATE_IDLE) {
531        assert(m_active_streams==0);
532        return false;
533    }
534
535    /* there could be race of stop after stop */
536    if ( stop_on_id ) {
537        if (event_id != m_event_id){
538            /* we can't stop it is an old message */
539            return false;
540        }
541    }
542
543    for (auto dp_stream : m_active_nodes) {
544        CGenNodeStateless * node =dp_stream.m_node;
545        assert(node->get_port_id() == port_id);
546        if ( node->get_state() == CGenNodeStateless::ss_ACTIVE) {
547            node->mark_for_free();
548            m_active_streams--;
549            dp_stream.DeleteOnlyStream();
550
551        }else{
552            dp_stream.Delete(m_core);
553        }
554    }
555
556    /* check for active PCAP node */
557    if (m_active_pcap_node) {
558        /* when got async stop from outside or duration */
559        if (m_active_pcap_node->is_active()) {
560            m_active_pcap_node->mark_for_free();
561        } else {
562            /* graceful stop - node was put out by the scheduler */
563            m_core->free_node( (CGenNode *)m_active_pcap_node);
564        }
565
566        m_active_pcap_node = NULL;
567    }
568
569    /* active stream should be zero */
570    assert(m_active_streams==0);
571    m_active_nodes.clear();
572    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
573    return (true);
574}
575
576
577void TrexStatelessDpPerPort::create(CFlowGenListPerThread   *  core){
578    m_core=core;
579    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
580    m_active_streams=0;
581    m_active_nodes.clear();
582    m_active_pcap_node = NULL;
583}
584
585
586
587void
588TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) {
589    m_thread_id = thread_id;
590    m_core = core;
591    m_local_port_offset = 2*core->getDualPortId();
592
593    CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();
594
595    m_ring_from_cp = cp_dp->getRingCpToDp(thread_id);
596    m_ring_to_cp   = cp_dp->getRingDpToCp(thread_id);
597
598    m_state = STATE_IDLE;
599
600    int i;
601    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
602        m_ports[i].create(core);
603    }
604}
605
606
607/* move to the next stream, old stream move to INACTIVE */
608bool TrexStatelessDpCore::set_stateless_next_node(CGenNodeStateless * cur_node,
609                                                  CGenNodeStateless * next_node){
610
611    assert(cur_node);
612    TrexStatelessDpPerPort * lp_port = get_port_db(cur_node->m_port_id);
613    bool schedule =false;
614
615    bool to_stop_port=false;
616
617    if (next_node == NULL) {
618        /* there is no next stream , reduce the number of active streams*/
619        to_stop_port = lp_port->update_number_of_active_streams(1);
620
621    }else{
622        uint8_t state=next_node->get_state();
623
624        /* can't be FREE_RESUSE */
625        assert(state != CGenNodeStateless::ss_FREE_RESUSE);
626        if (state == CGenNodeStateless::ss_INACTIVE ) {
627
628            if (cur_node->m_action_counter > 0) {
629                cur_node->m_action_counter--;
630                if (cur_node->m_action_counter==0) {
631                    to_stop_port = lp_port->update_number_of_active_streams(1);
632                }else{
633                    /* refill start info and scedule, no update in active streams  */
634                    next_node->refresh();
635                    schedule = true;
636                }
637            }else{
638                /* refill start info and scedule, no update in active streams  */
639                next_node->refresh();
640                schedule = true;
641            }
642
643        }else{
644            to_stop_port = lp_port->update_number_of_active_streams(1);
645        }
646    }
647
648    if ( to_stop_port ) {
649        /* call stop port explictly to move the state */
650        stop_traffic(cur_node->m_port_id,false,0);
651    }
652
653    return ( schedule );
654}
655
656
657
658/**
659 * in idle state loop, the processor most of the time sleeps
660 * and periodically checks for messages
661 *
662 * @author imarom (01-Nov-15)
663 */
664void
665TrexStatelessDpCore::idle_state_loop() {
666
667    const int SHORT_DELAY_MS    = 2;
668    const int LONG_DELAY_MS     = 50;
669    const int DEEP_SLEEP_LIMIT  = 2000;
670
671    int counter = 0;
672
673    while (m_state == STATE_IDLE) {
674        m_core->tickle();
675        m_core->m_node_gen.m_v_if->flush_dp_rx_queue();
676        bool had_msg = periodic_check_for_cp_messages();
677        if (had_msg) {
678            counter = 0;
679            continue;
680        }
681
682        /* enter deep sleep only if enough time had passed */
683        if (counter < DEEP_SLEEP_LIMIT) {
684            delay(SHORT_DELAY_MS);
685            counter++;
686        } else {
687            delay(LONG_DELAY_MS);
688        }
689
690    }
691}
692
693
694
695void TrexStatelessDpCore::quit_main_loop(){
696    m_core->set_terminate_mode(true); /* mark it as terminated */
697    m_state = STATE_TERMINATE;
698    add_global_duration(0.0001);
699}
700
701
702/**
703 * scehduler runs when traffic exists
704 * it will return when no more transmitting is done on this
705 * core
706 *
707 * @author imarom (01-Nov-15)
708 */
709void
710TrexStatelessDpCore::start_scheduler() {
711    /* creates a maintenace job using the scheduler */
712    CGenNode * node_sync = m_core->create_node() ;
713    node_sync->m_type = CGenNode::FLOW_SYNC;
714    node_sync->m_time = m_core->m_cur_time_sec + SYNC_TIME_OUT;
715
716    m_core->m_node_gen.add_node(node_sync);
717
718    double old_offset = 0.0;
719    m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset);
720    /* bail out in case of terminate */
721    if (m_state != TrexStatelessDpCore::STATE_TERMINATE) {
722        m_core->m_node_gen.close_file(m_core);
723        m_state = STATE_IDLE; /* we exit from all ports and we have nothing to do, we move to IDLE state */
724    }
725}
726
727
728void
729TrexStatelessDpCore::run_once(){
730
731    idle_state_loop();
732
733    if ( m_state == STATE_TERMINATE ){
734        return;
735    }
736
737    start_scheduler();
738}
739
740
741
742
743void
744TrexStatelessDpCore::start() {
745
746    while (true) {
747        run_once();
748
749        if ( m_core->is_terminated_by_master() ) {
750            break;
751        }
752    }
753}
754
755/* only if both port are idle we can exit */
756void
757TrexStatelessDpCore::schedule_exit(){
758
759    CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
760
761    node->m_type = CGenNode::COMMAND;
762
763    node->m_cmd = new TrexStatelessDpCanQuit();
764
765    /* make sure it will be scheduled after the current node */
766    node->m_time = m_core->m_cur_time_sec ;
767
768    m_core->m_node_gen.add_node((CGenNode *)node);
769}
770
771
772void
773TrexStatelessDpCore::add_global_duration(double duration){
774    if (duration > 0.0) {
775        CGenNode *node = m_core->create_node() ;
776
777        node->m_type = CGenNode::EXIT_SCHED;
778
779        /* make sure it will be scheduled after the current node */
780        node->m_time = m_core->m_cur_time_sec + duration ;
781
782        m_core->m_node_gen.add_node(node);
783    }
784}
785
786/* add per port exit */
787void
788TrexStatelessDpCore::add_port_duration(double duration,
789                                       uint8_t port_id,
790                                       int event_id){
791    if (duration > 0.0) {
792        CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
793
794        node->m_type = CGenNode::COMMAND;
795
796        /* make sure it will be scheduled after the current node */
797        node->m_time = m_core->m_cur_time_sec + duration ;
798
799        TrexStatelessDpStop * cmd=new TrexStatelessDpStop(port_id);
800
801
802        /* test this */
803        m_core->m_non_active_nodes++;
804        cmd->set_core_ptr(m_core);
805        cmd->set_event_id(event_id);
806        cmd->set_wait_for_event_id(true);
807
808        node->m_cmd = cmd;
809
810        m_core->m_node_gen.add_node((CGenNode *)node);
811    }
812}
813
814
815void TrexStatelessDpCore::update_mac_addr(TrexStream * stream,
816                                          CGenNodeStateless *node,
817                                          pkt_dir_t dir,
818                                          char *raw_pkt){
819    bool              ov_src = stream->get_override_src_mac_by_pkt_data();
820    TrexStream::stream_dst_mac_t  ov_dst = stream->get_override_dst_mac_mode();
821
822
823    if ( (ov_src == true) && (ov_dst == TrexStream::stPKT) ) {
824        /* nothing to do, take from the packet both */
825        return;
826    }
827
828        /* take from cfg_file */
829    if ( (ov_src == false) &&
830         (ov_dst == TrexStream::stCFG_FILE) ){
831
832          m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*)raw_pkt);
833          return;
834    }
835
836    /* save the pkt*/
837    char tmp_pkt[12];
838    memcpy(tmp_pkt,raw_pkt,12);
839
840    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*)raw_pkt);
841
842    if ((ov_src == true) && (ov_dst == TrexStream::stCFG_FILE)) {
843        memcpy(raw_pkt+6,tmp_pkt+6,6);
844    }
845
846    if ((ov_src == false) && (ov_dst == TrexStream::stPKT)) {
847        memcpy(raw_pkt,tmp_pkt,6);
848    }
849}
850
851
852void TrexStatelessDpCore::replay_vm_into_cache(TrexStream * stream,
853                                               CGenNodeStateless *node){
854
855    uint16_t      cache_size = stream->m_cache_size;
856    assert(cache_size>0);
857    rte_mbuf_t * m=0;
858
859    uint32_t buf_size = CGenNodeCacheMbuf::get_object_size(cache_size);
860    CGenNodeCacheMbuf * p = (CGenNodeCacheMbuf *)malloc(buf_size);
861    assert(p);
862    memset(p,0,buf_size);
863
864    int i;
865    for (i=0; i<cache_size; i++) {
866        p->m_array[i] =  node->alloc_node_with_vm();
867    }
868    /* save const */
869    m=node->get_const_mbuf();
870    if (m) {
871        p->m_mbuf_const=m;
872        rte_pktmbuf_refcnt_update(m,1);
873    }
874
875    /* free all VM and const mbuf */
876    node->free_stl_vm_buf();
877
878    /* copy to local node meory */
879    node->cache_mbuf_array_copy(p,cache_size);
880
881    /* free the memory */
882    free(p);
883}
884
885
886void
887TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port,
888                                TrexStream * stream,
889                                TrexStreamsCompiledObj *comp) {
890
891    CGenNodeStateless *node = m_core->create_node_sl();
892
893    node->m_thread_id = m_thread_id;
894    node->cache_mbuf_array_init();
895    node->m_batch_size=0;
896
897    /* add periodic */
898    node->m_cache_mbuf=0;
899    node->m_type = CGenNode::STATELESS_PKT;
900
901    node->m_action_counter = stream->m_action_count;
902
903    /* clone the stream from control plane memory to DP memory */
904    node->m_ref_stream_info = stream->clone();
905    /* no need for this memory anymore on the control plane memory */
906    stream->release_dp_object();
907
908    node->m_next_stream=0; /* will be fixed later */
909
910    if ( stream->m_self_start ){
911        /* if self start it is in active mode */
912        node->m_state =CGenNodeStateless::ss_ACTIVE;
913        lp_port->m_active_streams++;
914    }else{
915        node->m_state =CGenNodeStateless::ss_INACTIVE;
916    }
917
918    node->m_time = m_core->m_cur_time_sec + stream->get_start_delay_sec();
919
920    pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(stream->m_port_id);
921    node->m_flags = 0;
922    node->m_src_port =0;
923    node->m_original_packet_data_prefix = 0;
924
925    if (stream->m_rx_check.m_enabled) {
926        node->set_stat_needed();
927        uint8_t hw_id = stream->m_rx_check.m_hw_id;
928        assert (hw_id < MAX_FLOW_STATS + MAX_FLOW_STATS_PAYLOAD);
929        node->set_stat_hw_id(hw_id);
930        // no support for cache with flow stat payload rules
931        if ((TrexPlatformApi::driver_stat_cap_e)stream->m_rx_check.m_rule_type == TrexPlatformApi::IF_STAT_PAYLOAD) {
932            stream->m_cache_size = 0;
933        }
934    }
935
936    /* set socket id */
937    node->set_socket_id(m_core->m_node_gen.m_socket_id);
938
939    /* build a mbuf from a packet */
940
941    uint16_t pkt_size = stream->m_pkt.len;
942    const uint8_t *stream_pkt = stream->m_pkt.binary;
943
944    node->m_pause =0;
945    node->m_stream_type = stream->m_type;
946    node->m_next_time_offset = 1.0 / stream->get_pps();
947    node->m_null_stream = (stream->m_null_stream ? 1 : 0);
948
949    /* stateless specific fields */
950    switch ( stream->m_type ) {
951
952    case TrexStream::stCONTINUOUS :
953        node->m_single_burst=0;
954        node->m_single_burst_refill=0;
955        node->m_multi_bursts=0;
956        break;
957
958    case TrexStream::stSINGLE_BURST :
959        node->m_stream_type             = TrexStream::stMULTI_BURST;
960        node->m_single_burst            = stream->m_burst_total_pkts;
961        node->m_single_burst_refill     = stream->m_burst_total_pkts;
962        node->m_multi_bursts            = 1;  /* single burst in multi burst of 1 */
963        break;
964
965    case TrexStream::stMULTI_BURST :
966        node->m_single_burst        = stream->m_burst_total_pkts;
967        node->m_single_burst_refill = stream->m_burst_total_pkts;
968        node->m_multi_bursts        = stream->m_num_bursts;
969        break;
970    default:
971
972        assert(0);
973    };
974
975    node->m_port_id = stream->m_port_id;
976
977    /* set dir 0 or 1 client or server */
978    node->set_mbuf_cache_dir(dir);
979
980
981    if (node->m_ref_stream_info->getDpVm() == NULL) {
982        /* no VM */
983
984        node->m_vm_flow_var =  NULL;
985        node->m_vm_program  =  NULL;
986        node->m_vm_program_size =0;
987
988                /* allocate const mbuf */
989        rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size);
990        assert(m);
991
992        char *p = rte_pktmbuf_append(m, pkt_size);
993        assert(p);
994        /* copy the packet */
995        memcpy(p,stream_pkt,pkt_size);
996
997        update_mac_addr(stream,node,dir,p);
998
999        /* set the packet as a readonly */
1000        node->set_cache_mbuf(m);
1001
1002        node->m_original_packet_data_prefix =0;
1003    }else{
1004
1005        /* set the program */
1006        TrexStream * local_mem_stream = node->m_ref_stream_info;
1007
1008        StreamVmDp  * lpDpVm = local_mem_stream->getDpVm();
1009
1010        node->m_vm_flow_var      = lpDpVm->clone_bss(); /* clone the flow var */
1011        node->m_vm_program       = lpDpVm->get_program(); /* same ref to the program */
1012        node->m_vm_program_size  = lpDpVm->get_program_size();
1013
1014        /* generate random seed if needed*/
1015        if (lpDpVm->is_random_seed()) {
1016            node->generate_random_seed();
1017        }
1018
1019        /* we need to copy the object */
1020        if ( pkt_size > lpDpVm->get_prefix_size() ) {
1021            /* we need const packet */
1022            uint16_t const_pkt_size  = pkt_size - lpDpVm->get_prefix_size() ;
1023            rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), const_pkt_size );
1024            assert(m);
1025
1026            char *p = rte_pktmbuf_append(m, const_pkt_size);
1027            assert(p);
1028
1029            /* copy packet data */
1030            memcpy(p,(stream_pkt + lpDpVm->get_prefix_size()),const_pkt_size);
1031
1032            node->set_const_mbuf(m);
1033        }
1034
1035
1036        if ( lpDpVm->is_pkt_size_var() ) {
1037            // mark the node as varible size
1038            node->set_var_pkt_size();
1039        }
1040
1041
1042        if (lpDpVm->get_prefix_size() > pkt_size ) {
1043            lpDpVm->set_prefix_size(pkt_size);
1044        }
1045
1046        /* copy the headr */
1047        uint16_t header_size = lpDpVm->get_prefix_size();
1048        assert(header_size);
1049        node->alloc_prefix_header(header_size);
1050        uint8_t *p=node->m_original_packet_data_prefix;
1051        assert(p);
1052
1053        memcpy(p,stream_pkt , header_size);
1054
1055        update_mac_addr(stream,node,dir,(char *)p);
1056
1057        if (stream->m_cache_size > 0 ) {
1058            /* we need to create cache of objects */
1059            replay_vm_into_cache(stream, node);
1060        }
1061    }
1062
1063
1064    CDpOneStream one_stream;
1065
1066    one_stream.m_dp_stream = node->m_ref_stream_info;
1067    one_stream.m_node =node;
1068
1069    lp_port->m_active_nodes.push_back(one_stream);
1070
1071    /* schedule only if active */
1072    if (node->m_state == CGenNodeStateless::ss_ACTIVE) {
1073        m_core->m_node_gen.add_node((CGenNode *)node);
1074    }
1075}
1076
1077void
1078TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj,
1079                                   double duration,
1080                                   int event_id) {
1081
1082
1083    TrexStatelessDpPerPort * lp_port=get_port_db(obj->get_port_id());
1084    lp_port->m_active_streams = 0;
1085    lp_port->set_event_id(event_id);
1086
1087    /* update cur time */
1088    if ( CGlobalInfo::is_realtime()  ){
1089        m_core->m_cur_time_sec = now_sec() + SCHD_OFFSET_DTIME ;
1090    }
1091
1092    /* no nodes in the list */
1093    assert(lp_port->m_active_nodes.size()==0);
1094
1095    for (auto single_stream : obj->get_objects()) {
1096        /* all commands should be for the same port */
1097        assert(obj->get_port_id() == single_stream.m_stream->m_port_id);
1098        add_stream(lp_port,single_stream.m_stream,obj);
1099    }
1100
1101    uint32_t nodes = lp_port->m_active_nodes.size();
1102    /* find next stream */
1103    assert(nodes == obj->get_objects().size());
1104
1105    int cnt=0;
1106
1107    /* set the next_stream pointer  */
1108    for (auto single_stream : obj->get_objects()) {
1109
1110        if (single_stream.m_stream->is_dp_next_stream() ) {
1111            int stream_id = single_stream.m_stream->m_next_stream_id;
1112            assert(stream_id<nodes);
1113            /* point to the next stream , stream_id is fixed */
1114            lp_port->m_active_nodes[cnt].m_node->m_next_stream = lp_port->m_active_nodes[stream_id].m_node ;
1115        }
1116        cnt++;
1117    }
1118
1119    lp_port->m_state =TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
1120    m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
1121
1122
1123    if ( duration > 0.0 ){
1124        add_port_duration( duration ,obj->get_port_id(),event_id );
1125    }
1126
1127}
1128
1129
1130bool TrexStatelessDpCore::are_all_ports_idle(){
1131
1132    bool res=true;
1133    int i;
1134    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
1135        if ( m_ports[i].m_state != TrexStatelessDpPerPort::ppSTATE_IDLE ){
1136            res=false;
1137        }
1138    }
1139    return (res);
1140}
1141
1142
1143void
1144TrexStatelessDpCore::resume_traffic(uint8_t port_id){
1145
1146    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1147
1148    lp_port->resume_traffic(port_id);
1149}
1150
1151
1152void
1153TrexStatelessDpCore::pause_traffic(uint8_t port_id){
1154
1155    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1156
1157    lp_port->pause_traffic(port_id);
1158}
1159
1160void
1161TrexStatelessDpCore::push_pcap(uint8_t port_id,
1162                               int event_id,
1163                               const std::string &pcap_filename,
1164                               double ipg_usec,
1165                               double speedup,
1166                               uint32_t count,
1167                               double duration,
1168                               bool is_dual) {
1169
1170    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1171
1172    lp_port->set_event_id(event_id);
1173
1174    /* delegate the command to the port */
1175    bool rc = lp_port->push_pcap(port_id, pcap_filename, ipg_usec, speedup, count, is_dual);
1176    if (!rc) {
1177        /* report back that we stopped */
1178        CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
1179        TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
1180                                                                       port_id,
1181                                                                       event_id,
1182                                                                       false);
1183        ring->Enqueue((CGenNode *)event_msg);
1184        return;
1185    }
1186
1187
1188    if (duration > 0.0) {
1189        add_port_duration(duration, port_id, event_id);
1190    }
1191
1192     m_state = TrexStatelessDpCore::STATE_PCAP_TX;
1193}
1194
1195void
1196TrexStatelessDpCore::update_traffic(uint8_t port_id, double factor) {
1197
1198    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1199
1200    lp_port->update_traffic(port_id, factor);
1201}
1202
1203
1204void
1205TrexStatelessDpCore::stop_traffic(uint8_t  port_id,
1206                                  bool     stop_on_id,
1207                                  int      event_id) {
1208    /* we cannot remove nodes not from the top of the queue so
1209       for every active node - make sure next time
1210       the scheduler invokes it, it will be free */
1211
1212    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1213    if ( lp_port->stop_traffic(port_id,stop_on_id,event_id) == false){
1214        return;
1215    }
1216
1217    /* flush the TX queue before sending done message to the CP */
1218    m_core->flush_tx_queue();
1219
1220    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
1221    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
1222                                                                   port_id,
1223                                                                   lp_port->get_event_id());
1224    ring->Enqueue((CGenNode *)event_msg);
1225
1226}
1227
1228/**
1229 * handle a message from CP to DP
1230 *
1231 */
1232void
1233TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) {
1234    msg->handle(this);
1235    delete msg;
1236}
1237
1238void
1239TrexStatelessDpCore::barrier(uint8_t port_id, int event_id) {
1240
1241    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
1242    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
1243                                                                   port_id,
1244                                                                   event_id);
1245    ring->Enqueue((CGenNode *)event_msg);
1246}
1247
1248
1249/**
1250 * PCAP node
1251 */
1252bool CGenNodePCAP::create(uint8_t port_id,
1253                          pkt_dir_t dir,
1254                          socket_id_t socket_id,
1255                          const uint8_t *mac_addr,
1256                          const std::string &pcap_filename,
1257                          double ipg_usec,
1258                          double speedup,
1259                          uint32_t count,
1260                          bool is_dual) {
1261    std::stringstream ss;
1262
1263    m_type       = CGenNode::PCAP_PKT;
1264    m_flags      = 0;
1265    m_src_port   = 0;
1266    m_port_id    = port_id;
1267    m_count      = count;
1268    m_is_dual    = is_dual;
1269
1270    /* mark this node as slow path */
1271    set_slow_path(true);
1272
1273    if (ipg_usec != -1) {
1274        /* fixed IPG */
1275        m_ipg_sec = usec_to_sec(ipg_usec / speedup);
1276        m_speedup = 0;
1277    } else {
1278        /* packet IPG */
1279        m_ipg_sec = -1;
1280        m_speedup  = speedup;
1281    }
1282
1283    /* copy MAC addr info */
1284    memcpy(m_mac_addr, mac_addr, 12);
1285
1286    /* set the dir */
1287    set_mbuf_dir(dir);
1288    set_socket_id(socket_id);
1289
1290    /* create the PCAP reader */
1291    m_reader = CCapReaderFactory::CreateReader((char *)pcap_filename.c_str(), 0, ss);
1292    if (!m_reader) {
1293        return false;
1294    }
1295
1296    m_raw_packet = new CCapPktRaw();
1297    if ( m_reader->ReadPacket(m_raw_packet) == false ){
1298        /* handle error */
1299        delete m_reader;
1300        return (false);
1301    }
1302
1303    /* this is the reference time */
1304    //m_base_time = m_raw_packet->get_time();
1305    m_last_pkt_time = m_raw_packet->get_time();
1306
1307    /* ready */
1308    m_state = PCAP_ACTIVE;
1309
1310    return true;
1311}
1312
1313/**
1314 * cleanup for PCAP node
1315 *
1316 * @author imarom (08-May-16)
1317 */
1318void CGenNodePCAP::destroy() {
1319
1320    if (m_raw_packet) {
1321        delete m_raw_packet;
1322        m_raw_packet = NULL;
1323    }
1324
1325    if (m_reader) {
1326        delete m_reader;
1327        m_reader = NULL;
1328    }
1329
1330    m_state = PCAP_INVALID;
1331}
1332
1333