trex_stateless_dp_core.cpp revision d49f3784
1/*
2 Itay Marom
3 Hanoch Haim
4 Cisco Systems, Inc.
5*/
6
7/*
8Copyright (c) 2015-2016 Cisco Systems, Inc.
9
10Licensed under the Apache License, Version 2.0 (the "License");
11you may not use this file except in compliance with the License.
12You may obtain a copy of the License at
13
14    http://www.apache.org/licenses/LICENSE-2.0
15
16Unless required by applicable law or agreed to in writing, software
17distributed under the License is distributed on an "AS IS" BASIS,
18WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19See the License for the specific language governing permissions and
20limitations under the License.
21*/
22#include "bp_sim.h"
23#include "trex_stateless_dp_core.h"
24#include "trex_stateless_messaging.h"
25#include "trex_stream.h"
26#include "trex_stream_node.h"
27#include "trex_streams_compiler.h"
28#include "mbuf.h"
29
30
31
32
33void CGenNodeStateless::cache_mbuf_array_init(){
34    m_cache_size=0;
35    m_cache_array_cnt=0;
36}
37
38
39
40void CGenNodeStateless::cache_mbuf_array_copy(CGenNodeCacheMbuf *obj,
41                                              uint16_t size){
42
43    int i;
44    cache_mbuf_array_alloc(size);
45    for (i=0; i<size; i++) {
46        cache_mbuf_array_set(i,obj->m_array[i]);
47    }
48    cache_mbuf_array_set_const_mbuf(obj->m_mbuf_const);
49}
50
51
52rte_mbuf_t ** CGenNodeStateless::cache_mbuf_array_alloc(uint16_t size){
53
54    uint32_t buf_size = CGenNodeCacheMbuf::get_object_size(size);
55    /* TBD  replace with align, zero API */
56    m_cache_mbuf = (void *)malloc(buf_size);
57    assert(m_cache_mbuf);
58    memset(m_cache_mbuf,0,buf_size);
59
60    m_flags |= SL_NODE_CONST_MBUF_CACHE_ARRAY;
61    m_cache_size=size;
62    m_cache_array_cnt=0;
63    return ((rte_mbuf_t **)m_cache_mbuf);
64}
65
66void CGenNodeStateless::cache_mbuf_array_free(){
67
68    assert(m_cache_mbuf);
69    int i;
70    for (i=0; i<(int)m_cache_size; i++) {
71        rte_mbuf_t * m=cache_mbuf_array_get((uint16_t)i);
72        assert(m);
73        rte_pktmbuf_free(m);
74    }
75
76    /* free the const */
77    rte_mbuf_t * m=cache_mbuf_array_get_const_mbuf() ;
78    if (m) {
79        rte_pktmbuf_free(m);
80    }
81
82    free(m_cache_mbuf);
83    m_cache_mbuf=0;
84}
85
86
87rte_mbuf_t * CGenNodeStateless::cache_mbuf_array_get(uint16_t index){
88
89    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
90    return (p->m_array[index]);
91}
92
93void CGenNodeStateless::cache_mbuf_array_set_const_mbuf(rte_mbuf_t * m){
94    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
95    p->m_mbuf_const=m;
96}
97
98rte_mbuf_t * CGenNodeStateless::cache_mbuf_array_get_const_mbuf(){
99    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
100    return (p->m_mbuf_const);
101}
102
103
104void CGenNodeStateless::cache_mbuf_array_set(uint16_t index,
105                                             rte_mbuf_t * m){
106    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
107    p->m_array[index]=m;
108}
109
110
111void CDpOneStream::Delete(CFlowGenListPerThread   * core){
112    assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE);
113    core->free_node((CGenNode *)m_node);
114    delete m_dp_stream;
115    m_node=0;
116    m_dp_stream=0;
117}
118
119void CDpOneStream::DeleteOnlyStream(){
120    assert(m_dp_stream);
121    delete m_dp_stream;
122    m_dp_stream=0;
123}
124
125int CGenNodeStateless::get_stream_id(){
126    if (m_state ==CGenNodeStateless::ss_FREE_RESUSE) {
127        return (-1); // not valid
128    }
129    assert(m_ref_stream_info);
130    return ((int)m_ref_stream_info->m_stream_id);
131}
132
133
134void CGenNodeStateless::DumpHeader(FILE *fd){
135    fprintf(fd," pkt_id, time, port , action , state, stream_id , stype , m-burst# , burst# \n");
136
137}
138void CGenNodeStateless::Dump(FILE *fd){
139    fprintf(fd," %2.4f, %3lu, %s,%s, %3d, %s, %3lu, %3lu  \n",
140            m_time,
141            (ulong)m_port_id,
142            "s-pkt", //action
143            get_stream_state_str(m_state ).c_str(),
144            get_stream_id(),   //stream_id
145            TrexStream::get_stream_type_str(m_stream_type).c_str(), //stype
146            (ulong)m_multi_bursts,
147            (ulong)m_single_burst
148            );
149}
150
151
152
153void CGenNodeStateless::refresh_vm_bss(){
154    if ( m_vm_flow_var ) {
155        StreamVmDp  * vm_s=m_ref_stream_info->m_vm_dp;
156        assert(vm_s);
157        memcpy(m_vm_flow_var,vm_s->get_bss(),vm_s->get_bss_size());
158
159        if ( vm_s->is_random_seed() ){
160            /* if we have random seed for this program */
161            if (m_ref_stream_info->m_random_seed) {
162                set_random_seed(m_ref_stream_info->m_random_seed);
163            }
164        }
165    }
166}
167
168
169/**
170 * this function called when stream restart after it was inactive
171 */
172void CGenNodeStateless::refresh(){
173
174    /* refill the stream info */
175    m_single_burst    = m_single_burst_refill;
176    m_multi_bursts    = m_ref_stream_info->m_num_bursts;
177    m_state           = CGenNodeStateless::ss_ACTIVE;
178
179    /* refresh init value */
180#if 0
181    /* TBD should add a JSON varible for that */
182    refresh_vm_bss();
183#endif
184}
185
186
187void CGenNodeCommand::free_command(){
188
189    assert(m_cmd);
190    m_cmd->on_node_remove();
191    delete m_cmd;
192}
193
194
195std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state){
196    std::string res;
197
198    switch (stream_state) {
199    case CGenNodeStateless::ss_FREE_RESUSE :
200         res="FREE    ";
201        break;
202    case CGenNodeStateless::ss_INACTIVE :
203        res="INACTIVE ";
204        break;
205    case CGenNodeStateless::ss_ACTIVE :
206        res="ACTIVE   ";
207        break;
208    default:
209        res="Unknow   ";
210    };
211    return(res);
212}
213
214/*
215 * Allocate mbuf for flow stat (and latency) info sending
216 * m - Original mbuf (can be complicated mbuf data structure)
217 * fsp_head - return pointer in which the flow stat info should be filled
218 * is_const - is the given mbuf const
219 * return new mbuf structure in which the fsp_head can be written. If needed, orginal mbuf is freed.
220 */
221rte_mbuf_t * CGenNodeStateless::alloc_flow_stat_mbuf(rte_mbuf_t *m, struct flow_stat_payload_header *&fsp_head
222                                                     , bool is_const) {
223    rte_mbuf_t *m_ret = NULL, *m_lat = NULL;
224    uint16_t fsp_head_size = sizeof(struct flow_stat_payload_header);
225
226    if (is_const) {
227        // const mbuf case
228        if (rte_pktmbuf_data_len(m) > 128) {
229            m_ret = CGlobalInfo::pktmbuf_alloc_small(get_socket_id());
230            assert(m_ret);
231            // alloc mbuf just for the latency header
232            m_lat = CGlobalInfo::pktmbuf_alloc( get_socket_id(), fsp_head_size);
233            assert(m_lat);
234            fsp_head = (struct flow_stat_payload_header *)rte_pktmbuf_append(m_lat, fsp_head_size);
235            rte_pktmbuf_attach(m_ret, m);
236            rte_pktmbuf_trim(m_ret, sizeof(struct flow_stat_payload_header));
237            utl_rte_pktmbuf_add_after2(m_ret, m_lat);
238            // ref count was updated when we took the (const) mbuf, and again in rte_pktmbuf_attach
239            // so need do decrease now, to avoid leak.
240            rte_pktmbuf_refcnt_update(m, -1);
241            return m_ret;
242        } else {
243            // Short packet. Just copy all bytes.
244            m_ret = CGlobalInfo::pktmbuf_alloc( get_socket_id(), rte_pktmbuf_data_len(m) );
245            assert(m_ret);
246            char *p = rte_pktmbuf_mtod(m, char*);
247            char *p_new = rte_pktmbuf_append(m_ret, rte_pktmbuf_data_len(m));
248            memcpy(p_new , p, rte_pktmbuf_data_len(m));
249            fsp_head = (struct flow_stat_payload_header *)(p_new + rte_pktmbuf_data_len(m) - fsp_head_size);
250            rte_pktmbuf_free(m);
251            return m_ret;
252        }
253    } else {
254        // Field engine (vm)
255        if (rte_pktmbuf_is_contiguous(m)) {
256            // one, r/w mbuf
257            char *p = rte_pktmbuf_mtod(m, char*);
258            fsp_head = (struct flow_stat_payload_header *)(p + rte_pktmbuf_data_len(m) - fsp_head_size);
259            return m;
260        } else {
261            // We have: r/w --> read only.
262            // Changing to:
263            // (original) r/w -> (new) indirect (direct is original read_only, after trimming last bytes) -> (new) latency info
264            rte_mbuf_t *m_read_only = m->next, *m_indirect;
265
266            m_indirect = CGlobalInfo::pktmbuf_alloc_small(get_socket_id());
267            assert(m_indirect);
268            // alloc mbuf just for the latency header
269            m_lat = CGlobalInfo::pktmbuf_alloc( get_socket_id(), fsp_head_size);
270            assert(m_lat);
271            fsp_head = (struct flow_stat_payload_header *)rte_pktmbuf_append(m_lat, fsp_head_size);
272            utl_rte_pktmbuf_chain_with_indirect(m, m_indirect, m_read_only, m_lat);
273            m_indirect->data_len = (uint16_t)(m_indirect->data_len - fsp_head_size);
274            return m;
275        }
276    }
277}
278
279// test the const case of alloc_flow_stat_mbuf. The more complicated non const case is tested in the simulation.
280bool CGenNodeStateless::alloc_flow_stat_mbuf_test_const() {
281    rte_mbuf_t *m, *m_test;
282    uint16_t sizes[2] = {64, 500};
283    uint16_t size;
284    struct flow_stat_payload_header *fsp_head;
285    char *p;
286
287    set_socket_id(0);
288    for (int test_num = 0; test_num < sizeof(sizes)/sizeof(sizes[0]); test_num++) {
289        size = sizes[test_num];
290        m = CGlobalInfo::pktmbuf_alloc(get_socket_id(), size);
291        p = rte_pktmbuf_append(m, size);
292        for (int i = 0; i < size; i++) {
293            p[i] = (char)i;
294        }
295        m_test = alloc_flow_stat_mbuf(m, fsp_head, true);
296        p = rte_pktmbuf_mtod(m_test, char*);
297        assert(rte_pktmbuf_pkt_len(m_test) == size);
298        for (int i = 0; i < rte_pktmbuf_pkt_len(m_test) - sizeof(*fsp_head); i++) {
299            assert(p[i] == (char)i);
300        }
301        // verify fsp_head points correctly
302        if (size > 128) { // should match threshould in alloc_flow_stat_mbuf
303            assert(rte_pktmbuf_data_len(m_test) == size - sizeof(*fsp_head));
304            assert(rte_pktmbuf_data_len(m_test->next) == sizeof(*fsp_head));
305            assert((char *)fsp_head == rte_pktmbuf_mtod((m_test->next), char*));
306        } else {
307            assert(rte_pktmbuf_data_len(m_test) == size);
308            assert (((char *)fsp_head) + sizeof (*fsp_head) == p + rte_pktmbuf_data_len(m_test));
309        }
310        rte_pktmbuf_free(m_test);
311    }
312    return true;
313}
314
315rte_mbuf_t   * CGenNodeStateless::alloc_node_with_vm(){
316
317    rte_mbuf_t        * m;
318    /* alloc small packet buffer*/
319    uint16_t prefix_size = prefix_header_size();
320    m = CGlobalInfo::pktmbuf_alloc( get_socket_id(), prefix_size );
321    if (m==0) {
322        return (m);
323    }
324    /* TBD remove this, should handle cases of error */
325    assert(m);
326    char *p=rte_pktmbuf_append(m, prefix_size);
327    memcpy( p ,m_original_packet_data_prefix, prefix_size);
328
329
330    /* run the VM program */
331    StreamDPVmInstructionsRunner runner;
332
333    runner.run( (uint32_t*)m_vm_flow_var,
334                m_vm_program_size,
335                m_vm_program,
336                m_vm_flow_var,
337                (uint8_t*)p);
338
339    uint16_t pkt_new_size=runner.get_new_pkt_size();
340    if ( likely( pkt_new_size == 0) ) {
341        /* no packet size change */
342        rte_mbuf_t * m_const = get_const_mbuf();
343        if (  m_const != NULL) {
344            utl_rte_pktmbuf_add_after(m,m_const);
345        }
346        return (m);
347    }
348
349    /* packet size change there are a few changes */
350    rte_mbuf_t * m_const = get_const_mbuf();
351    if ( (m_const == 0 ) || (pkt_new_size<=prefix_size) ) {
352        /* one mbuf , just trim it */
353        m->data_len = pkt_new_size;
354        m->pkt_len  = pkt_new_size;
355        return (m);
356    }
357
358    rte_mbuf_t * mi= CGlobalInfo::pktmbuf_alloc_small(get_socket_id());
359    assert(mi);
360    rte_pktmbuf_attach(mi,m_const);
361    utl_rte_pktmbuf_add_after2(m,mi);
362
363    if ( pkt_new_size < m->pkt_len) {
364        /* need to trim it */
365        mi->data_len = (pkt_new_size - prefix_size);
366        m->pkt_len   = pkt_new_size;
367    }
368    return (m);
369}
370
371void CGenNodeStateless::free_stl_vm_buf(){
372        rte_mbuf_t * m ;
373         m=get_const_mbuf();
374         if (m) {
375             rte_pktmbuf_free(m); /* reduce the ref counter */
376             /* clear the const marker */
377             clear_const_mbuf();
378         }
379
380         free_prefix_header();
381
382         if (m_vm_flow_var) {
383             /* free flow var */
384             free(m_vm_flow_var);
385             m_vm_flow_var=0;
386         }
387}
388
389
390
391void CGenNodeStateless::free_stl_node(){
392
393    if ( is_cache_mbuf_array() ){
394        /* do we have cache of mbuf pre allocated */
395        cache_mbuf_array_free();
396    }else{
397        /* if we have cache mbuf free it */
398        rte_mbuf_t * m=get_cache_mbuf();
399        if (m) {
400                rte_pktmbuf_free(m);
401                m_cache_mbuf=0;
402        }
403    }
404    free_stl_vm_buf();
405}
406
407
408bool TrexStatelessDpPerPort::update_number_of_active_streams(uint32_t d){
409    m_active_streams-=d; /* reduce the number of streams */
410    if (m_active_streams == 0) {
411        return (true);
412    }
413    return (false);
414}
415
416bool TrexStatelessDpPerPort::resume_traffic(uint8_t port_id){
417
418    /* we are working with continues streams so we must be in transmit mode */
419    assert(m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE);
420
421    for (auto dp_stream : m_active_nodes) {
422        CGenNodeStateless * node =dp_stream.m_node;
423        assert(node->get_port_id() == port_id);
424        assert(node->is_pause() == true);
425        node->set_pause(false);
426    }
427    m_state = TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
428    return (true);
429}
430
431bool TrexStatelessDpPerPort::update_traffic(uint8_t port_id, double factor) {
432
433    assert( (m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING ||
434            (m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE)) );
435
436    for (auto dp_stream : m_active_nodes) {
437        CGenNodeStateless * node = dp_stream.m_node;
438        assert(node->get_port_id() == port_id);
439
440        node->update_rate(factor);
441    }
442
443    return (true);
444}
445
446bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){
447
448    /* we are working with continues streams so we must be in transmit mode */
449    assert(m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING);
450
451    for (auto dp_stream : m_active_nodes) {
452        CGenNodeStateless * node =dp_stream.m_node;
453        assert(node->get_port_id() == port_id);
454        assert(node->is_pause() == false);
455        node->set_pause(true);
456    }
457    m_state = TrexStatelessDpPerPort::ppSTATE_PAUSE;
458    return (true);
459}
460
461bool TrexStatelessDpPerPort::push_pcap(uint8_t port_id,
462                                       const std::string &pcap_filename,
463                                       double ipg_usec,
464                                       double speedup,
465                                       uint32_t count) {
466
467    /* push pcap can only happen on an idle port from the core prespective */
468    assert(m_state == TrexStatelessDpPerPort::ppSTATE_IDLE);
469
470    CGenNodePCAP *pcap_node = m_core->allocate_pcap_node();
471    if (!pcap_node) {
472        return (false);
473    }
474
475    pkt_dir_t dir          = m_core->m_node_gen.m_v_if->port_id_to_dir(port_id);
476    socket_id_t socket_id  = m_core->m_node_gen.m_socket_id;
477
478    uint8_t mac_addr[12];
479    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, mac_addr);
480
481    bool rc = pcap_node->create(port_id,
482                                dir,
483                                socket_id,
484                                mac_addr,
485                                pcap_filename,
486                                ipg_usec,
487                                speedup,
488                                count);
489    if (!rc) {
490        m_core->free_node((CGenNode *)pcap_node);
491        return (false);
492    }
493
494    /* schedule the node for now */
495    pcap_node->m_time = m_core->m_cur_time_sec;
496    m_core->m_node_gen.add_node((CGenNode *)pcap_node);
497
498    /* hold a pointer to the node */
499    assert(m_active_pcap_node == NULL);
500    m_active_pcap_node = pcap_node;
501
502    m_state = TrexStatelessDpPerPort::ppSTATE_PCAP_TX;
503    return (true);
504}
505
506
507bool TrexStatelessDpPerPort::stop_traffic(uint8_t  port_id,
508                                          bool     stop_on_id,
509                                          int      event_id){
510
511
512    if (m_state == TrexStatelessDpPerPort::ppSTATE_IDLE) {
513        assert(m_active_streams==0);
514        return false;
515    }
516
517    /* there could be race of stop after stop */
518    if ( stop_on_id ) {
519        if (event_id != m_event_id){
520            /* we can't stop it is an old message */
521            return false;
522        }
523    }
524
525    for (auto dp_stream : m_active_nodes) {
526        CGenNodeStateless * node =dp_stream.m_node;
527        assert(node->get_port_id() == port_id);
528        if ( node->get_state() == CGenNodeStateless::ss_ACTIVE) {
529            node->mark_for_free();
530            m_active_streams--;
531            dp_stream.DeleteOnlyStream();
532
533        }else{
534            dp_stream.Delete(m_core);
535        }
536    }
537
538    /* check for active PCAP node */
539    if (m_active_pcap_node) {
540        /* when got async stop from outside or duration */
541        if (m_active_pcap_node->is_active()) {
542            m_active_pcap_node->mark_for_free();
543        } else {
544            /* graceful stop - node was put out by the scheduler */
545            m_core->free_node( (CGenNode *)m_active_pcap_node);
546        }
547
548        m_active_pcap_node = NULL;
549    }
550
551    /* active stream should be zero */
552    assert(m_active_streams==0);
553    m_active_nodes.clear();
554    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
555    return (true);
556}
557
558
559void TrexStatelessDpPerPort::create(CFlowGenListPerThread   *  core){
560    m_core=core;
561    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
562    m_active_streams=0;
563    m_active_nodes.clear();
564    m_active_pcap_node = NULL;
565}
566
567
568
569void
570TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) {
571    m_thread_id = thread_id;
572    m_core = core;
573    m_local_port_offset = 2*core->getDualPortId();
574
575    CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();
576
577    m_ring_from_cp = cp_dp->getRingCpToDp(thread_id);
578    m_ring_to_cp   = cp_dp->getRingDpToCp(thread_id);
579
580    m_state = STATE_IDLE;
581
582    int i;
583    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
584        m_ports[i].create(core);
585    }
586}
587
588
589/* move to the next stream, old stream move to INACTIVE */
590bool TrexStatelessDpCore::set_stateless_next_node(CGenNodeStateless * cur_node,
591                                                  CGenNodeStateless * next_node){
592
593    assert(cur_node);
594    TrexStatelessDpPerPort * lp_port = get_port_db(cur_node->m_port_id);
595    bool schedule =false;
596
597    bool to_stop_port=false;
598
599    if (next_node == NULL) {
600        /* there is no next stream , reduce the number of active streams*/
601        to_stop_port = lp_port->update_number_of_active_streams(1);
602
603    }else{
604        uint8_t state=next_node->get_state();
605
606        /* can't be FREE_RESUSE */
607        assert(state != CGenNodeStateless::ss_FREE_RESUSE);
608        if (state == CGenNodeStateless::ss_INACTIVE ) {
609
610            if (cur_node->m_action_counter > 0) {
611                cur_node->m_action_counter--;
612                if (cur_node->m_action_counter==0) {
613                    to_stop_port = lp_port->update_number_of_active_streams(1);
614                }else{
615                    /* refill start info and scedule, no update in active streams  */
616                    next_node->refresh();
617                    schedule = true;
618                }
619            }else{
620                /* refill start info and scedule, no update in active streams  */
621                next_node->refresh();
622                schedule = true;
623            }
624
625        }else{
626            to_stop_port = lp_port->update_number_of_active_streams(1);
627        }
628    }
629
630    if ( to_stop_port ) {
631        /* call stop port explictly to move the state */
632        stop_traffic(cur_node->m_port_id,false,0);
633    }
634
635    return ( schedule );
636}
637
638
639
640/**
641 * in idle state loop, the processor most of the time sleeps
642 * and periodically checks for messages
643 *
644 * @author imarom (01-Nov-15)
645 */
646void
647TrexStatelessDpCore::idle_state_loop() {
648
649    const int SHORT_DELAY_MS    = 2;
650    const int LONG_DELAY_MS     = 50;
651    const int DEEP_SLEEP_LIMIT  = 2000;
652
653    int counter = 0;
654
655    while (m_state == STATE_IDLE) {
656        m_core->tickle();
657        m_core->m_node_gen.m_v_if->flush_dp_rx_queue();
658        bool had_msg = periodic_check_for_cp_messages();
659        if (had_msg) {
660            counter = 0;
661            continue;
662        }
663
664        /* enter deep sleep only if enough time had passed */
665        if (counter < DEEP_SLEEP_LIMIT) {
666            delay(SHORT_DELAY_MS);
667            counter++;
668        } else {
669            delay(LONG_DELAY_MS);
670        }
671
672    }
673}
674
675
676
677void TrexStatelessDpCore::quit_main_loop(){
678    m_core->set_terminate_mode(true); /* mark it as terminated */
679    m_state = STATE_TERMINATE;
680    add_global_duration(0.0001);
681}
682
683
684/**
685 * scehduler runs when traffic exists
686 * it will return when no more transmitting is done on this
687 * core
688 *
689 * @author imarom (01-Nov-15)
690 */
691void
692TrexStatelessDpCore::start_scheduler() {
693    /* creates a maintenace job using the scheduler */
694    CGenNode * node_sync = m_core->create_node() ;
695    node_sync->m_type = CGenNode::FLOW_SYNC;
696    node_sync->m_time = m_core->m_cur_time_sec + SYNC_TIME_OUT;
697    m_core->m_node_gen.add_node(node_sync);
698
699    double old_offset = 0.0;
700    m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset);
701    /* bail out in case of terminate */
702    if (m_state != TrexStatelessDpCore::STATE_TERMINATE) {
703        m_core->m_node_gen.close_file(m_core);
704        m_state = STATE_IDLE; /* we exit from all ports and we have nothing to do, we move to IDLE state */
705    }
706}
707
708
709void
710TrexStatelessDpCore::run_once(){
711
712    idle_state_loop();
713
714    if ( m_state == STATE_TERMINATE ){
715        return;
716    }
717
718    start_scheduler();
719}
720
721
722
723
724void
725TrexStatelessDpCore::start() {
726
727    while (true) {
728        run_once();
729
730        if ( m_core->is_terminated_by_master() ) {
731            break;
732        }
733    }
734}
735
736/* only if both port are idle we can exit */
737void
738TrexStatelessDpCore::schedule_exit(){
739
740    CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
741
742    node->m_type = CGenNode::COMMAND;
743
744    node->m_cmd = new TrexStatelessDpCanQuit();
745
746    /* make sure it will be scheduled after the current node */
747    node->m_time = m_core->m_cur_time_sec ;
748
749    m_core->m_node_gen.add_node((CGenNode *)node);
750}
751
752
753void
754TrexStatelessDpCore::add_global_duration(double duration){
755    if (duration > 0.0) {
756        CGenNode *node = m_core->create_node() ;
757
758        node->m_type = CGenNode::EXIT_SCHED;
759
760        /* make sure it will be scheduled after the current node */
761        node->m_time = m_core->m_cur_time_sec + duration ;
762
763        m_core->m_node_gen.add_node(node);
764    }
765}
766
767/* add per port exit */
768void
769TrexStatelessDpCore::add_port_duration(double duration,
770                                       uint8_t port_id,
771                                       int event_id){
772    if (duration > 0.0) {
773        CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
774
775        node->m_type = CGenNode::COMMAND;
776
777        /* make sure it will be scheduled after the current node */
778        node->m_time = m_core->m_cur_time_sec + duration ;
779
780        TrexStatelessDpStop * cmd=new TrexStatelessDpStop(port_id);
781
782
783        /* test this */
784        m_core->m_non_active_nodes++;
785        cmd->set_core_ptr(m_core);
786        cmd->set_event_id(event_id);
787        cmd->set_wait_for_event_id(true);
788
789        node->m_cmd = cmd;
790
791        m_core->m_node_gen.add_node((CGenNode *)node);
792    }
793}
794
795
796void TrexStatelessDpCore::update_mac_addr(TrexStream * stream,
797                                          CGenNodeStateless *node,
798                                          pkt_dir_t dir,
799                                          char *raw_pkt){
800    bool              ov_src = stream->get_override_src_mac_by_pkt_data();
801    TrexStream::stream_dst_mac_t  ov_dst = stream->get_override_dst_mac_mode();
802
803
804    if ( (ov_src == true) && (ov_dst == TrexStream::stPKT) ) {
805        /* nothing to do, take from the packet both */
806        return;
807    }
808
809        /* take from cfg_file */
810    if ( (ov_src == false) &&
811         (ov_dst == TrexStream::stCFG_FILE) ){
812
813          m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*)raw_pkt);
814          return;
815    }
816
817    /* save the pkt*/
818    char tmp_pkt[12];
819    memcpy(tmp_pkt,raw_pkt,12);
820
821    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*)raw_pkt);
822
823    if ((ov_src == true) && (ov_dst == TrexStream::stCFG_FILE)) {
824        memcpy(raw_pkt+6,tmp_pkt+6,6);
825    }
826
827    if ((ov_src == false) && (ov_dst == TrexStream::stPKT)) {
828        memcpy(raw_pkt,tmp_pkt,6);
829    }
830}
831
832
833void TrexStatelessDpCore::replay_vm_into_cache(TrexStream * stream,
834                                               CGenNodeStateless *node){
835
836    uint16_t      cache_size = stream->m_cache_size;
837    assert(cache_size>0);
838    rte_mbuf_t * m=0;
839
840    uint32_t buf_size = CGenNodeCacheMbuf::get_object_size(cache_size);
841    CGenNodeCacheMbuf * p = (CGenNodeCacheMbuf *)malloc(buf_size);
842    assert(p);
843    memset(p,0,buf_size);
844
845    int i;
846    for (i=0; i<cache_size; i++) {
847        p->m_array[i] =  node->alloc_node_with_vm();
848    }
849    /* save const */
850    m=node->get_const_mbuf();
851    if (m) {
852        p->m_mbuf_const=m;
853        rte_pktmbuf_refcnt_update(m,1);
854    }
855
856    /* free all VM and const mbuf */
857    node->free_stl_vm_buf();
858
859    /* copy to local node meory */
860    node->cache_mbuf_array_copy(p,cache_size);
861
862    /* free the memory */
863    free(p);
864}
865
866
867void
868TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port,
869                                TrexStream * stream,
870                                TrexStreamsCompiledObj *comp) {
871
872    CGenNodeStateless *node = m_core->create_node_sl();
873
874    node->cache_mbuf_array_init();
875    node->m_batch_size=0;
876
877    /* add periodic */
878    node->m_cache_mbuf=0;
879    node->m_type = CGenNode::STATELESS_PKT;
880
881    node->m_action_counter = stream->m_action_count;
882
883    /* clone the stream from control plane memory to DP memory */
884    node->m_ref_stream_info = stream->clone();
885    /* no need for this memory anymore on the control plane memory */
886    stream->release_dp_object();
887
888    node->m_next_stream=0; /* will be fixed later */
889
890    if ( stream->m_self_start ){
891        /* if self start it is in active mode */
892        node->m_state =CGenNodeStateless::ss_ACTIVE;
893        lp_port->m_active_streams++;
894    }else{
895        node->m_state =CGenNodeStateless::ss_INACTIVE;
896    }
897
898    node->m_time = m_core->m_cur_time_sec + stream->get_start_delay_sec();
899
900    pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(stream->m_port_id);
901    node->m_flags = 0;
902    node->m_src_port =0;
903    node->m_original_packet_data_prefix = 0;
904
905    if (stream->m_rx_check.m_enabled) {
906        node->set_stat_needed();
907        uint8_t hw_id = stream->m_rx_check.m_hw_id;
908        assert (hw_id < MAX_FLOW_STATS + MAX_FLOW_STATS_PAYLOAD);
909        node->set_stat_hw_id(hw_id);
910        // no support for cache with flow stat payload rules
911        if ((TrexPlatformApi::driver_stat_cap_e)stream->m_rx_check.m_rule_type == TrexPlatformApi::IF_STAT_PAYLOAD) {
912            stream->m_cache_size = 0;
913        }
914    }
915
916    /* set socket id */
917    node->set_socket_id(m_core->m_node_gen.m_socket_id);
918
919    /* build a mbuf from a packet */
920
921    uint16_t pkt_size = stream->m_pkt.len;
922    const uint8_t *stream_pkt = stream->m_pkt.binary;
923
924    node->m_pause =0;
925    node->m_stream_type = stream->m_type;
926    node->m_next_time_offset = 1.0 / stream->get_pps();
927    node->m_null_stream = (stream->m_null_stream ? 1 : 0);
928
929    /* stateless specific fields */
930    switch ( stream->m_type ) {
931
932    case TrexStream::stCONTINUOUS :
933        node->m_single_burst=0;
934        node->m_single_burst_refill=0;
935        node->m_multi_bursts=0;
936        break;
937
938    case TrexStream::stSINGLE_BURST :
939        node->m_stream_type             = TrexStream::stMULTI_BURST;
940        node->m_single_burst            = stream->m_burst_total_pkts;
941        node->m_single_burst_refill     = stream->m_burst_total_pkts;
942        node->m_multi_bursts            = 1;  /* single burst in multi burst of 1 */
943        break;
944
945    case TrexStream::stMULTI_BURST :
946        node->m_single_burst        = stream->m_burst_total_pkts;
947        node->m_single_burst_refill = stream->m_burst_total_pkts;
948        node->m_multi_bursts        = stream->m_num_bursts;
949        break;
950    default:
951
952        assert(0);
953    };
954
955    node->m_port_id = stream->m_port_id;
956
957    /* set dir 0 or 1 client or server */
958    node->set_mbuf_cache_dir(dir);
959
960
961    if (node->m_ref_stream_info->getDpVm() == NULL) {
962        /* no VM */
963
964        node->m_vm_flow_var =  NULL;
965        node->m_vm_program  =  NULL;
966        node->m_vm_program_size =0;
967
968                /* allocate const mbuf */
969        rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size);
970        assert(m);
971
972        char *p = rte_pktmbuf_append(m, pkt_size);
973        assert(p);
974        /* copy the packet */
975        memcpy(p,stream_pkt,pkt_size);
976
977        update_mac_addr(stream,node,dir,p);
978
979        /* set the packet as a readonly */
980        node->set_cache_mbuf(m);
981
982        node->m_original_packet_data_prefix =0;
983    }else{
984
985        /* set the program */
986        TrexStream * local_mem_stream = node->m_ref_stream_info;
987
988        StreamVmDp  * lpDpVm = local_mem_stream->getDpVm();
989
990        node->m_vm_flow_var      = lpDpVm->clone_bss(); /* clone the flow var */
991        node->m_vm_program       = lpDpVm->get_program(); /* same ref to the program */
992        node->m_vm_program_size  = lpDpVm->get_program_size();
993
994
995        /* set the random seed if was set */
996        if ( lpDpVm->is_random_seed() ){
997            /* if we have random seed for this program */
998            if (stream->m_random_seed) {
999                node->set_random_seed(stream->m_random_seed);
1000            }
1001        }
1002
1003        /* we need to copy the object */
1004        if ( pkt_size > lpDpVm->get_prefix_size() ) {
1005            /* we need const packet */
1006            uint16_t const_pkt_size  = pkt_size - lpDpVm->get_prefix_size() ;
1007            rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), const_pkt_size );
1008            assert(m);
1009
1010            char *p = rte_pktmbuf_append(m, const_pkt_size);
1011            assert(p);
1012
1013            /* copy packet data */
1014            memcpy(p,(stream_pkt + lpDpVm->get_prefix_size()),const_pkt_size);
1015
1016            node->set_const_mbuf(m);
1017        }
1018
1019
1020        if ( lpDpVm->is_pkt_size_var() ) {
1021            // mark the node as varible size
1022            node->set_var_pkt_size();
1023        }
1024
1025
1026        if (lpDpVm->get_prefix_size() > pkt_size ) {
1027            lpDpVm->set_prefix_size(pkt_size);
1028        }
1029
1030        /* copy the headr */
1031        uint16_t header_size = lpDpVm->get_prefix_size();
1032        assert(header_size);
1033        node->alloc_prefix_header(header_size);
1034        uint8_t *p=node->m_original_packet_data_prefix;
1035        assert(p);
1036
1037        memcpy(p,stream_pkt , header_size);
1038
1039        update_mac_addr(stream,node,dir,(char *)p);
1040
1041        if (stream->m_cache_size > 0 ) {
1042            /* we need to create cache of objects */
1043            replay_vm_into_cache(stream, node);
1044        }
1045    }
1046
1047
1048    CDpOneStream one_stream;
1049
1050    one_stream.m_dp_stream = node->m_ref_stream_info;
1051    one_stream.m_node =node;
1052
1053    lp_port->m_active_nodes.push_back(one_stream);
1054
1055    /* schedule only if active */
1056    if (node->m_state == CGenNodeStateless::ss_ACTIVE) {
1057        m_core->m_node_gen.add_node((CGenNode *)node);
1058    }
1059}
1060
1061void
1062TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj,
1063                                   double duration,
1064                                   int event_id) {
1065
1066
1067    TrexStatelessDpPerPort * lp_port=get_port_db(obj->get_port_id());
1068    lp_port->m_active_streams = 0;
1069    lp_port->set_event_id(event_id);
1070
1071    /* update cur time */
1072    if ( CGlobalInfo::is_realtime()  ){
1073        m_core->m_cur_time_sec = now_sec() + SCHD_OFFSET_DTIME ;
1074    }
1075
1076    /* no nodes in the list */
1077    assert(lp_port->m_active_nodes.size()==0);
1078
1079    for (auto single_stream : obj->get_objects()) {
1080        /* all commands should be for the same port */
1081        assert(obj->get_port_id() == single_stream.m_stream->m_port_id);
1082        add_stream(lp_port,single_stream.m_stream,obj);
1083    }
1084
1085    uint32_t nodes = lp_port->m_active_nodes.size();
1086    /* find next stream */
1087    assert(nodes == obj->get_objects().size());
1088
1089    int cnt=0;
1090
1091    /* set the next_stream pointer  */
1092    for (auto single_stream : obj->get_objects()) {
1093
1094        if (single_stream.m_stream->is_dp_next_stream() ) {
1095            int stream_id = single_stream.m_stream->m_next_stream_id;
1096            assert(stream_id<nodes);
1097            /* point to the next stream , stream_id is fixed */
1098            lp_port->m_active_nodes[cnt].m_node->m_next_stream = lp_port->m_active_nodes[stream_id].m_node ;
1099        }
1100        cnt++;
1101    }
1102
1103    lp_port->m_state =TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
1104    m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
1105
1106
1107    if ( duration > 0.0 ){
1108        add_port_duration( duration ,obj->get_port_id(),event_id );
1109    }
1110
1111}
1112
1113
1114bool TrexStatelessDpCore::are_all_ports_idle(){
1115
1116    bool res=true;
1117    int i;
1118    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
1119        if ( m_ports[i].m_state != TrexStatelessDpPerPort::ppSTATE_IDLE ){
1120            res=false;
1121        }
1122    }
1123    return (res);
1124}
1125
1126
1127void
1128TrexStatelessDpCore::resume_traffic(uint8_t port_id){
1129
1130    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1131
1132    lp_port->resume_traffic(port_id);
1133}
1134
1135
1136void
1137TrexStatelessDpCore::pause_traffic(uint8_t port_id){
1138
1139    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1140
1141    lp_port->pause_traffic(port_id);
1142}
1143
1144void
1145TrexStatelessDpCore::push_pcap(uint8_t port_id,
1146                               int event_id,
1147                               const std::string &pcap_filename,
1148                               double ipg_usec,
1149                               double speedup,
1150                               uint32_t count,
1151                               double duration) {
1152
1153    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1154
1155    lp_port->set_event_id(event_id);
1156
1157    /* delegate the command to the port */
1158    bool rc = lp_port->push_pcap(port_id, pcap_filename, ipg_usec, speedup, count);
1159    if (!rc) {
1160        /* report back that we stopped */
1161        CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
1162        TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
1163                                                                       port_id,
1164                                                                       event_id,
1165                                                                       false);
1166        ring->Enqueue((CGenNode *)event_msg);
1167        return;
1168    }
1169
1170
1171    if (duration > 0.0) {
1172        add_port_duration(duration, port_id, event_id);
1173    }
1174
1175     m_state = TrexStatelessDpCore::STATE_PCAP_TX;
1176}
1177
1178void
1179TrexStatelessDpCore::update_traffic(uint8_t port_id, double factor) {
1180
1181    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1182
1183    lp_port->update_traffic(port_id, factor);
1184}
1185
1186
1187void
1188TrexStatelessDpCore::stop_traffic(uint8_t  port_id,
1189                                  bool     stop_on_id,
1190                                  int      event_id) {
1191    /* we cannot remove nodes not from the top of the queue so
1192       for every active node - make sure next time
1193       the scheduler invokes it, it will be free */
1194
1195    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1196    if ( lp_port->stop_traffic(port_id,stop_on_id,event_id) == false){
1197        return;
1198    }
1199
1200    /* flush the TX queue before sending done message to the CP */
1201    m_core->flush_tx_queue();
1202
1203    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
1204    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
1205                                                                   port_id,
1206                                                                   lp_port->get_event_id());
1207    ring->Enqueue((CGenNode *)event_msg);
1208
1209}
1210
1211/**
1212 * handle a message from CP to DP
1213 *
1214 */
1215void
1216TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) {
1217    msg->handle(this);
1218    delete msg;
1219}
1220
1221void
1222TrexStatelessDpCore::barrier(uint8_t port_id, int event_id) {
1223
1224    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
1225    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
1226                                                                   port_id,
1227                                                                   event_id);
1228    ring->Enqueue((CGenNode *)event_msg);
1229}
1230
1231
1232/**
1233 * PCAP node
1234 */
1235bool CGenNodePCAP::create(uint8_t port_id,
1236                          pkt_dir_t dir,
1237                          socket_id_t socket_id,
1238                          const uint8_t *mac_addr,
1239                          const std::string &pcap_filename,
1240                          double ipg_usec,
1241                          double speedup,
1242                          uint32_t count) {
1243    std::stringstream ss;
1244
1245    m_type       = CGenNode::PCAP_PKT;
1246    m_flags      = 0;
1247    m_src_port   = 0;
1248    m_port_id    = port_id;
1249    m_count      = count;
1250
1251    /* mark this node as slow path */
1252    set_slow_path(true);
1253
1254    if (ipg_usec != -1) {
1255        /* fixed IPG */
1256        m_ipg_sec = usec_to_sec(ipg_usec / speedup);
1257        m_speedup = 0;
1258    } else {
1259        /* packet IPG */
1260        m_ipg_sec = -1;
1261        m_speedup  = speedup;
1262    }
1263
1264    /* copy MAC addr info */
1265    memcpy(m_mac_addr, mac_addr, 12);
1266
1267    /* set the dir */
1268    set_mbuf_dir(dir);
1269    set_socket_id(socket_id);
1270
1271    /* create the PCAP reader */
1272    m_reader = CCapReaderFactory::CreateReader((char *)pcap_filename.c_str(), 0, ss);
1273    if (!m_reader) {
1274        return false;
1275    }
1276
1277    m_raw_packet = new CCapPktRaw();
1278    if ( m_reader->ReadPacket(m_raw_packet) == false ){
1279        /* handle error */
1280        delete m_reader;
1281        return (false);
1282    }
1283
1284    /* this is the reference time */
1285    //m_base_time = m_raw_packet->get_time();
1286    m_last_pkt_time = m_raw_packet->get_time();
1287
1288    /* ready */
1289    m_state = PCAP_ACTIVE;
1290
1291    return true;
1292}
1293
1294/**
1295 * cleanup for PCAP node
1296 *
1297 * @author imarom (08-May-16)
1298 */
1299void CGenNodePCAP::destroy() {
1300
1301    if (m_raw_packet) {
1302        delete m_raw_packet;
1303        m_raw_packet = NULL;
1304    }
1305
1306    if (m_reader) {
1307        delete m_reader;
1308        m_reader = NULL;
1309    }
1310
1311    m_state = PCAP_INVALID;
1312}
1313
1314