trex_stateless_dp_core.cpp revision c3a0d758
1/*
2 Itay Marom
3 Hanoch Haim
4 Cisco Systems, Inc.
5*/
6
7/*
8Copyright (c) 2015-2016 Cisco Systems, Inc.
9
10Licensed under the Apache License, Version 2.0 (the "License");
11you may not use this file except in compliance with the License.
12You may obtain a copy of the License at
13
14    http://www.apache.org/licenses/LICENSE-2.0
15
16Unless required by applicable law or agreed to in writing, software
17distributed under the License is distributed on an "AS IS" BASIS,
18WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19See the License for the specific language governing permissions and
20limitations under the License.
21*/
22#include "bp_sim.h"
23#include "trex_stateless_dp_core.h"
24#include "trex_stateless_messaging.h"
25#include "trex_stream.h"
26#include "trex_stream_node.h"
27#include "trex_streams_compiler.h"
28#include "mbuf.h"
29
30
31
32
33void CGenNodeStateless::cache_mbuf_array_init(){
34    m_cache_size=0;
35    m_cache_array_cnt=0;
36}
37
38
39
40void CGenNodeStateless::cache_mbuf_array_copy(CGenNodeCacheMbuf *obj,
41                                              uint16_t size){
42
43    int i;
44    cache_mbuf_array_alloc(size);
45    for (i=0; i<size; i++) {
46        cache_mbuf_array_set(i,obj->m_array[i]);
47    }
48    cache_mbuf_array_set_const_mbuf(obj->m_mbuf_const);
49}
50
51
52rte_mbuf_t ** CGenNodeStateless::cache_mbuf_array_alloc(uint16_t size){
53
54    uint32_t buf_size = CGenNodeCacheMbuf::get_object_size(size);
55    /* TBD  replace with align, zero API */
56    m_cache_mbuf = (void *)malloc(buf_size);
57    assert(m_cache_mbuf);
58    memset(m_cache_mbuf,0,buf_size);
59
60    m_flags |= SL_NODE_CONST_MBUF_CACHE_ARRAY;
61    m_cache_size=size;
62    m_cache_array_cnt=0;
63    return ((rte_mbuf_t **)m_cache_mbuf);
64}
65
66void CGenNodeStateless::cache_mbuf_array_free(){
67
68    assert(m_cache_mbuf);
69    int i;
70    for (i=0; i<(int)m_cache_size; i++) {
71        rte_mbuf_t * m=cache_mbuf_array_get((uint16_t)i);
72        assert(m);
73        rte_pktmbuf_free(m);
74    }
75
76    /* free the const */
77    rte_mbuf_t * m=cache_mbuf_array_get_const_mbuf() ;
78    if (m) {
79        rte_pktmbuf_free(m);
80    }
81
82    free(m_cache_mbuf);
83    m_cache_mbuf=0;
84}
85
86
87rte_mbuf_t * CGenNodeStateless::cache_mbuf_array_get(uint16_t index){
88
89    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
90    return (p->m_array[index]);
91}
92
93void CGenNodeStateless::cache_mbuf_array_set_const_mbuf(rte_mbuf_t * m){
94    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
95    p->m_mbuf_const=m;
96}
97
98rte_mbuf_t * CGenNodeStateless::cache_mbuf_array_get_const_mbuf(){
99    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
100    return (p->m_mbuf_const);
101}
102
103
104void CGenNodeStateless::cache_mbuf_array_set(uint16_t index,
105                                             rte_mbuf_t * m){
106    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
107    p->m_array[index]=m;
108}
109
110
111void CDpOneStream::Delete(CFlowGenListPerThread   * core){
112    assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE);
113    core->free_node((CGenNode *)m_node);
114    delete m_dp_stream;
115    m_node=0;
116    m_dp_stream=0;
117}
118
119void CDpOneStream::DeleteOnlyStream(){
120    assert(m_dp_stream);
121    delete m_dp_stream;
122    m_dp_stream=0;
123}
124
125int CGenNodeStateless::get_stream_id(){
126    if (m_state ==CGenNodeStateless::ss_FREE_RESUSE) {
127        return (-1); // not valid
128    }
129    assert(m_ref_stream_info);
130    return ((int)m_ref_stream_info->m_stream_id);
131}
132
133
134void CGenNodeStateless::DumpHeader(FILE *fd){
135    fprintf(fd," pkt_id, time, port , action , state, stream_id , stype , m-burst# , burst# \n");
136
137}
138void CGenNodeStateless::Dump(FILE *fd){
139    fprintf(fd," %2.4f, %3lu, %s,%s, %3d, %s, %3lu, %3lu  \n",
140            m_time,
141            (ulong)m_port_id,
142            "s-pkt", //action
143            get_stream_state_str(m_state ).c_str(),
144            get_stream_id(),   //stream_id
145            TrexStream::get_stream_type_str(m_stream_type).c_str(), //stype
146            (ulong)m_multi_bursts,
147            (ulong)m_single_burst
148            );
149}
150
151
152
153void CGenNodeStateless::refresh_vm_bss(){
154    if ( m_vm_flow_var ) {
155        StreamVmDp  * vm_s=m_ref_stream_info->m_vm_dp;
156        assert(vm_s);
157        memcpy(m_vm_flow_var,vm_s->get_bss(),vm_s->get_bss_size());
158
159        if ( vm_s->is_random_seed() ){
160            /* if we have random seed for this program */
161            if (m_ref_stream_info->m_random_seed) {
162                set_random_seed(m_ref_stream_info->m_random_seed);
163            }
164        }
165    }
166}
167
168
169/**
170 * this function called when stream restart after it was inactive
171 */
172void CGenNodeStateless::refresh(){
173
174    /* refill the stream info */
175    m_single_burst    = m_single_burst_refill;
176    m_multi_bursts    = m_ref_stream_info->m_num_bursts;
177    m_state           = CGenNodeStateless::ss_ACTIVE;
178
179    /* refresh init value */
180#if 0
181    /* TBD should add a JSON varible for that */
182    refresh_vm_bss();
183#endif
184}
185
186
187void CGenNodeCommand::free_command(){
188
189    assert(m_cmd);
190    m_cmd->on_node_remove();
191    delete m_cmd;
192}
193
194
195std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state){
196    std::string res;
197
198    switch (stream_state) {
199    case CGenNodeStateless::ss_FREE_RESUSE :
200         res="FREE    ";
201        break;
202    case CGenNodeStateless::ss_INACTIVE :
203        res="INACTIVE ";
204        break;
205    case CGenNodeStateless::ss_ACTIVE :
206        res="ACTIVE   ";
207        break;
208    default:
209        res="Unknow   ";
210    };
211    return(res);
212}
213
214/*
215 * Allocate mbuf for flow stat (and latency) info sending
216 * m - Original mbuf (can be complicated mbuf data structure)
217 * fsp_head - return pointer in which the flow stat info should be filled
218 * is_const - is the given mbuf const
219 * return new mbuf structure in which the fsp_head can be written. If needed, orginal mbuf is freed.
220 */
221rte_mbuf_t * CGenNodeStateless::alloc_flow_stat_mbuf(rte_mbuf_t *m, struct flow_stat_payload_header *&fsp_head
222                                                     , bool is_const) {
223    rte_mbuf_t *m_ret = NULL, *m_lat = NULL;
224    uint16_t fsp_head_size = sizeof(struct flow_stat_payload_header);
225
226    if (is_const) {
227        // const mbuf case
228        if (rte_pktmbuf_data_len(m) > 128) {
229            m_ret = CGlobalInfo::pktmbuf_alloc_small(get_socket_id());
230            assert(m_ret);
231            // alloc mbuf just for the latency header
232            m_lat = CGlobalInfo::pktmbuf_alloc( get_socket_id(), fsp_head_size);
233            assert(m_lat);
234            fsp_head = (struct flow_stat_payload_header *)rte_pktmbuf_append(m_lat, fsp_head_size);
235            rte_pktmbuf_attach(m_ret, m);
236            rte_pktmbuf_trim(m_ret, sizeof(struct flow_stat_payload_header));
237            utl_rte_pktmbuf_add_after2(m_ret, m_lat);
238            // ref count was updated when we took the (const) mbuf, and again in rte_pktmbuf_attach
239            // so need do decrease now, to avoid leak.
240            rte_pktmbuf_refcnt_update(m, -1);
241            return m_ret;
242        } else {
243            // Short packet. Just copy all bytes.
244            m_ret = CGlobalInfo::pktmbuf_alloc( get_socket_id(), rte_pktmbuf_data_len(m) );
245            assert(m_ret);
246            char *p = rte_pktmbuf_mtod(m, char*);
247            char *p_new = rte_pktmbuf_append(m_ret, rte_pktmbuf_data_len(m));
248            memcpy(p_new , p, rte_pktmbuf_data_len(m));
249            fsp_head = (struct flow_stat_payload_header *)(p_new + rte_pktmbuf_data_len(m) - fsp_head_size);
250            rte_pktmbuf_free(m);
251            return m_ret;
252        }
253    } else {
254        // Field engine (vm)
255        if (rte_pktmbuf_is_contiguous(m)) {
256            // one, r/w mbuf
257            char *p = rte_pktmbuf_mtod(m, char*);
258            fsp_head = (struct flow_stat_payload_header *)(p + rte_pktmbuf_data_len(m) - fsp_head_size);
259            return m;
260        } else {
261            // We have: r/w --> read only.
262            // Changing to:
263            // (original) r/w -> (new) indirect (direct is original read_only, after trimming last bytes) -> (new) latency info
264            rte_mbuf_t *m_read_only = m->next, *m_indirect;
265
266            m_indirect = CGlobalInfo::pktmbuf_alloc_small(get_socket_id());
267            assert(m_indirect);
268            // alloc mbuf just for the latency header
269            m_lat = CGlobalInfo::pktmbuf_alloc( get_socket_id(), fsp_head_size);
270            assert(m_lat);
271            fsp_head = (struct flow_stat_payload_header *)rte_pktmbuf_append(m_lat, fsp_head_size);
272            utl_rte_pktmbuf_chain_with_indirect(m, m_indirect, m_read_only, m_lat);
273            m_indirect->data_len = (uint16_t)(m_indirect->data_len - fsp_head_size);
274            return m;
275        }
276    }
277}
278
279// test the const case of alloc_flow_stat_mbuf. The more complicated non const case is tested in the simulation.
280bool CGenNodeStateless::alloc_flow_stat_mbuf_test_const() {
281    rte_mbuf_t *m, *m_test;
282    uint16_t sizes[2] = {64, 500};
283    uint16_t size;
284    struct flow_stat_payload_header *fsp_head;
285    char *p;
286
287    set_socket_id(0);
288    for (int test_num = 0; test_num < sizeof(sizes)/sizeof(sizes[0]); test_num++) {
289        size = sizes[test_num];
290        m = CGlobalInfo::pktmbuf_alloc(get_socket_id(), size);
291        p = rte_pktmbuf_append(m, size);
292        for (int i = 0; i < size; i++) {
293            p[i] = (char)i;
294        }
295        m_test = alloc_flow_stat_mbuf(m, fsp_head, true);
296        p = rte_pktmbuf_mtod(m_test, char*);
297        assert(rte_pktmbuf_pkt_len(m_test) == size);
298        for (int i = 0; i < rte_pktmbuf_pkt_len(m_test) - sizeof(*fsp_head); i++) {
299            assert(p[i] == (char)i);
300        }
301        // verify fsp_head points correctly
302        if (size > 128) { // should match threshould in alloc_flow_stat_mbuf
303            assert(rte_pktmbuf_data_len(m_test) == size - sizeof(*fsp_head));
304            assert(rte_pktmbuf_data_len(m_test->next) == sizeof(*fsp_head));
305            assert((char *)fsp_head == rte_pktmbuf_mtod((m_test->next), char*));
306        } else {
307            assert(rte_pktmbuf_data_len(m_test) == size);
308            assert (((char *)fsp_head) + sizeof (*fsp_head) == p + rte_pktmbuf_data_len(m_test));
309        }
310        rte_pktmbuf_free(m_test);
311    }
312    return true;
313}
314
315rte_mbuf_t   * CGenNodeStateless::alloc_node_with_vm(){
316
317    rte_mbuf_t        * m;
318    /* alloc small packet buffer*/
319    uint16_t prefix_size = prefix_header_size();
320    m = CGlobalInfo::pktmbuf_alloc( get_socket_id(), prefix_size );
321    if (m==0) {
322        return (m);
323    }
324    /* TBD remove this, should handle cases of error */
325    assert(m);
326    char *p=rte_pktmbuf_append(m, prefix_size);
327    memcpy( p ,m_original_packet_data_prefix, prefix_size);
328
329
330    /* run the VM program */
331    StreamDPVmInstructionsRunner runner;
332
333    runner.run( (uint32_t*)m_vm_flow_var,
334                m_vm_program_size,
335                m_vm_program,
336                m_vm_flow_var,
337                (uint8_t*)p);
338
339    uint16_t pkt_new_size=runner.get_new_pkt_size();
340    if ( likely( pkt_new_size == 0) ) {
341        /* no packet size change */
342        rte_mbuf_t * m_const = get_const_mbuf();
343        if (  m_const != NULL) {
344            utl_rte_pktmbuf_add_after(m,m_const);
345        }
346        return (m);
347    }
348
349    /* packet size change there are a few changes */
350    rte_mbuf_t * m_const = get_const_mbuf();
351    if ( (m_const == 0 ) || (pkt_new_size<=prefix_size) ) {
352        /* one mbuf , just trim it */
353        m->data_len = pkt_new_size;
354        m->pkt_len  = pkt_new_size;
355        return (m);
356    }
357
358    rte_mbuf_t * mi= CGlobalInfo::pktmbuf_alloc_small(get_socket_id());
359    assert(mi);
360    rte_pktmbuf_attach(mi,m_const);
361    utl_rte_pktmbuf_add_after2(m,mi);
362
363    if ( pkt_new_size < m->pkt_len) {
364        /* need to trim it */
365        mi->data_len = (pkt_new_size - prefix_size);
366        m->pkt_len   = pkt_new_size;
367    }
368    return (m);
369}
370
371void CGenNodeStateless::free_stl_vm_buf(){
372        rte_mbuf_t * m ;
373         m=get_const_mbuf();
374         if (m) {
375             rte_pktmbuf_free(m); /* reduce the ref counter */
376             /* clear the const marker */
377             clear_const_mbuf();
378         }
379
380         free_prefix_header();
381
382         if (m_vm_flow_var) {
383             /* free flow var */
384             free(m_vm_flow_var);
385             m_vm_flow_var=0;
386         }
387}
388
389
390
391void CGenNodeStateless::free_stl_node(){
392
393    if ( is_cache_mbuf_array() ){
394        /* do we have cache of mbuf pre allocated */
395        cache_mbuf_array_free();
396    }else{
397        /* if we have cache mbuf free it */
398        rte_mbuf_t * m=get_cache_mbuf();
399        if (m) {
400                rte_pktmbuf_free(m);
401                m_cache_mbuf=0;
402        }
403    }
404    free_stl_vm_buf();
405}
406
407
408bool TrexStatelessDpPerPort::update_number_of_active_streams(uint32_t d){
409    m_active_streams-=d; /* reduce the number of streams */
410    if (m_active_streams == 0) {
411        return (true);
412    }
413    return (false);
414}
415
416bool TrexStatelessDpPerPort::resume_traffic(uint8_t port_id){
417
418    /* we are working with continues streams so we must be in transmit mode */
419    assert(m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE);
420
421    for (auto dp_stream : m_active_nodes) {
422        CGenNodeStateless * node =dp_stream.m_node;
423        assert(node->get_port_id() == port_id);
424        assert(node->is_pause() == true);
425        node->set_pause(false);
426    }
427    m_state = TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
428    return (true);
429}
430
431bool TrexStatelessDpPerPort::update_traffic(uint8_t port_id, double factor) {
432
433    assert( (m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING ||
434            (m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE)) );
435
436    for (auto dp_stream : m_active_nodes) {
437        CGenNodeStateless * node = dp_stream.m_node;
438        assert(node->get_port_id() == port_id);
439
440        node->update_rate(factor);
441    }
442
443    return (true);
444}
445
446bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){
447
448    /* we are working with continues streams so we must be in transmit mode */
449    assert(m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING);
450
451    for (auto dp_stream : m_active_nodes) {
452        CGenNodeStateless * node =dp_stream.m_node;
453        assert(node->get_port_id() == port_id);
454        assert(node->is_pause() == false);
455        node->set_pause(true);
456    }
457    m_state = TrexStatelessDpPerPort::ppSTATE_PAUSE;
458    return (true);
459}
460
461bool TrexStatelessDpPerPort::push_pcap(uint8_t port_id,
462                                       const std::string &pcap_filename,
463                                       double ipg_usec,
464                                       double speedup,
465                                       uint32_t count) {
466
467    /* push pcap can only happen on an idle port from the core prespective */
468    assert(m_state == TrexStatelessDpPerPort::ppSTATE_IDLE);
469
470    CGenNodePCAP *pcap_node = m_core->allocate_pcap_node();
471    if (!pcap_node) {
472        return (false);
473    }
474
475    pkt_dir_t dir          = m_core->m_node_gen.m_v_if->port_id_to_dir(port_id);
476    socket_id_t socket_id  = m_core->m_node_gen.m_socket_id;
477
478    uint8_t mac_addr[12];
479    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, mac_addr);
480
481    bool rc = pcap_node->create(port_id,
482                                dir,
483                                socket_id,
484                                mac_addr,
485                                pcap_filename,
486                                ipg_usec,
487                                speedup,
488                                count);
489    if (!rc) {
490        m_core->free_node((CGenNode *)pcap_node);
491        return (false);
492    }
493
494    /* schedule the node for now */
495    pcap_node->m_time = m_core->m_cur_time_sec;
496    m_core->m_node_gen.add_node((CGenNode *)pcap_node);
497
498    /* hold a pointer to the node */
499    assert(m_active_pcap_node == NULL);
500    m_active_pcap_node = pcap_node;
501
502    m_state = TrexStatelessDpPerPort::ppSTATE_PCAP_TX;
503    return (true);
504}
505
506
507bool TrexStatelessDpPerPort::stop_traffic(uint8_t  port_id,
508                                          bool     stop_on_id,
509                                          int      event_id){
510
511
512    if (m_state == TrexStatelessDpPerPort::ppSTATE_IDLE) {
513        assert(m_active_streams==0);
514        return false;
515    }
516
517    /* there could be race of stop after stop */
518    if ( stop_on_id ) {
519        if (event_id != m_event_id){
520            /* we can't stop it is an old message */
521            return false;
522        }
523    }
524
525    for (auto dp_stream : m_active_nodes) {
526        CGenNodeStateless * node =dp_stream.m_node;
527        assert(node->get_port_id() == port_id);
528        if ( node->get_state() == CGenNodeStateless::ss_ACTIVE) {
529            node->mark_for_free();
530            m_active_streams--;
531            dp_stream.DeleteOnlyStream();
532
533        }else{
534            dp_stream.Delete(m_core);
535        }
536    }
537
538    /* check for active PCAP node */
539    if (m_active_pcap_node) {
540        /* when got async stop from outside or duration */
541        if (m_active_pcap_node->is_active()) {
542            m_active_pcap_node->mark_for_free();
543        } else {
544            /* graceful stop - node was put out by the scheduler */
545            m_core->free_node( (CGenNode *)m_active_pcap_node);
546        }
547
548        m_active_pcap_node = NULL;
549    }
550
551    /* active stream should be zero */
552    assert(m_active_streams==0);
553    m_active_nodes.clear();
554    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
555    return (true);
556}
557
558
559void TrexStatelessDpPerPort::create(CFlowGenListPerThread   *  core){
560    m_core=core;
561    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
562    m_active_streams=0;
563    m_active_nodes.clear();
564    m_active_pcap_node = NULL;
565}
566
567
568
569void
570TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) {
571    m_thread_id = thread_id;
572    m_core = core;
573    m_local_port_offset = 2*core->getDualPortId();
574
575    CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();
576
577    m_ring_from_cp = cp_dp->getRingCpToDp(thread_id);
578    m_ring_to_cp   = cp_dp->getRingDpToCp(thread_id);
579
580    m_state = STATE_IDLE;
581
582    int i;
583    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
584        m_ports[i].create(core);
585    }
586}
587
588
589/* move to the next stream, old stream move to INACTIVE */
590bool TrexStatelessDpCore::set_stateless_next_node(CGenNodeStateless * cur_node,
591                                                  CGenNodeStateless * next_node){
592
593    assert(cur_node);
594    TrexStatelessDpPerPort * lp_port = get_port_db(cur_node->m_port_id);
595    bool schedule =false;
596
597    bool to_stop_port=false;
598
599    if (next_node == NULL) {
600        /* there is no next stream , reduce the number of active streams*/
601        to_stop_port = lp_port->update_number_of_active_streams(1);
602
603    }else{
604        uint8_t state=next_node->get_state();
605
606        /* can't be FREE_RESUSE */
607        assert(state != CGenNodeStateless::ss_FREE_RESUSE);
608        if (state == CGenNodeStateless::ss_INACTIVE ) {
609
610            if (cur_node->m_action_counter > 0) {
611                cur_node->m_action_counter--;
612                if (cur_node->m_action_counter==0) {
613                    to_stop_port = lp_port->update_number_of_active_streams(1);
614                }else{
615                    /* refill start info and scedule, no update in active streams  */
616                    next_node->refresh();
617                    schedule = true;
618                }
619            }else{
620                /* refill start info and scedule, no update in active streams  */
621                next_node->refresh();
622                schedule = true;
623            }
624
625        }else{
626            to_stop_port = lp_port->update_number_of_active_streams(1);
627        }
628    }
629
630    if ( to_stop_port ) {
631        /* call stop port explictly to move the state */
632        stop_traffic(cur_node->m_port_id,false,0);
633    }
634
635    return ( schedule );
636}
637
638
639
640/**
641 * in idle state loop, the processor most of the time sleeps
642 * and periodically checks for messages
643 *
644 * @author imarom (01-Nov-15)
645 */
646void
647TrexStatelessDpCore::idle_state_loop() {
648
649    const int SHORT_DELAY_MS    = 2;
650    const int LONG_DELAY_MS     = 50;
651    const int DEEP_SLEEP_LIMIT  = 2000;
652
653    int counter = 0;
654
655    while (m_state == STATE_IDLE) {
656        m_core->tickle();
657        m_core->m_node_gen.m_v_if->flush_dp_rx_queue();
658        bool had_msg = periodic_check_for_cp_messages();
659        if (had_msg) {
660            counter = 0;
661            continue;
662        }
663
664        /* enter deep sleep only if enough time had passed */
665        if (counter < DEEP_SLEEP_LIMIT) {
666            delay(SHORT_DELAY_MS);
667            counter++;
668        } else {
669            delay(LONG_DELAY_MS);
670        }
671
672    }
673}
674
675
676
677void TrexStatelessDpCore::quit_main_loop(){
678    m_core->set_terminate_mode(true); /* mark it as terminated */
679    m_state = STATE_TERMINATE;
680    add_global_duration(0.0001);
681}
682
683
684/**
685 * scehduler runs when traffic exists
686 * it will return when no more transmitting is done on this
687 * core
688 *
689 * @author imarom (01-Nov-15)
690 */
691void
692TrexStatelessDpCore::start_scheduler() {
693    /* creates a maintenace job using the scheduler */
694    CGenNode * node_sync = m_core->create_node() ;
695    node_sync->m_type = CGenNode::FLOW_SYNC;
696    node_sync->m_time = m_core->m_cur_time_sec + SYNC_TIME_OUT;
697
698    m_core->m_node_gen.m_flow_sync_node = node_sync;
699    m_core->m_node_gen.add_node(node_sync);
700
701    double old_offset = 0.0;
702    m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset);
703    /* bail out in case of terminate */
704    if (m_state != TrexStatelessDpCore::STATE_TERMINATE) {
705        m_core->m_node_gen.close_file(m_core);
706        m_state = STATE_IDLE; /* we exit from all ports and we have nothing to do, we move to IDLE state */
707    }
708}
709
710
711void
712TrexStatelessDpCore::run_once(){
713
714    idle_state_loop();
715
716    if ( m_state == STATE_TERMINATE ){
717        return;
718    }
719
720    start_scheduler();
721}
722
723
724
725
726void
727TrexStatelessDpCore::start() {
728
729    while (true) {
730        run_once();
731
732        if ( m_core->is_terminated_by_master() ) {
733            break;
734        }
735    }
736}
737
738/* only if both port are idle we can exit */
739void
740TrexStatelessDpCore::schedule_exit(){
741
742    CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
743
744    node->m_type = CGenNode::COMMAND;
745
746    node->m_cmd = new TrexStatelessDpCanQuit();
747
748    /* make sure it will be scheduled after the current node */
749    node->m_time = m_core->m_cur_time_sec ;
750
751    m_core->m_node_gen.add_node((CGenNode *)node);
752}
753
754
755void
756TrexStatelessDpCore::add_global_duration(double duration){
757    if (duration > 0.0) {
758        CGenNode *node = m_core->create_node() ;
759
760        node->m_type = CGenNode::EXIT_SCHED;
761
762        /* make sure it will be scheduled after the current node */
763        node->m_time = m_core->m_cur_time_sec + duration ;
764
765        m_core->m_node_gen.add_node(node);
766    }
767}
768
769/* add per port exit */
770void
771TrexStatelessDpCore::add_port_duration(double duration,
772                                       uint8_t port_id,
773                                       int event_id){
774    if (duration > 0.0) {
775        CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
776
777        node->m_type = CGenNode::COMMAND;
778
779        /* make sure it will be scheduled after the current node */
780        node->m_time = m_core->m_cur_time_sec + duration ;
781
782        TrexStatelessDpStop * cmd=new TrexStatelessDpStop(port_id);
783
784
785        /* test this */
786        m_core->m_non_active_nodes++;
787        cmd->set_core_ptr(m_core);
788        cmd->set_event_id(event_id);
789        cmd->set_wait_for_event_id(true);
790
791        node->m_cmd = cmd;
792
793        m_core->m_node_gen.add_node((CGenNode *)node);
794    }
795}
796
797
798void TrexStatelessDpCore::update_mac_addr(TrexStream * stream,
799                                          CGenNodeStateless *node,
800                                          pkt_dir_t dir,
801                                          char *raw_pkt){
802    bool              ov_src = stream->get_override_src_mac_by_pkt_data();
803    TrexStream::stream_dst_mac_t  ov_dst = stream->get_override_dst_mac_mode();
804
805
806    if ( (ov_src == true) && (ov_dst == TrexStream::stPKT) ) {
807        /* nothing to do, take from the packet both */
808        return;
809    }
810
811        /* take from cfg_file */
812    if ( (ov_src == false) &&
813         (ov_dst == TrexStream::stCFG_FILE) ){
814
815          m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*)raw_pkt);
816          return;
817    }
818
819    /* save the pkt*/
820    char tmp_pkt[12];
821    memcpy(tmp_pkt,raw_pkt,12);
822
823    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*)raw_pkt);
824
825    if ((ov_src == true) && (ov_dst == TrexStream::stCFG_FILE)) {
826        memcpy(raw_pkt+6,tmp_pkt+6,6);
827    }
828
829    if ((ov_src == false) && (ov_dst == TrexStream::stPKT)) {
830        memcpy(raw_pkt,tmp_pkt,6);
831    }
832}
833
834
835void TrexStatelessDpCore::replay_vm_into_cache(TrexStream * stream,
836                                               CGenNodeStateless *node){
837
838    uint16_t      cache_size = stream->m_cache_size;
839    assert(cache_size>0);
840    rte_mbuf_t * m=0;
841
842    uint32_t buf_size = CGenNodeCacheMbuf::get_object_size(cache_size);
843    CGenNodeCacheMbuf * p = (CGenNodeCacheMbuf *)malloc(buf_size);
844    assert(p);
845    memset(p,0,buf_size);
846
847    int i;
848    for (i=0; i<cache_size; i++) {
849        p->m_array[i] =  node->alloc_node_with_vm();
850    }
851    /* save const */
852    m=node->get_const_mbuf();
853    if (m) {
854        p->m_mbuf_const=m;
855        rte_pktmbuf_refcnt_update(m,1);
856    }
857
858    /* free all VM and const mbuf */
859    node->free_stl_vm_buf();
860
861    /* copy to local node meory */
862    node->cache_mbuf_array_copy(p,cache_size);
863
864    /* free the memory */
865    free(p);
866}
867
868
869void
870TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port,
871                                TrexStream * stream,
872                                TrexStreamsCompiledObj *comp) {
873
874    CGenNodeStateless *node = m_core->create_node_sl();
875
876    node->cache_mbuf_array_init();
877    node->m_batch_size=0;
878
879    /* add periodic */
880    node->m_cache_mbuf=0;
881    node->m_type = CGenNode::STATELESS_PKT;
882
883    node->m_action_counter = stream->m_action_count;
884
885    /* clone the stream from control plane memory to DP memory */
886    node->m_ref_stream_info = stream->clone();
887    /* no need for this memory anymore on the control plane memory */
888    stream->release_dp_object();
889
890    node->m_next_stream=0; /* will be fixed later */
891
892    if ( stream->m_self_start ){
893        /* if self start it is in active mode */
894        node->m_state =CGenNodeStateless::ss_ACTIVE;
895        lp_port->m_active_streams++;
896    }else{
897        node->m_state =CGenNodeStateless::ss_INACTIVE;
898    }
899
900    node->m_time = m_core->m_cur_time_sec + stream->get_start_delay_sec();
901
902    pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(stream->m_port_id);
903    node->m_flags = 0;
904    node->m_src_port =0;
905    node->m_original_packet_data_prefix = 0;
906
907    if (stream->m_rx_check.m_enabled) {
908        node->set_stat_needed();
909        uint8_t hw_id = stream->m_rx_check.m_hw_id;
910        assert (hw_id < MAX_FLOW_STATS + MAX_FLOW_STATS_PAYLOAD);
911        node->set_stat_hw_id(hw_id);
912        // no support for cache with flow stat payload rules
913        if ((TrexPlatformApi::driver_stat_cap_e)stream->m_rx_check.m_rule_type == TrexPlatformApi::IF_STAT_PAYLOAD) {
914            stream->m_cache_size = 0;
915        }
916    }
917
918    /* set socket id */
919    node->set_socket_id(m_core->m_node_gen.m_socket_id);
920
921    /* build a mbuf from a packet */
922
923    uint16_t pkt_size = stream->m_pkt.len;
924    const uint8_t *stream_pkt = stream->m_pkt.binary;
925
926    node->m_pause =0;
927    node->m_stream_type = stream->m_type;
928    node->m_next_time_offset = 1.0 / stream->get_pps();
929    node->m_null_stream = (stream->m_null_stream ? 1 : 0);
930
931    /* stateless specific fields */
932    switch ( stream->m_type ) {
933
934    case TrexStream::stCONTINUOUS :
935        node->m_single_burst=0;
936        node->m_single_burst_refill=0;
937        node->m_multi_bursts=0;
938        break;
939
940    case TrexStream::stSINGLE_BURST :
941        node->m_stream_type             = TrexStream::stMULTI_BURST;
942        node->m_single_burst            = stream->m_burst_total_pkts;
943        node->m_single_burst_refill     = stream->m_burst_total_pkts;
944        node->m_multi_bursts            = 1;  /* single burst in multi burst of 1 */
945        break;
946
947    case TrexStream::stMULTI_BURST :
948        node->m_single_burst        = stream->m_burst_total_pkts;
949        node->m_single_burst_refill = stream->m_burst_total_pkts;
950        node->m_multi_bursts        = stream->m_num_bursts;
951        break;
952    default:
953
954        assert(0);
955    };
956
957    node->m_port_id = stream->m_port_id;
958
959    /* set dir 0 or 1 client or server */
960    node->set_mbuf_cache_dir(dir);
961
962
963    if (node->m_ref_stream_info->getDpVm() == NULL) {
964        /* no VM */
965
966        node->m_vm_flow_var =  NULL;
967        node->m_vm_program  =  NULL;
968        node->m_vm_program_size =0;
969
970                /* allocate const mbuf */
971        rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size);
972        assert(m);
973
974        char *p = rte_pktmbuf_append(m, pkt_size);
975        assert(p);
976        /* copy the packet */
977        memcpy(p,stream_pkt,pkt_size);
978
979        update_mac_addr(stream,node,dir,p);
980
981        /* set the packet as a readonly */
982        node->set_cache_mbuf(m);
983
984        node->m_original_packet_data_prefix =0;
985    }else{
986
987        /* set the program */
988        TrexStream * local_mem_stream = node->m_ref_stream_info;
989
990        StreamVmDp  * lpDpVm = local_mem_stream->getDpVm();
991
992        node->m_vm_flow_var      = lpDpVm->clone_bss(); /* clone the flow var */
993        node->m_vm_program       = lpDpVm->get_program(); /* same ref to the program */
994        node->m_vm_program_size  = lpDpVm->get_program_size();
995
996
997        /* set the random seed if was set */
998        if ( lpDpVm->is_random_seed() ){
999            /* if we have random seed for this program */
1000            if (stream->m_random_seed) {
1001                node->set_random_seed(stream->m_random_seed);
1002            }
1003        }
1004
1005        /* we need to copy the object */
1006        if ( pkt_size > lpDpVm->get_prefix_size() ) {
1007            /* we need const packet */
1008            uint16_t const_pkt_size  = pkt_size - lpDpVm->get_prefix_size() ;
1009            rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), const_pkt_size );
1010            assert(m);
1011
1012            char *p = rte_pktmbuf_append(m, const_pkt_size);
1013            assert(p);
1014
1015            /* copy packet data */
1016            memcpy(p,(stream_pkt + lpDpVm->get_prefix_size()),const_pkt_size);
1017
1018            node->set_const_mbuf(m);
1019        }
1020
1021
1022        if ( lpDpVm->is_pkt_size_var() ) {
1023            // mark the node as varible size
1024            node->set_var_pkt_size();
1025        }
1026
1027
1028        if (lpDpVm->get_prefix_size() > pkt_size ) {
1029            lpDpVm->set_prefix_size(pkt_size);
1030        }
1031
1032        /* copy the headr */
1033        uint16_t header_size = lpDpVm->get_prefix_size();
1034        assert(header_size);
1035        node->alloc_prefix_header(header_size);
1036        uint8_t *p=node->m_original_packet_data_prefix;
1037        assert(p);
1038
1039        memcpy(p,stream_pkt , header_size);
1040
1041        update_mac_addr(stream,node,dir,(char *)p);
1042
1043        if (stream->m_cache_size > 0 ) {
1044            /* we need to create cache of objects */
1045            replay_vm_into_cache(stream, node);
1046        }
1047    }
1048
1049
1050    CDpOneStream one_stream;
1051
1052    one_stream.m_dp_stream = node->m_ref_stream_info;
1053    one_stream.m_node =node;
1054
1055    lp_port->m_active_nodes.push_back(one_stream);
1056
1057    /* schedule only if active */
1058    if (node->m_state == CGenNodeStateless::ss_ACTIVE) {
1059        m_core->m_node_gen.add_node((CGenNode *)node);
1060    }
1061}
1062
1063void
1064TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj,
1065                                   double duration,
1066                                   int event_id) {
1067
1068
1069    TrexStatelessDpPerPort * lp_port=get_port_db(obj->get_port_id());
1070    lp_port->m_active_streams = 0;
1071    lp_port->set_event_id(event_id);
1072
1073    /* update cur time */
1074    if ( CGlobalInfo::is_realtime()  ){
1075        m_core->m_cur_time_sec = now_sec() + SCHD_OFFSET_DTIME ;
1076    }
1077
1078    /* no nodes in the list */
1079    assert(lp_port->m_active_nodes.size()==0);
1080
1081    for (auto single_stream : obj->get_objects()) {
1082        /* all commands should be for the same port */
1083        assert(obj->get_port_id() == single_stream.m_stream->m_port_id);
1084        add_stream(lp_port,single_stream.m_stream,obj);
1085    }
1086
1087    uint32_t nodes = lp_port->m_active_nodes.size();
1088    /* find next stream */
1089    assert(nodes == obj->get_objects().size());
1090
1091    int cnt=0;
1092
1093    /* set the next_stream pointer  */
1094    for (auto single_stream : obj->get_objects()) {
1095
1096        if (single_stream.m_stream->is_dp_next_stream() ) {
1097            int stream_id = single_stream.m_stream->m_next_stream_id;
1098            assert(stream_id<nodes);
1099            /* point to the next stream , stream_id is fixed */
1100            lp_port->m_active_nodes[cnt].m_node->m_next_stream = lp_port->m_active_nodes[stream_id].m_node ;
1101        }
1102        cnt++;
1103    }
1104
1105    lp_port->m_state =TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
1106    m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
1107
1108
1109    if ( duration > 0.0 ){
1110        add_port_duration( duration ,obj->get_port_id(),event_id );
1111    }
1112
1113}
1114
1115
1116bool TrexStatelessDpCore::are_all_ports_idle(){
1117
1118    bool res=true;
1119    int i;
1120    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
1121        if ( m_ports[i].m_state != TrexStatelessDpPerPort::ppSTATE_IDLE ){
1122            res=false;
1123        }
1124    }
1125    return (res);
1126}
1127
1128
1129void
1130TrexStatelessDpCore::resume_traffic(uint8_t port_id){
1131
1132    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1133
1134    lp_port->resume_traffic(port_id);
1135}
1136
1137
1138void
1139TrexStatelessDpCore::pause_traffic(uint8_t port_id){
1140
1141    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1142
1143    lp_port->pause_traffic(port_id);
1144}
1145
1146void
1147TrexStatelessDpCore::push_pcap(uint8_t port_id,
1148                               int event_id,
1149                               const std::string &pcap_filename,
1150                               double ipg_usec,
1151                               double speedup,
1152                               uint32_t count,
1153                               double duration) {
1154
1155    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1156
1157    lp_port->set_event_id(event_id);
1158
1159    /* delegate the command to the port */
1160    bool rc = lp_port->push_pcap(port_id, pcap_filename, ipg_usec, speedup, count);
1161    if (!rc) {
1162        /* report back that we stopped */
1163        CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
1164        TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
1165                                                                       port_id,
1166                                                                       event_id,
1167                                                                       false);
1168        ring->Enqueue((CGenNode *)event_msg);
1169        return;
1170    }
1171
1172
1173    if (duration > 0.0) {
1174        add_port_duration(duration, port_id, event_id);
1175    }
1176
1177     m_state = TrexStatelessDpCore::STATE_PCAP_TX;
1178}
1179
1180void
1181TrexStatelessDpCore::update_traffic(uint8_t port_id, double factor) {
1182
1183    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1184
1185    lp_port->update_traffic(port_id, factor);
1186}
1187
1188
1189void
1190TrexStatelessDpCore::stop_traffic(uint8_t  port_id,
1191                                  bool     stop_on_id,
1192                                  int      event_id) {
1193    /* we cannot remove nodes not from the top of the queue so
1194       for every active node - make sure next time
1195       the scheduler invokes it, it will be free */
1196
1197    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1198    if ( lp_port->stop_traffic(port_id,stop_on_id,event_id) == false){
1199        return;
1200    }
1201
1202    /* flush the TX queue before sending done message to the CP */
1203    m_core->flush_tx_queue();
1204
1205    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
1206    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
1207                                                                   port_id,
1208                                                                   lp_port->get_event_id());
1209    ring->Enqueue((CGenNode *)event_msg);
1210
1211}
1212
1213/**
1214 * handle a message from CP to DP
1215 *
1216 */
1217void
1218TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) {
1219    msg->handle(this);
1220    delete msg;
1221}
1222
1223void
1224TrexStatelessDpCore::barrier(uint8_t port_id, int event_id) {
1225
1226    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
1227    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
1228                                                                   port_id,
1229                                                                   event_id);
1230    ring->Enqueue((CGenNode *)event_msg);
1231}
1232
1233
1234/**
1235 * PCAP node
1236 */
1237bool CGenNodePCAP::create(uint8_t port_id,
1238                          pkt_dir_t dir,
1239                          socket_id_t socket_id,
1240                          const uint8_t *mac_addr,
1241                          const std::string &pcap_filename,
1242                          double ipg_usec,
1243                          double speedup,
1244                          uint32_t count) {
1245    std::stringstream ss;
1246
1247    m_type       = CGenNode::PCAP_PKT;
1248    m_flags      = 0;
1249    m_src_port   = 0;
1250    m_port_id    = port_id;
1251    m_count      = count;
1252
1253    /* mark this node as slow path */
1254    set_slow_path(true);
1255
1256    if (ipg_usec != -1) {
1257        /* fixed IPG */
1258        m_ipg_sec = usec_to_sec(ipg_usec / speedup);
1259        m_speedup = 0;
1260    } else {
1261        /* packet IPG */
1262        m_ipg_sec = -1;
1263        m_speedup  = speedup;
1264    }
1265
1266    /* copy MAC addr info */
1267    memcpy(m_mac_addr, mac_addr, 12);
1268
1269    /* set the dir */
1270    set_mbuf_dir(dir);
1271    set_socket_id(socket_id);
1272
1273    /* create the PCAP reader */
1274    m_reader = CCapReaderFactory::CreateReader((char *)pcap_filename.c_str(), 0, ss);
1275    if (!m_reader) {
1276        return false;
1277    }
1278
1279    m_raw_packet = new CCapPktRaw();
1280    if ( m_reader->ReadPacket(m_raw_packet) == false ){
1281        /* handle error */
1282        delete m_reader;
1283        return (false);
1284    }
1285
1286    /* this is the reference time */
1287    //m_base_time = m_raw_packet->get_time();
1288    m_last_pkt_time = m_raw_packet->get_time();
1289
1290    /* ready */
1291    m_state = PCAP_ACTIVE;
1292
1293    return true;
1294}
1295
1296/**
1297 * cleanup for PCAP node
1298 *
1299 * @author imarom (08-May-16)
1300 */
1301void CGenNodePCAP::destroy() {
1302
1303    if (m_raw_packet) {
1304        delete m_raw_packet;
1305        m_raw_packet = NULL;
1306    }
1307
1308    if (m_reader) {
1309        delete m_reader;
1310        m_reader = NULL;
1311    }
1312
1313    m_state = PCAP_INVALID;
1314}
1315
1316