trex_stateless_dp_core.cpp revision e7ffce7b
1/*
2 Itay Marom
3 Hanoch Haim
4 Cisco Systems, Inc.
5*/
6
7/*
8Copyright (c) 2015-2015 Cisco Systems, Inc.
9
10Licensed under the Apache License, Version 2.0 (the "License");
11you may not use this file except in compliance with the License.
12You may obtain a copy of the License at
13
14    http://www.apache.org/licenses/LICENSE-2.0
15
16Unless required by applicable law or agreed to in writing, software
17distributed under the License is distributed on an "AS IS" BASIS,
18WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19See the License for the specific language governing permissions and
20limitations under the License.
21*/
22#include <trex_stateless_dp_core.h>
23#include <trex_stateless_messaging.h>
24#include <trex_streams_compiler.h>
25#include <trex_stream_node.h>
26#include <trex_stream.h>
27
28#include <bp_sim.h>
29
30
31void CDpOneStream::Delete(CFlowGenListPerThread   * core){
32    assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE);
33    core->free_node((CGenNode *)m_node);
34    delete m_dp_stream;
35    m_node=0;
36    m_dp_stream=0;
37}
38
39void CDpOneStream::DeleteOnlyStream(){
40    assert(m_dp_stream);
41    delete m_dp_stream;
42    m_dp_stream=0;
43}
44
45int CGenNodeStateless::get_stream_id(){
46    if (m_state ==CGenNodeStateless::ss_FREE_RESUSE) {
47        return (-1); // not valid
48    }
49    assert(m_ref_stream_info);
50    return ((int)m_ref_stream_info->m_stream_id);
51}
52
53
54void CGenNodeStateless::DumpHeader(FILE *fd){
55    fprintf(fd," pkt_id, time, port , action , state, stream_id , stype , m-burst# , burst# \n");
56
57}
58void CGenNodeStateless::Dump(FILE *fd){
59    fprintf(fd," %2.4f, %3lu, %s,%s, %3d, %s, %3lu, %3lu  \n",
60            m_time,
61            (ulong)m_port_id,
62            "s-pkt", //action
63            get_stream_state_str(m_state ).c_str(),
64            get_stream_id(),   //stream_id
65            TrexStream::get_stream_type_str(m_stream_type).c_str(), //stype
66            (ulong)m_multi_bursts,
67            (ulong)m_single_burst
68            );
69}
70
71
72void CGenNodeStateless::refresh(){
73
74    /* refill the stream info */
75    m_single_burst    = m_single_burst_refill;
76    m_multi_bursts    = m_ref_stream_info->m_num_bursts;
77    m_state           = CGenNodeStateless::ss_ACTIVE;
78}
79
80
81void CGenNodeCommand::free_command(){
82
83    assert(m_cmd);
84    m_cmd->on_node_remove();
85    delete m_cmd;
86}
87
88
89std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state){
90    std::string res;
91
92    switch (stream_state) {
93    case CGenNodeStateless::ss_FREE_RESUSE :
94         res="FREE    ";
95        break;
96    case CGenNodeStateless::ss_INACTIVE :
97        res="INACTIVE ";
98        break;
99    case CGenNodeStateless::ss_ACTIVE :
100        res="ACTIVE   ";
101        break;
102    default:
103        res="Unknow   ";
104    };
105    return(res);
106}
107
108
109rte_mbuf_t   * CGenNodeStateless::alloc_node_with_vm(){
110
111    rte_mbuf_t        * m;
112    /* alloc small packet buffer*/
113    uint16_t prefix_size = prefix_header_size();
114    m = CGlobalInfo::pktmbuf_alloc( get_socket_id(), prefix_size );
115    if (m==0) {
116        return (m);
117    }
118    /* TBD remove this, should handle cases of error */
119    assert(m);
120    char *p=rte_pktmbuf_append(m, prefix_size);
121    memcpy( p ,m_original_packet_data_prefix, prefix_size);
122
123
124    /* run the VM program */
125    StreamDPVmInstructionsRunner runner;
126
127    runner.run( m_vm_program_size,
128                m_vm_program,
129                m_vm_flow_var,
130                (uint8_t*)p);
131
132
133    rte_mbuf_t * m_const = get_const_mbuf();
134    if (  m_const != NULL) {
135        utl_rte_pktmbuf_add_after(m,m_const);
136    }
137    return (m);
138}
139
140
141void CGenNodeStateless::free_stl_node(){
142    /* if we have cache mbuf free it */
143    rte_mbuf_t * m=get_cache_mbuf();
144    if (m) {
145        rte_pktmbuf_free(m);
146        m_cache_mbuf=0;
147    }else{
148        /* non cache - must have an header */
149         m=get_const_mbuf();
150         if (m) {
151             rte_pktmbuf_free(m); /* reduce the ref counter */
152         }
153         free_prefix_header();
154    }
155    if (m_vm_flow_var) {
156        /* free flow var */
157        free(m_vm_flow_var);
158        m_vm_flow_var=0;
159    }
160}
161
162
163bool TrexStatelessDpPerPort::update_number_of_active_streams(uint32_t d){
164    m_active_streams-=d; /* reduce the number of streams */
165    if (m_active_streams == 0) {
166        return (true);
167    }
168    return (false);
169}
170
171bool TrexStatelessDpPerPort::resume_traffic(uint8_t port_id){
172
173    /* we are working with continues streams so we must be in transmit mode */
174    assert(m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE);
175
176    for (auto dp_stream : m_active_nodes) {
177        CGenNodeStateless * node =dp_stream.m_node;
178        assert(node->get_port_id() == port_id);
179        assert(node->is_pause() == true);
180        node->set_pause(false);
181    }
182    m_state = TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
183    return (true);
184}
185
186bool TrexStatelessDpPerPort::update_traffic(uint8_t port_id, double factor) {
187
188    assert( (m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING ||
189            (m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE)) );
190
191    for (auto dp_stream : m_active_nodes) {
192        CGenNodeStateless * node = dp_stream.m_node;
193        assert(node->get_port_id() == port_id);
194
195        node->update_rate(factor);
196    }
197
198    return (true);
199}
200
201bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){
202
203    /* we are working with continues streams so we must be in transmit mode */
204    assert(m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING);
205
206    for (auto dp_stream : m_active_nodes) {
207        CGenNodeStateless * node =dp_stream.m_node;
208        assert(node->get_port_id() == port_id);
209        assert(node->is_pause() == false);
210        node->set_pause(true);
211    }
212    m_state = TrexStatelessDpPerPort::ppSTATE_PAUSE;
213    return (true);
214}
215
216
217bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id,
218                                          bool stop_on_id,
219                                          int event_id){
220
221
222    if (m_state == TrexStatelessDpPerPort::ppSTATE_IDLE) {
223        assert(m_active_streams==0);
224        return false;
225    }
226
227    /* there could be race of stop after stop */
228    if ( stop_on_id ) {
229        if (event_id != m_event_id){
230            /* we can't stop it is an old message */
231            return false;
232        }
233    }
234
235    for (auto dp_stream : m_active_nodes) {
236        CGenNodeStateless * node =dp_stream.m_node;
237        assert(node->get_port_id() == port_id);
238        if ( node->get_state() == CGenNodeStateless::ss_ACTIVE) {
239            node->mark_for_free();
240            m_active_streams--;
241            dp_stream.DeleteOnlyStream();
242
243        }else{
244            dp_stream.Delete(m_core);
245        }
246    }
247
248    /* active stream should be zero */
249    assert(m_active_streams==0);
250    m_active_nodes.clear();
251    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
252    return (true);
253}
254
255
256void TrexStatelessDpPerPort::create(CFlowGenListPerThread   *  core){
257    m_core=core;
258    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
259    m_port_id=0;
260    m_active_streams=0;
261    m_active_nodes.clear();
262}
263
264
265
266void
267TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) {
268    m_thread_id = thread_id;
269    m_core = core;
270    m_local_port_offset = 2*core->getDualPortId();
271
272    CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();
273
274    m_ring_from_cp = cp_dp->getRingCpToDp(thread_id);
275    m_ring_to_cp   = cp_dp->getRingDpToCp(thread_id);
276
277    m_state = STATE_IDLE;
278
279    int i;
280    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
281        m_ports[i].create(core);
282    }
283}
284
285
286/* move to the next stream, old stream move to INACTIVE */
287bool TrexStatelessDpCore::set_stateless_next_node(CGenNodeStateless * cur_node,
288                                                  CGenNodeStateless * next_node){
289
290    assert(cur_node);
291    TrexStatelessDpPerPort * lp_port = get_port_db(cur_node->m_port_id);
292    bool schedule =false;
293
294    bool to_stop_port=false;
295
296    if (next_node == NULL) {
297        /* there is no next stream , reduce the number of active streams*/
298        to_stop_port = lp_port->update_number_of_active_streams(1);
299
300    }else{
301        uint8_t state=next_node->get_state();
302
303        /* can't be FREE_RESUSE */
304        assert(state != CGenNodeStateless::ss_FREE_RESUSE);
305        if (next_node->get_state() == CGenNodeStateless::ss_INACTIVE ) {
306
307            /* refill start info and scedule, no update in active streams  */
308            next_node->refresh();
309            schedule = true;
310
311        }else{
312            to_stop_port = lp_port->update_number_of_active_streams(1);
313        }
314    }
315
316    if ( to_stop_port ) {
317        /* call stop port explictly to move the state */
318        stop_traffic(cur_node->m_port_id,false,0);
319    }
320
321    return ( schedule );
322}
323
324
325
326/**
327 * in idle state loop, the processor most of the time sleeps
328 * and periodically checks for messages
329 *
330 * @author imarom (01-Nov-15)
331 */
332void
333TrexStatelessDpCore::idle_state_loop() {
334
335    while (m_state == STATE_IDLE) {
336        periodic_check_for_cp_messages();
337        delay(200);
338    }
339}
340
341
342
343void TrexStatelessDpCore::quit_main_loop(){
344    m_core->set_terminate_mode(true); /* mark it as terminated */
345    m_state = STATE_TERMINATE;
346    add_global_duration(0.0001);
347}
348
349
350/**
351 * scehduler runs when traffic exists
352 * it will return when no more transmitting is done on this
353 * core
354 *
355 * @author imarom (01-Nov-15)
356 */
357void
358TrexStatelessDpCore::start_scheduler() {
359    /* creates a maintenace job using the scheduler */
360    CGenNode * node_sync = m_core->create_node() ;
361    node_sync->m_type = CGenNode::FLOW_SYNC;
362    node_sync->m_time = m_core->m_cur_time_sec + SYNC_TIME_OUT;
363    m_core->m_node_gen.add_node(node_sync);
364
365    double old_offset = 0.0;
366    m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset);
367    /* bail out in case of terminate */
368    if (m_state != TrexStatelessDpCore::STATE_TERMINATE) {
369        m_core->m_node_gen.close_file(m_core);
370        m_state = STATE_IDLE; /* we exit from all ports and we have nothing to do, we move to IDLE state */
371    }
372}
373
374
375void
376TrexStatelessDpCore::run_once(){
377
378    idle_state_loop();
379
380    if ( m_state == STATE_TERMINATE ){
381        return;
382    }
383
384    start_scheduler();
385}
386
387
388
389
390void
391TrexStatelessDpCore::start() {
392
393    while (true) {
394        run_once();
395
396        if ( m_core->is_terminated_by_master() ) {
397            break;
398        }
399    }
400}
401
402/* only if both port are idle we can exit */
403void
404TrexStatelessDpCore::schedule_exit(){
405
406    CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
407
408    node->m_type = CGenNode::COMMAND;
409
410    node->m_cmd = new TrexStatelessDpCanQuit();
411
412    /* make sure it will be scheduled after the current node */
413    node->m_time = m_core->m_cur_time_sec ;
414
415    m_core->m_node_gen.add_node((CGenNode *)node);
416}
417
418
419void
420TrexStatelessDpCore::add_global_duration(double duration){
421    if (duration > 0.0) {
422        CGenNode *node = m_core->create_node() ;
423
424        node->m_type = CGenNode::EXIT_SCHED;
425
426        /* make sure it will be scheduled after the current node */
427        node->m_time = m_core->m_cur_time_sec + duration ;
428
429        m_core->m_node_gen.add_node(node);
430    }
431}
432
433/* add per port exit */
434void
435TrexStatelessDpCore::add_port_duration(double duration,
436                                       uint8_t port_id,
437                                       int event_id){
438    if (duration > 0.0) {
439        CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
440
441        node->m_type = CGenNode::COMMAND;
442
443        /* make sure it will be scheduled after the current node */
444        node->m_time = m_core->m_cur_time_sec + duration ;
445
446        TrexStatelessDpStop * cmd=new TrexStatelessDpStop(port_id);
447
448
449        /* test this */
450        m_core->m_non_active_nodes++;
451        cmd->set_core_ptr(m_core);
452        cmd->set_event_id(event_id);
453        cmd->set_wait_for_event_id(true);
454
455        node->m_cmd = cmd;
456
457        m_core->m_node_gen.add_node((CGenNode *)node);
458    }
459}
460
461
462void
463TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port,
464                                TrexStream * stream,
465                                TrexStreamsCompiledObj *comp) {
466
467    CGenNodeStateless *node = m_core->create_node_sl();
468
469    /* add periodic */
470    node->m_cache_mbuf=0;
471    node->m_type = CGenNode::STATELESS_PKT;
472
473    node->m_ref_stream_info  =   stream->clone_as_dp();
474
475    node->m_next_stream=0; /* will be fixed later */
476
477
478    if ( stream->m_self_start ){
479        /* if self start it is in active mode */
480        node->m_state =CGenNodeStateless::ss_ACTIVE;
481        lp_port->m_active_streams++;
482    }else{
483        node->m_state =CGenNodeStateless::ss_INACTIVE;
484    }
485
486    node->m_time = m_core->m_cur_time_sec + usec_to_sec(stream->m_isg_usec);
487
488    pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(stream->m_port_id);
489    node->m_flags = 0;
490    node->m_src_port =0;
491    node->m_original_packet_data_prefix = 0;
492
493
494
495    /* set socket id */
496    node->set_socket_id(m_core->m_node_gen.m_socket_id);
497
498    /* build a mbuf from a packet */
499
500    uint16_t pkt_size = stream->m_pkt.len;
501    const uint8_t *stream_pkt = stream->m_pkt.binary;
502
503    node->m_pause =0;
504    node->m_stream_type = stream->m_type;
505    node->m_next_time_offset = 1.0 / stream->get_pps();
506
507    /* stateless specific fields */
508    switch ( stream->m_type ) {
509
510    case TrexStream::stCONTINUOUS :
511        node->m_single_burst=0;
512        node->m_single_burst_refill=0;
513        node->m_multi_bursts=0;
514        node->m_ibg_sec                 = 0.0;
515        break;
516
517    case TrexStream::stSINGLE_BURST :
518        node->m_stream_type             = TrexStream::stMULTI_BURST;
519        node->m_single_burst            = stream->m_burst_total_pkts;
520        node->m_single_burst_refill     = stream->m_burst_total_pkts;
521        node->m_multi_bursts            = 1;  /* single burst in multi burst of 1 */
522        node->m_ibg_sec                 = 0.0;
523        break;
524
525    case TrexStream::stMULTI_BURST :
526        node->m_single_burst        = stream->m_burst_total_pkts;
527        node->m_single_burst_refill = stream->m_burst_total_pkts;
528        node->m_multi_bursts        = stream->m_num_bursts;
529        node->m_ibg_sec             = usec_to_sec( stream->m_ibg_usec );
530        break;
531    default:
532
533        assert(0);
534    };
535
536    node->m_port_id = stream->m_port_id;
537
538    /* set dir 0 or 1 client or server */
539    node->set_mbuf_cache_dir(dir);
540
541
542    if (stream->is_vm()  == false ) {
543        /* no VM */
544
545        node->m_vm_flow_var =  NULL;
546        node->m_vm_program  =  NULL;
547        node->m_vm_program_size =0;
548
549                /* allocate const mbuf */
550        rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size);
551        assert(m);
552
553        char *p = rte_pktmbuf_append(m, pkt_size);
554        assert(p);
555        /* copy the packet */
556        memcpy(p,stream_pkt,pkt_size);
557
558        /* TBD repace the mac if req we should add flag  */
559        m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*) p);
560
561        /* set the packet as a readonly */
562        node->set_cache_mbuf(m);
563
564        node->m_original_packet_data_prefix =0;
565    }else{
566
567        /* set the program */
568        TrexStream * local_mem_stream = node->m_ref_stream_info;
569
570        StreamVmDp  * lpDpVm = local_mem_stream->getDpVm();
571
572        node->m_vm_flow_var =  lpDpVm->clone_bss(); /* clone the flow var */
573        node->m_vm_program  =  lpDpVm->get_program(); /* same ref to the program */
574        node->m_vm_program_size =lpDpVm->get_program_size();
575
576
577        /* we need to copy the object */
578        if ( pkt_size > stream->m_vm_prefix_size  ) {
579            /* we need const packet */
580            uint16_t const_pkt_size  = pkt_size - stream->m_vm_prefix_size ;
581            rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), const_pkt_size );
582            assert(m);
583
584            char *p = rte_pktmbuf_append(m, const_pkt_size);
585            assert(p);
586
587            /* copy packet data */
588            memcpy(p,(stream_pkt+ stream->m_vm_prefix_size),const_pkt_size);
589
590            node->set_const_mbuf(m);
591        }
592
593
594        if (stream->m_vm_prefix_size > pkt_size ) {
595            stream->m_vm_prefix_size = pkt_size;
596        }
597        /* copy the headr */
598        uint16_t header_size = stream->m_vm_prefix_size;
599        assert(header_size);
600        node->alloc_prefix_header(header_size);
601        uint8_t *p=node->m_original_packet_data_prefix;
602        assert(p);
603
604        memcpy(p,stream_pkt , header_size);
605        /* TBD repace the mac if req we should add flag  */
606        m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, p);
607    }
608
609
610    CDpOneStream one_stream;
611
612    one_stream.m_dp_stream = node->m_ref_stream_info;
613    one_stream.m_node =node;
614
615    lp_port->m_active_nodes.push_back(one_stream);
616
617    /* schedule only if active */
618    if (node->m_state == CGenNodeStateless::ss_ACTIVE) {
619        m_core->m_node_gen.add_node((CGenNode *)node);
620    }
621}
622
623void
624TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj,
625                                   double duration,
626                                   int event_id) {
627
628
629    TrexStatelessDpPerPort * lp_port=get_port_db(obj->get_port_id());
630    lp_port->m_active_streams = 0;
631    lp_port->set_event_id(event_id);
632
633    /* no nodes in the list */
634    assert(lp_port->m_active_nodes.size()==0);
635
636    for (auto single_stream : obj->get_objects()) {
637        /* all commands should be for the same port */
638        assert(obj->get_port_id() == single_stream.m_stream->m_port_id);
639        add_stream(lp_port,single_stream.m_stream,obj);
640    }
641
642    uint32_t nodes = lp_port->m_active_nodes.size();
643    /* find next stream */
644    assert(nodes == obj->get_objects().size());
645
646    int cnt=0;
647
648    /* set the next_stream pointer  */
649    for (auto single_stream : obj->get_objects()) {
650
651        if (single_stream.m_stream->is_dp_next_stream() ) {
652            int stream_id = single_stream.m_stream->m_next_stream_id;
653            assert(stream_id<nodes);
654            /* point to the next stream , stream_id is fixed */
655            lp_port->m_active_nodes[cnt].m_node->m_next_stream = lp_port->m_active_nodes[stream_id].m_node ;
656        }
657        cnt++;
658    }
659
660    lp_port->m_state =TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
661    m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
662
663
664    if ( duration > 0.0 ){
665        add_port_duration( duration ,obj->get_port_id(),event_id );
666    }
667
668}
669
670
671bool TrexStatelessDpCore::are_all_ports_idle(){
672
673    bool res=true;
674    int i;
675    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
676        if ( m_ports[i].m_state != TrexStatelessDpPerPort::ppSTATE_IDLE ){
677            res=false;
678        }
679    }
680    return (res);
681}
682
683
684void
685TrexStatelessDpCore::resume_traffic(uint8_t port_id){
686
687    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
688
689    lp_port->resume_traffic(port_id);
690}
691
692
693void
694TrexStatelessDpCore::pause_traffic(uint8_t port_id){
695
696    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
697
698    lp_port->pause_traffic(port_id);
699}
700
701void
702TrexStatelessDpCore::update_traffic(uint8_t port_id, double factor) {
703
704    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
705
706    lp_port->update_traffic(port_id, factor);
707}
708
709
710void
711TrexStatelessDpCore::stop_traffic(uint8_t port_id,
712                                  bool stop_on_id,
713                                  int event_id) {
714    /* we cannot remove nodes not from the top of the queue so
715       for every active node - make sure next time
716       the scheduler invokes it, it will be free */
717
718    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
719
720    if ( lp_port->stop_traffic(port_id,stop_on_id,event_id) == false){
721        /* nothing to do ! already stopped */
722        //printf(" skip .. %f\n",m_core->m_cur_time_sec);
723        return;
724    }
725
726#if 0
727    if ( are_all_ports_idle() ) {
728        /* just a place holder if we will need to do somthing in that case */
729    }
730#endif
731
732    /* inform the control plane we stopped - this might be a async stop
733       (streams ended)
734     */
735    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
736    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
737                                                                   port_id,
738                                                                   TrexDpPortEvent::EVENT_STOP,
739                                                                   lp_port->get_event_id());
740    ring->Enqueue((CGenNode *)event_msg);
741
742}
743
744/**
745 * handle a message from CP to DP
746 *
747 */
748void
749TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) {
750    msg->handle(this);
751    delete msg;
752}
753
754