trex_stateless_dp_core.cpp revision e946a09e
1/*
2 Itay Marom
3 Hanoch Haim
4 Cisco Systems, Inc.
5*/
6
7/*
8Copyright (c) 2015-2016 Cisco Systems, Inc.
9
10Licensed under the Apache License, Version 2.0 (the "License");
11you may not use this file except in compliance with the License.
12You may obtain a copy of the License at
13
14    http://www.apache.org/licenses/LICENSE-2.0
15
16Unless required by applicable law or agreed to in writing, software
17distributed under the License is distributed on an "AS IS" BASIS,
18WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19See the License for the specific language governing permissions and
20limitations under the License.
21*/
22#include "bp_sim.h"
23#include "trex_stateless_dp_core.h"
24#include "trex_stateless_messaging.h"
25#include "trex_stream.h"
26#include "trex_stream_node.h"
27#include "trex_streams_compiler.h"
28#include "mbuf.h"
29
30
31
32
33void CGenNodeStateless::cache_mbuf_array_init(){
34    m_cache_size=0;
35    m_cache_array_cnt=0;
36}
37
38
39
40void CGenNodeStateless::cache_mbuf_array_copy(CGenNodeCacheMbuf *obj,
41                                              uint16_t size){
42
43    int i;
44    cache_mbuf_array_alloc(size);
45    for (i=0; i<size; i++) {
46        cache_mbuf_array_set(i,obj->m_array[i]);
47    }
48    cache_mbuf_array_set_const_mbuf(obj->m_mbuf_const);
49}
50
51
52rte_mbuf_t ** CGenNodeStateless::cache_mbuf_array_alloc(uint16_t size){
53
54    uint32_t buf_size = CGenNodeCacheMbuf::get_object_size(size);
55    /* TBD  replace with align, zero API */
56    m_cache_mbuf = (void *)malloc(buf_size);
57    assert(m_cache_mbuf);
58    memset(m_cache_mbuf,0,buf_size);
59
60    m_flags |= SL_NODE_CONST_MBUF_CACHE_ARRAY;
61    m_cache_size=size;
62    m_cache_array_cnt=0;
63    return ((rte_mbuf_t **)m_cache_mbuf);
64}
65
66void CGenNodeStateless::cache_mbuf_array_free(){
67
68    assert(m_cache_mbuf);
69    int i;
70    for (i=0; i<(int)m_cache_size; i++) {
71        rte_mbuf_t * m=cache_mbuf_array_get((uint16_t)i);
72        assert(m);
73        rte_pktmbuf_free(m);
74    }
75
76    /* free the const */
77    rte_mbuf_t * m=cache_mbuf_array_get_const_mbuf() ;
78    if (m) {
79        rte_pktmbuf_free(m);
80    }
81
82    free(m_cache_mbuf);
83    m_cache_mbuf=0;
84}
85
86
87rte_mbuf_t * CGenNodeStateless::cache_mbuf_array_get(uint16_t index){
88
89    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
90    return (p->m_array[index]);
91}
92
93void CGenNodeStateless::cache_mbuf_array_set_const_mbuf(rte_mbuf_t * m){
94    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
95    p->m_mbuf_const=m;
96}
97
98rte_mbuf_t * CGenNodeStateless::cache_mbuf_array_get_const_mbuf(){
99    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
100    return (p->m_mbuf_const);
101}
102
103
104void CGenNodeStateless::cache_mbuf_array_set(uint16_t index,
105                                             rte_mbuf_t * m){
106    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
107    p->m_array[index]=m;
108}
109
110
111void CDpOneStream::Delete(CFlowGenListPerThread   * core){
112    assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE);
113    core->free_node((CGenNode *)m_node);
114    delete m_dp_stream;
115    m_node=0;
116    m_dp_stream=0;
117}
118
119void CDpOneStream::DeleteOnlyStream(){
120    assert(m_dp_stream);
121    delete m_dp_stream;
122    m_dp_stream=0;
123}
124
125int CGenNodeStateless::get_stream_id(){
126    if (m_state ==CGenNodeStateless::ss_FREE_RESUSE) {
127        return (-1); // not valid
128    }
129    assert(m_ref_stream_info);
130    return ((int)m_ref_stream_info->m_stream_id);
131}
132
133
134void CGenNodeStateless::DumpHeader(FILE *fd){
135    fprintf(fd," pkt_id, time, port , action , state, stream_id , stype , m-burst# , burst# \n");
136
137}
138void CGenNodeStateless::Dump(FILE *fd){
139    fprintf(fd," %2.4f, %3lu, %s,%s, %3d, %s, %3lu, %3lu  \n",
140            m_time,
141            (ulong)m_port_id,
142            "s-pkt", //action
143            get_stream_state_str(m_state ).c_str(),
144            get_stream_id(),   //stream_id
145            TrexStream::get_stream_type_str(m_stream_type).c_str(), //stype
146            (ulong)m_multi_bursts,
147            (ulong)m_single_burst
148            );
149}
150
151
152void CGenNodeStateless::generate_random_seed() {
153    /* seed can be provided by the user */
154    uint32_t unique_seed;
155    if (m_ref_stream_info->m_random_seed) {
156        unique_seed = m_ref_stream_info->m_random_seed;
157    } else {
158        unsigned int tmp = (unsigned int)time(NULL);
159        unique_seed = rand_r(&tmp);
160    }
161
162    /* per thread divergence */
163    unique_seed = (unique_seed * ( (m_thread_id + 1) * 514229 ) ) & 0xFFFFFFFF;
164
165    /* set random */
166    set_random_seed(unique_seed);
167}
168
169
170void CGenNodeStateless::refresh_vm_bss() {
171    if ( m_vm_flow_var ) {
172        StreamVmDp  * vm_s=m_ref_stream_info->m_vm_dp;
173        assert(vm_s);
174        memcpy(m_vm_flow_var,vm_s->get_bss(),vm_s->get_bss_size());
175
176        if ( vm_s->is_random_seed() ) {
177            generate_random_seed();
178        }
179
180    }
181}
182
183
184
185/**
186 * this function called when stream restart after it was inactive
187 */
188void CGenNodeStateless::refresh(){
189
190    /* refill the stream info */
191    m_single_burst    = m_single_burst_refill;
192    m_multi_bursts    = m_ref_stream_info->m_num_bursts;
193    m_state           = CGenNodeStateless::ss_ACTIVE;
194
195    /* refresh init value */
196#if 0
197    /* TBD should add a JSON varible for that */
198    refresh_vm_bss();
199#endif
200}
201
202
203void CGenNodeCommand::free_command(){
204
205    assert(m_cmd);
206    m_cmd->on_node_remove();
207    delete m_cmd;
208}
209
210
211std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state){
212    std::string res;
213
214    switch (stream_state) {
215    case CGenNodeStateless::ss_FREE_RESUSE :
216         res="FREE    ";
217        break;
218    case CGenNodeStateless::ss_INACTIVE :
219        res="INACTIVE ";
220        break;
221    case CGenNodeStateless::ss_ACTIVE :
222        res="ACTIVE   ";
223        break;
224    default:
225        res="Unknow   ";
226    };
227    return(res);
228}
229
230/*
231 * Allocate mbuf for flow stat (and latency) info sending
232 * m - Original mbuf (can be complicated mbuf data structure)
233 * fsp_head - return pointer in which the flow stat info should be filled
234 * is_const - is the given mbuf const
235 * return new mbuf structure in which the fsp_head can be written. If needed, orginal mbuf is freed.
236 */
237rte_mbuf_t * CGenNodeStateless::alloc_flow_stat_mbuf(rte_mbuf_t *m, struct flow_stat_payload_header *&fsp_head
238                                                     , bool is_const) {
239    rte_mbuf_t *m_ret = NULL, *m_lat = NULL;
240    uint16_t fsp_head_size = sizeof(struct flow_stat_payload_header);
241
242    if (is_const) {
243        // const mbuf case
244        if (rte_pktmbuf_data_len(m) > 128) {
245            m_ret = CGlobalInfo::pktmbuf_alloc_small(get_socket_id());
246            assert(m_ret);
247            // alloc mbuf just for the latency header
248            m_lat = CGlobalInfo::pktmbuf_alloc( get_socket_id(), fsp_head_size);
249            assert(m_lat);
250            fsp_head = (struct flow_stat_payload_header *)rte_pktmbuf_append(m_lat, fsp_head_size);
251            rte_pktmbuf_attach(m_ret, m);
252            rte_pktmbuf_trim(m_ret, sizeof(struct flow_stat_payload_header));
253            utl_rte_pktmbuf_add_after2(m_ret, m_lat);
254            // ref count was updated when we took the (const) mbuf, and again in rte_pktmbuf_attach
255            // so need do decrease now, to avoid leak.
256            rte_pktmbuf_refcnt_update(m, -1);
257            return m_ret;
258        } else {
259            // Short packet. Just copy all bytes.
260            m_ret = CGlobalInfo::pktmbuf_alloc( get_socket_id(), rte_pktmbuf_data_len(m) );
261            assert(m_ret);
262            char *p = rte_pktmbuf_mtod(m, char*);
263            char *p_new = rte_pktmbuf_append(m_ret, rte_pktmbuf_data_len(m));
264            memcpy(p_new , p, rte_pktmbuf_data_len(m));
265            fsp_head = (struct flow_stat_payload_header *)(p_new + rte_pktmbuf_data_len(m) - fsp_head_size);
266            rte_pktmbuf_free(m);
267            return m_ret;
268        }
269    } else {
270        // Field engine (vm)
271        if (rte_pktmbuf_is_contiguous(m)) {
272            // one, r/w mbuf
273            char *p = rte_pktmbuf_mtod(m, char*);
274            fsp_head = (struct flow_stat_payload_header *)(p + rte_pktmbuf_data_len(m) - fsp_head_size);
275            return m;
276        } else {
277            // We have: r/w --> read only.
278            // Changing to:
279            // (original) r/w -> (new) indirect (direct is original read_only, after trimming last bytes) -> (new) latency info
280            rte_mbuf_t *m_read_only = m->next, *m_indirect;
281
282            m_indirect = CGlobalInfo::pktmbuf_alloc_small(get_socket_id());
283            assert(m_indirect);
284            // alloc mbuf just for the latency header
285            m_lat = CGlobalInfo::pktmbuf_alloc( get_socket_id(), fsp_head_size);
286            assert(m_lat);
287            fsp_head = (struct flow_stat_payload_header *)rte_pktmbuf_append(m_lat, fsp_head_size);
288            utl_rte_pktmbuf_chain_with_indirect(m, m_indirect, m_read_only, m_lat);
289            m_indirect->data_len = (uint16_t)(m_indirect->data_len - fsp_head_size);
290            return m;
291        }
292    }
293}
294
295// test the const case of alloc_flow_stat_mbuf. The more complicated non const case is tested in the simulation.
296bool CGenNodeStateless::alloc_flow_stat_mbuf_test_const() {
297    rte_mbuf_t *m, *m_test;
298    uint16_t sizes[2] = {64, 500};
299    uint16_t size;
300    struct flow_stat_payload_header *fsp_head;
301    char *p;
302
303    set_socket_id(0);
304    for (int test_num = 0; test_num < sizeof(sizes)/sizeof(sizes[0]); test_num++) {
305        size = sizes[test_num];
306        m = CGlobalInfo::pktmbuf_alloc(get_socket_id(), size);
307        p = rte_pktmbuf_append(m, size);
308        for (int i = 0; i < size; i++) {
309            p[i] = (char)i;
310        }
311        m_test = alloc_flow_stat_mbuf(m, fsp_head, true);
312        p = rte_pktmbuf_mtod(m_test, char*);
313        assert(rte_pktmbuf_pkt_len(m_test) == size);
314        for (int i = 0; i < rte_pktmbuf_pkt_len(m_test) - sizeof(*fsp_head); i++) {
315            assert(p[i] == (char)i);
316        }
317        // verify fsp_head points correctly
318        if (size > 128) { // should match threshould in alloc_flow_stat_mbuf
319            assert(rte_pktmbuf_data_len(m_test) == size - sizeof(*fsp_head));
320            assert(rte_pktmbuf_data_len(m_test->next) == sizeof(*fsp_head));
321            assert((char *)fsp_head == rte_pktmbuf_mtod((m_test->next), char*));
322        } else {
323            assert(rte_pktmbuf_data_len(m_test) == size);
324            assert (((char *)fsp_head) + sizeof (*fsp_head) == p + rte_pktmbuf_data_len(m_test));
325        }
326        rte_pktmbuf_free(m_test);
327    }
328    return true;
329}
330
331rte_mbuf_t   * CGenNodeStateless::alloc_node_with_vm(){
332
333    rte_mbuf_t        * m;
334    /* alloc small packet buffer*/
335    uint16_t prefix_size = prefix_header_size();
336    m = CGlobalInfo::pktmbuf_alloc( get_socket_id(), prefix_size );
337    if (m==0) {
338        return (m);
339    }
340    /* TBD remove this, should handle cases of error */
341    assert(m);
342    char *p=rte_pktmbuf_append(m, prefix_size);
343    memcpy( p ,m_original_packet_data_prefix, prefix_size);
344
345
346    /* run the VM program */
347    StreamDPVmInstructionsRunner runner;
348
349    runner.run( (uint32_t*)m_vm_flow_var,
350                m_vm_program_size,
351                m_vm_program,
352                m_vm_flow_var,
353                (uint8_t*)p);
354
355    uint16_t pkt_new_size=runner.get_new_pkt_size();
356    if ( likely( pkt_new_size == 0) ) {
357        /* no packet size change */
358        rte_mbuf_t * m_const = get_const_mbuf();
359        if (  m_const != NULL) {
360            utl_rte_pktmbuf_add_after(m,m_const);
361        }
362        return (m);
363    }
364
365    /* packet size change there are a few changes */
366    rte_mbuf_t * m_const = get_const_mbuf();
367    if ( (m_const == 0 ) || (pkt_new_size<=prefix_size) ) {
368        /* one mbuf , just trim it */
369        m->data_len = pkt_new_size;
370        m->pkt_len  = pkt_new_size;
371        return (m);
372    }
373
374    rte_mbuf_t * mi= CGlobalInfo::pktmbuf_alloc_small(get_socket_id());
375    assert(mi);
376    rte_pktmbuf_attach(mi,m_const);
377    utl_rte_pktmbuf_add_after2(m,mi);
378
379    if ( pkt_new_size < m->pkt_len) {
380        /* need to trim it */
381        mi->data_len = (pkt_new_size - prefix_size);
382        m->pkt_len   = pkt_new_size;
383    }
384    return (m);
385}
386
387void CGenNodeStateless::free_stl_vm_buf(){
388        rte_mbuf_t * m ;
389         m=get_const_mbuf();
390         if (m) {
391             rte_pktmbuf_free(m); /* reduce the ref counter */
392             /* clear the const marker */
393             clear_const_mbuf();
394         }
395
396         free_prefix_header();
397
398         if (m_vm_flow_var) {
399             /* free flow var */
400             free(m_vm_flow_var);
401             m_vm_flow_var=0;
402         }
403}
404
405
406
407void CGenNodeStateless::free_stl_node(){
408
409    if ( is_cache_mbuf_array() ){
410        /* do we have cache of mbuf pre allocated */
411        cache_mbuf_array_free();
412    }else{
413        /* if we have cache mbuf free it */
414        rte_mbuf_t * m=get_cache_mbuf();
415        if (m) {
416                rte_pktmbuf_free(m);
417                m_cache_mbuf=0;
418        }
419    }
420    free_stl_vm_buf();
421}
422
423
424bool TrexStatelessDpPerPort::update_number_of_active_streams(uint32_t d){
425    m_active_streams-=d; /* reduce the number of streams */
426    if (m_active_streams == 0) {
427        return (true);
428    }
429    return (false);
430}
431
432bool TrexStatelessDpPerPort::resume_traffic(uint8_t port_id){
433
434    /* we are working with continues streams so we must be in transmit mode */
435    assert(m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE);
436
437    for (auto dp_stream : m_active_nodes) {
438        CGenNodeStateless * node =dp_stream.m_node;
439        assert(node->get_port_id() == port_id);
440        assert(node->is_pause() == true);
441        node->set_pause(false);
442    }
443    m_state = TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
444    return (true);
445}
446
447bool TrexStatelessDpPerPort::update_traffic(uint8_t port_id, double factor) {
448
449    assert( (m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING ||
450            (m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE)) );
451
452    for (auto dp_stream : m_active_nodes) {
453        CGenNodeStateless * node = dp_stream.m_node;
454        assert(node->get_port_id() == port_id);
455
456        node->update_rate(factor);
457    }
458
459    return (true);
460}
461
462bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){
463
464    /* we are working with continues streams so we must be in transmit mode */
465    assert(m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING);
466
467    for (auto dp_stream : m_active_nodes) {
468        CGenNodeStateless * node =dp_stream.m_node;
469        assert(node->get_port_id() == port_id);
470        assert(node->is_pause() == false);
471        node->set_pause(true);
472    }
473    m_state = TrexStatelessDpPerPort::ppSTATE_PAUSE;
474    return (true);
475}
476
477bool TrexStatelessDpPerPort::push_pcap(uint8_t port_id,
478                                       const std::string &pcap_filename,
479                                       double ipg_usec,
480                                       double speedup,
481                                       uint32_t count) {
482
483    /* push pcap can only happen on an idle port from the core prespective */
484    assert(m_state == TrexStatelessDpPerPort::ppSTATE_IDLE);
485
486    CGenNodePCAP *pcap_node = m_core->allocate_pcap_node();
487    if (!pcap_node) {
488        return (false);
489    }
490
491    pkt_dir_t dir          = m_core->m_node_gen.m_v_if->port_id_to_dir(port_id);
492    socket_id_t socket_id  = m_core->m_node_gen.m_socket_id;
493
494    uint8_t mac_addr[12];
495    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, mac_addr);
496
497    bool rc = pcap_node->create(port_id,
498                                dir,
499                                socket_id,
500                                mac_addr,
501                                pcap_filename,
502                                ipg_usec,
503                                speedup,
504                                count);
505    if (!rc) {
506        m_core->free_node((CGenNode *)pcap_node);
507        return (false);
508    }
509
510    /* schedule the node for now */
511    pcap_node->m_time = m_core->m_cur_time_sec;
512    m_core->m_node_gen.add_node((CGenNode *)pcap_node);
513
514    /* hold a pointer to the node */
515    assert(m_active_pcap_node == NULL);
516    m_active_pcap_node = pcap_node;
517
518    m_state = TrexStatelessDpPerPort::ppSTATE_PCAP_TX;
519    return (true);
520}
521
522
523bool TrexStatelessDpPerPort::stop_traffic(uint8_t  port_id,
524                                          bool     stop_on_id,
525                                          int      event_id){
526
527
528    if (m_state == TrexStatelessDpPerPort::ppSTATE_IDLE) {
529        assert(m_active_streams==0);
530        return false;
531    }
532
533    /* there could be race of stop after stop */
534    if ( stop_on_id ) {
535        if (event_id != m_event_id){
536            /* we can't stop it is an old message */
537            return false;
538        }
539    }
540
541    for (auto dp_stream : m_active_nodes) {
542        CGenNodeStateless * node =dp_stream.m_node;
543        assert(node->get_port_id() == port_id);
544        if ( node->get_state() == CGenNodeStateless::ss_ACTIVE) {
545            node->mark_for_free();
546            m_active_streams--;
547            dp_stream.DeleteOnlyStream();
548
549        }else{
550            dp_stream.Delete(m_core);
551        }
552    }
553
554    /* check for active PCAP node */
555    if (m_active_pcap_node) {
556        /* when got async stop from outside or duration */
557        if (m_active_pcap_node->is_active()) {
558            m_active_pcap_node->mark_for_free();
559        } else {
560            /* graceful stop - node was put out by the scheduler */
561            m_core->free_node( (CGenNode *)m_active_pcap_node);
562        }
563
564        m_active_pcap_node = NULL;
565    }
566
567    /* active stream should be zero */
568    assert(m_active_streams==0);
569    m_active_nodes.clear();
570    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
571    return (true);
572}
573
574
575void TrexStatelessDpPerPort::create(CFlowGenListPerThread   *  core){
576    m_core=core;
577    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
578    m_active_streams=0;
579    m_active_nodes.clear();
580    m_active_pcap_node = NULL;
581}
582
583
584
585void
586TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) {
587    m_thread_id = thread_id;
588    m_core = core;
589    m_local_port_offset = 2*core->getDualPortId();
590
591    CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();
592
593    m_ring_from_cp = cp_dp->getRingCpToDp(thread_id);
594    m_ring_to_cp   = cp_dp->getRingDpToCp(thread_id);
595
596    m_state = STATE_IDLE;
597
598    int i;
599    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
600        m_ports[i].create(core);
601    }
602}
603
604
605/* move to the next stream, old stream move to INACTIVE */
606bool TrexStatelessDpCore::set_stateless_next_node(CGenNodeStateless * cur_node,
607                                                  CGenNodeStateless * next_node){
608
609    assert(cur_node);
610    TrexStatelessDpPerPort * lp_port = get_port_db(cur_node->m_port_id);
611    bool schedule =false;
612
613    bool to_stop_port=false;
614
615    if (next_node == NULL) {
616        /* there is no next stream , reduce the number of active streams*/
617        to_stop_port = lp_port->update_number_of_active_streams(1);
618
619    }else{
620        uint8_t state=next_node->get_state();
621
622        /* can't be FREE_RESUSE */
623        assert(state != CGenNodeStateless::ss_FREE_RESUSE);
624        if (state == CGenNodeStateless::ss_INACTIVE ) {
625
626            if (cur_node->m_action_counter > 0) {
627                cur_node->m_action_counter--;
628                if (cur_node->m_action_counter==0) {
629                    to_stop_port = lp_port->update_number_of_active_streams(1);
630                }else{
631                    /* refill start info and scedule, no update in active streams  */
632                    next_node->refresh();
633                    schedule = true;
634                }
635            }else{
636                /* refill start info and scedule, no update in active streams  */
637                next_node->refresh();
638                schedule = true;
639            }
640
641        }else{
642            to_stop_port = lp_port->update_number_of_active_streams(1);
643        }
644    }
645
646    if ( to_stop_port ) {
647        /* call stop port explictly to move the state */
648        stop_traffic(cur_node->m_port_id,false,0);
649    }
650
651    return ( schedule );
652}
653
654
655
656/**
657 * in idle state loop, the processor most of the time sleeps
658 * and periodically checks for messages
659 *
660 * @author imarom (01-Nov-15)
661 */
662void
663TrexStatelessDpCore::idle_state_loop() {
664
665    const int SHORT_DELAY_MS    = 2;
666    const int LONG_DELAY_MS     = 50;
667    const int DEEP_SLEEP_LIMIT  = 2000;
668
669    int counter = 0;
670
671    while (m_state == STATE_IDLE) {
672        m_core->tickle();
673        m_core->m_node_gen.m_v_if->flush_dp_rx_queue();
674        bool had_msg = periodic_check_for_cp_messages();
675        if (had_msg) {
676            counter = 0;
677            continue;
678        }
679
680        /* enter deep sleep only if enough time had passed */
681        if (counter < DEEP_SLEEP_LIMIT) {
682            delay(SHORT_DELAY_MS);
683            counter++;
684        } else {
685            delay(LONG_DELAY_MS);
686        }
687
688    }
689}
690
691
692
693void TrexStatelessDpCore::quit_main_loop(){
694    m_core->set_terminate_mode(true); /* mark it as terminated */
695    m_state = STATE_TERMINATE;
696    add_global_duration(0.0001);
697}
698
699
700/**
701 * scehduler runs when traffic exists
702 * it will return when no more transmitting is done on this
703 * core
704 *
705 * @author imarom (01-Nov-15)
706 */
707void
708TrexStatelessDpCore::start_scheduler() {
709    /* creates a maintenace job using the scheduler */
710    CGenNode * node_sync = m_core->create_node() ;
711    node_sync->m_type = CGenNode::FLOW_SYNC;
712    node_sync->m_time = m_core->m_cur_time_sec + SYNC_TIME_OUT;
713
714    m_core->m_node_gen.add_node(node_sync);
715
716    double old_offset = 0.0;
717    m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset);
718    /* bail out in case of terminate */
719    if (m_state != TrexStatelessDpCore::STATE_TERMINATE) {
720        m_core->m_node_gen.close_file(m_core);
721        m_state = STATE_IDLE; /* we exit from all ports and we have nothing to do, we move to IDLE state */
722    }
723}
724
725
726void
727TrexStatelessDpCore::run_once(){
728
729    idle_state_loop();
730
731    if ( m_state == STATE_TERMINATE ){
732        return;
733    }
734
735    start_scheduler();
736}
737
738
739
740
741void
742TrexStatelessDpCore::start() {
743
744    while (true) {
745        run_once();
746
747        if ( m_core->is_terminated_by_master() ) {
748            break;
749        }
750    }
751}
752
753/* only if both port are idle we can exit */
754void
755TrexStatelessDpCore::schedule_exit(){
756
757    CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
758
759    node->m_type = CGenNode::COMMAND;
760
761    node->m_cmd = new TrexStatelessDpCanQuit();
762
763    /* make sure it will be scheduled after the current node */
764    node->m_time = m_core->m_cur_time_sec ;
765
766    m_core->m_node_gen.add_node((CGenNode *)node);
767}
768
769
770void
771TrexStatelessDpCore::add_global_duration(double duration){
772    if (duration > 0.0) {
773        CGenNode *node = m_core->create_node() ;
774
775        node->m_type = CGenNode::EXIT_SCHED;
776
777        /* make sure it will be scheduled after the current node */
778        node->m_time = m_core->m_cur_time_sec + duration ;
779
780        m_core->m_node_gen.add_node(node);
781    }
782}
783
784/* add per port exit */
785void
786TrexStatelessDpCore::add_port_duration(double duration,
787                                       uint8_t port_id,
788                                       int event_id){
789    if (duration > 0.0) {
790        CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
791
792        node->m_type = CGenNode::COMMAND;
793
794        /* make sure it will be scheduled after the current node */
795        node->m_time = m_core->m_cur_time_sec + duration ;
796
797        TrexStatelessDpStop * cmd=new TrexStatelessDpStop(port_id);
798
799
800        /* test this */
801        m_core->m_non_active_nodes++;
802        cmd->set_core_ptr(m_core);
803        cmd->set_event_id(event_id);
804        cmd->set_wait_for_event_id(true);
805
806        node->m_cmd = cmd;
807
808        m_core->m_node_gen.add_node((CGenNode *)node);
809    }
810}
811
812
813void TrexStatelessDpCore::update_mac_addr(TrexStream * stream,
814                                          CGenNodeStateless *node,
815                                          pkt_dir_t dir,
816                                          char *raw_pkt){
817    bool              ov_src = stream->get_override_src_mac_by_pkt_data();
818    TrexStream::stream_dst_mac_t  ov_dst = stream->get_override_dst_mac_mode();
819
820
821    if ( (ov_src == true) && (ov_dst == TrexStream::stPKT) ) {
822        /* nothing to do, take from the packet both */
823        return;
824    }
825
826        /* take from cfg_file */
827    if ( (ov_src == false) &&
828         (ov_dst == TrexStream::stCFG_FILE) ){
829
830          m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*)raw_pkt);
831          return;
832    }
833
834    /* save the pkt*/
835    char tmp_pkt[12];
836    memcpy(tmp_pkt,raw_pkt,12);
837
838    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*)raw_pkt);
839
840    if ((ov_src == true) && (ov_dst == TrexStream::stCFG_FILE)) {
841        memcpy(raw_pkt+6,tmp_pkt+6,6);
842    }
843
844    if ((ov_src == false) && (ov_dst == TrexStream::stPKT)) {
845        memcpy(raw_pkt,tmp_pkt,6);
846    }
847}
848
849
850void TrexStatelessDpCore::replay_vm_into_cache(TrexStream * stream,
851                                               CGenNodeStateless *node){
852
853    uint16_t      cache_size = stream->m_cache_size;
854    assert(cache_size>0);
855    rte_mbuf_t * m=0;
856
857    uint32_t buf_size = CGenNodeCacheMbuf::get_object_size(cache_size);
858    CGenNodeCacheMbuf * p = (CGenNodeCacheMbuf *)malloc(buf_size);
859    assert(p);
860    memset(p,0,buf_size);
861
862    int i;
863    for (i=0; i<cache_size; i++) {
864        p->m_array[i] =  node->alloc_node_with_vm();
865    }
866    /* save const */
867    m=node->get_const_mbuf();
868    if (m) {
869        p->m_mbuf_const=m;
870        rte_pktmbuf_refcnt_update(m,1);
871    }
872
873    /* free all VM and const mbuf */
874    node->free_stl_vm_buf();
875
876    /* copy to local node meory */
877    node->cache_mbuf_array_copy(p,cache_size);
878
879    /* free the memory */
880    free(p);
881}
882
883
884void
885TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port,
886                                TrexStream * stream,
887                                TrexStreamsCompiledObj *comp) {
888
889    CGenNodeStateless *node = m_core->create_node_sl();
890
891    node->m_thread_id = m_thread_id;
892    node->cache_mbuf_array_init();
893    node->m_batch_size=0;
894
895    /* add periodic */
896    node->m_cache_mbuf=0;
897    node->m_type = CGenNode::STATELESS_PKT;
898
899    node->m_action_counter = stream->m_action_count;
900
901    /* clone the stream from control plane memory to DP memory */
902    node->m_ref_stream_info = stream->clone();
903    /* no need for this memory anymore on the control plane memory */
904    stream->release_dp_object();
905
906    node->m_next_stream=0; /* will be fixed later */
907
908    if ( stream->m_self_start ){
909        /* if self start it is in active mode */
910        node->m_state =CGenNodeStateless::ss_ACTIVE;
911        lp_port->m_active_streams++;
912    }else{
913        node->m_state =CGenNodeStateless::ss_INACTIVE;
914    }
915
916    node->m_time = m_core->m_cur_time_sec + stream->get_start_delay_sec();
917
918    pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(stream->m_port_id);
919    node->m_flags = 0;
920    node->m_src_port =0;
921    node->m_original_packet_data_prefix = 0;
922
923    if (stream->m_rx_check.m_enabled) {
924        node->set_stat_needed();
925        uint8_t hw_id = stream->m_rx_check.m_hw_id;
926        assert (hw_id < MAX_FLOW_STATS + MAX_FLOW_STATS_PAYLOAD);
927        node->set_stat_hw_id(hw_id);
928        // no support for cache with flow stat payload rules
929        if ((TrexPlatformApi::driver_stat_cap_e)stream->m_rx_check.m_rule_type == TrexPlatformApi::IF_STAT_PAYLOAD) {
930            stream->m_cache_size = 0;
931        }
932    }
933
934    /* set socket id */
935    node->set_socket_id(m_core->m_node_gen.m_socket_id);
936
937    /* build a mbuf from a packet */
938
939    uint16_t pkt_size = stream->m_pkt.len;
940    const uint8_t *stream_pkt = stream->m_pkt.binary;
941
942    node->m_pause =0;
943    node->m_stream_type = stream->m_type;
944    node->m_next_time_offset = 1.0 / stream->get_pps();
945    node->m_null_stream = (stream->m_null_stream ? 1 : 0);
946
947    /* stateless specific fields */
948    switch ( stream->m_type ) {
949
950    case TrexStream::stCONTINUOUS :
951        node->m_single_burst=0;
952        node->m_single_burst_refill=0;
953        node->m_multi_bursts=0;
954        break;
955
956    case TrexStream::stSINGLE_BURST :
957        node->m_stream_type             = TrexStream::stMULTI_BURST;
958        node->m_single_burst            = stream->m_burst_total_pkts;
959        node->m_single_burst_refill     = stream->m_burst_total_pkts;
960        node->m_multi_bursts            = 1;  /* single burst in multi burst of 1 */
961        break;
962
963    case TrexStream::stMULTI_BURST :
964        node->m_single_burst        = stream->m_burst_total_pkts;
965        node->m_single_burst_refill = stream->m_burst_total_pkts;
966        node->m_multi_bursts        = stream->m_num_bursts;
967        break;
968    default:
969
970        assert(0);
971    };
972
973    node->m_port_id = stream->m_port_id;
974
975    /* set dir 0 or 1 client or server */
976    node->set_mbuf_cache_dir(dir);
977
978
979    if (node->m_ref_stream_info->getDpVm() == NULL) {
980        /* no VM */
981
982        node->m_vm_flow_var =  NULL;
983        node->m_vm_program  =  NULL;
984        node->m_vm_program_size =0;
985
986                /* allocate const mbuf */
987        rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size);
988        assert(m);
989
990        char *p = rte_pktmbuf_append(m, pkt_size);
991        assert(p);
992        /* copy the packet */
993        memcpy(p,stream_pkt,pkt_size);
994
995        update_mac_addr(stream,node,dir,p);
996
997        /* set the packet as a readonly */
998        node->set_cache_mbuf(m);
999
1000        node->m_original_packet_data_prefix =0;
1001    }else{
1002
1003        /* set the program */
1004        TrexStream * local_mem_stream = node->m_ref_stream_info;
1005
1006        StreamVmDp  * lpDpVm = local_mem_stream->getDpVm();
1007
1008        node->m_vm_flow_var      = lpDpVm->clone_bss(); /* clone the flow var */
1009        node->m_vm_program       = lpDpVm->get_program(); /* same ref to the program */
1010        node->m_vm_program_size  = lpDpVm->get_program_size();
1011
1012        /* generate random seed if needed*/
1013        if (lpDpVm->is_random_seed()) {
1014            node->generate_random_seed();
1015        }
1016
1017        /* we need to copy the object */
1018        if ( pkt_size > lpDpVm->get_prefix_size() ) {
1019            /* we need const packet */
1020            uint16_t const_pkt_size  = pkt_size - lpDpVm->get_prefix_size() ;
1021            rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), const_pkt_size );
1022            assert(m);
1023
1024            char *p = rte_pktmbuf_append(m, const_pkt_size);
1025            assert(p);
1026
1027            /* copy packet data */
1028            memcpy(p,(stream_pkt + lpDpVm->get_prefix_size()),const_pkt_size);
1029
1030            node->set_const_mbuf(m);
1031        }
1032
1033
1034        if ( lpDpVm->is_pkt_size_var() ) {
1035            // mark the node as varible size
1036            node->set_var_pkt_size();
1037        }
1038
1039
1040        if (lpDpVm->get_prefix_size() > pkt_size ) {
1041            lpDpVm->set_prefix_size(pkt_size);
1042        }
1043
1044        /* copy the headr */
1045        uint16_t header_size = lpDpVm->get_prefix_size();
1046        assert(header_size);
1047        node->alloc_prefix_header(header_size);
1048        uint8_t *p=node->m_original_packet_data_prefix;
1049        assert(p);
1050
1051        memcpy(p,stream_pkt , header_size);
1052
1053        update_mac_addr(stream,node,dir,(char *)p);
1054
1055        if (stream->m_cache_size > 0 ) {
1056            /* we need to create cache of objects */
1057            replay_vm_into_cache(stream, node);
1058        }
1059    }
1060
1061
1062    CDpOneStream one_stream;
1063
1064    one_stream.m_dp_stream = node->m_ref_stream_info;
1065    one_stream.m_node =node;
1066
1067    lp_port->m_active_nodes.push_back(one_stream);
1068
1069    /* schedule only if active */
1070    if (node->m_state == CGenNodeStateless::ss_ACTIVE) {
1071        m_core->m_node_gen.add_node((CGenNode *)node);
1072    }
1073}
1074
1075void
1076TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj,
1077                                   double duration,
1078                                   int event_id) {
1079
1080
1081    TrexStatelessDpPerPort * lp_port=get_port_db(obj->get_port_id());
1082    lp_port->m_active_streams = 0;
1083    lp_port->set_event_id(event_id);
1084
1085    /* update cur time */
1086    if ( CGlobalInfo::is_realtime()  ){
1087        m_core->m_cur_time_sec = now_sec() + SCHD_OFFSET_DTIME ;
1088    }
1089
1090    /* no nodes in the list */
1091    assert(lp_port->m_active_nodes.size()==0);
1092
1093    for (auto single_stream : obj->get_objects()) {
1094        /* all commands should be for the same port */
1095        assert(obj->get_port_id() == single_stream.m_stream->m_port_id);
1096        add_stream(lp_port,single_stream.m_stream,obj);
1097    }
1098
1099    uint32_t nodes = lp_port->m_active_nodes.size();
1100    /* find next stream */
1101    assert(nodes == obj->get_objects().size());
1102
1103    int cnt=0;
1104
1105    /* set the next_stream pointer  */
1106    for (auto single_stream : obj->get_objects()) {
1107
1108        if (single_stream.m_stream->is_dp_next_stream() ) {
1109            int stream_id = single_stream.m_stream->m_next_stream_id;
1110            assert(stream_id<nodes);
1111            /* point to the next stream , stream_id is fixed */
1112            lp_port->m_active_nodes[cnt].m_node->m_next_stream = lp_port->m_active_nodes[stream_id].m_node ;
1113        }
1114        cnt++;
1115    }
1116
1117    lp_port->m_state =TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
1118    m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
1119
1120
1121    if ( duration > 0.0 ){
1122        add_port_duration( duration ,obj->get_port_id(),event_id );
1123    }
1124
1125}
1126
1127
1128bool TrexStatelessDpCore::are_all_ports_idle(){
1129
1130    bool res=true;
1131    int i;
1132    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
1133        if ( m_ports[i].m_state != TrexStatelessDpPerPort::ppSTATE_IDLE ){
1134            res=false;
1135        }
1136    }
1137    return (res);
1138}
1139
1140
1141void
1142TrexStatelessDpCore::resume_traffic(uint8_t port_id){
1143
1144    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1145
1146    lp_port->resume_traffic(port_id);
1147}
1148
1149
1150void
1151TrexStatelessDpCore::pause_traffic(uint8_t port_id){
1152
1153    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1154
1155    lp_port->pause_traffic(port_id);
1156}
1157
1158void
1159TrexStatelessDpCore::push_pcap(uint8_t port_id,
1160                               int event_id,
1161                               const std::string &pcap_filename,
1162                               double ipg_usec,
1163                               double speedup,
1164                               uint32_t count,
1165                               double duration) {
1166
1167    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1168
1169    lp_port->set_event_id(event_id);
1170
1171    /* delegate the command to the port */
1172    bool rc = lp_port->push_pcap(port_id, pcap_filename, ipg_usec, speedup, count);
1173    if (!rc) {
1174        /* report back that we stopped */
1175        CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
1176        TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
1177                                                                       port_id,
1178                                                                       event_id,
1179                                                                       false);
1180        ring->Enqueue((CGenNode *)event_msg);
1181        return;
1182    }
1183
1184
1185    if (duration > 0.0) {
1186        add_port_duration(duration, port_id, event_id);
1187    }
1188
1189     m_state = TrexStatelessDpCore::STATE_PCAP_TX;
1190}
1191
1192void
1193TrexStatelessDpCore::update_traffic(uint8_t port_id, double factor) {
1194
1195    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1196
1197    lp_port->update_traffic(port_id, factor);
1198}
1199
1200
1201void
1202TrexStatelessDpCore::stop_traffic(uint8_t  port_id,
1203                                  bool     stop_on_id,
1204                                  int      event_id) {
1205    /* we cannot remove nodes not from the top of the queue so
1206       for every active node - make sure next time
1207       the scheduler invokes it, it will be free */
1208
1209    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1210    if ( lp_port->stop_traffic(port_id,stop_on_id,event_id) == false){
1211        return;
1212    }
1213
1214    /* flush the TX queue before sending done message to the CP */
1215    m_core->flush_tx_queue();
1216
1217    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
1218    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
1219                                                                   port_id,
1220                                                                   lp_port->get_event_id());
1221    ring->Enqueue((CGenNode *)event_msg);
1222
1223}
1224
1225/**
1226 * handle a message from CP to DP
1227 *
1228 */
1229void
1230TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) {
1231    msg->handle(this);
1232    delete msg;
1233}
1234
1235void
1236TrexStatelessDpCore::barrier(uint8_t port_id, int event_id) {
1237
1238    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
1239    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
1240                                                                   port_id,
1241                                                                   event_id);
1242    ring->Enqueue((CGenNode *)event_msg);
1243}
1244
1245
1246/**
1247 * PCAP node
1248 */
1249bool CGenNodePCAP::create(uint8_t port_id,
1250                          pkt_dir_t dir,
1251                          socket_id_t socket_id,
1252                          const uint8_t *mac_addr,
1253                          const std::string &pcap_filename,
1254                          double ipg_usec,
1255                          double speedup,
1256                          uint32_t count) {
1257    std::stringstream ss;
1258
1259    m_type       = CGenNode::PCAP_PKT;
1260    m_flags      = 0;
1261    m_src_port   = 0;
1262    m_port_id    = port_id;
1263    m_count      = count;
1264
1265    /* mark this node as slow path */
1266    set_slow_path(true);
1267
1268    if (ipg_usec != -1) {
1269        /* fixed IPG */
1270        m_ipg_sec = usec_to_sec(ipg_usec / speedup);
1271        m_speedup = 0;
1272    } else {
1273        /* packet IPG */
1274        m_ipg_sec = -1;
1275        m_speedup  = speedup;
1276    }
1277
1278    /* copy MAC addr info */
1279    memcpy(m_mac_addr, mac_addr, 12);
1280
1281    /* set the dir */
1282    set_mbuf_dir(dir);
1283    set_socket_id(socket_id);
1284
1285    /* create the PCAP reader */
1286    m_reader = CCapReaderFactory::CreateReader((char *)pcap_filename.c_str(), 0, ss);
1287    if (!m_reader) {
1288        return false;
1289    }
1290
1291    m_raw_packet = new CCapPktRaw();
1292    if ( m_reader->ReadPacket(m_raw_packet) == false ){
1293        /* handle error */
1294        delete m_reader;
1295        return (false);
1296    }
1297
1298    /* this is the reference time */
1299    //m_base_time = m_raw_packet->get_time();
1300    m_last_pkt_time = m_raw_packet->get_time();
1301
1302    /* ready */
1303    m_state = PCAP_ACTIVE;
1304
1305    return true;
1306}
1307
1308/**
1309 * cleanup for PCAP node
1310 *
1311 * @author imarom (08-May-16)
1312 */
1313void CGenNodePCAP::destroy() {
1314
1315    if (m_raw_packet) {
1316        delete m_raw_packet;
1317        m_raw_packet = NULL;
1318    }
1319
1320    if (m_reader) {
1321        delete m_reader;
1322        m_reader = NULL;
1323    }
1324
1325    m_state = PCAP_INVALID;
1326}
1327
1328