trex_stateless_dp_core.cpp revision 0e3021bb
1/*
2 Itay Marom
3 Hanoch Haim
4 Cisco Systems, Inc.
5*/
6
7/*
8Copyright (c) 2015-2015 Cisco Systems, Inc.
9
10Licensed under the Apache License, Version 2.0 (the "License");
11you may not use this file except in compliance with the License.
12You may obtain a copy of the License at
13
14    http://www.apache.org/licenses/LICENSE-2.0
15
16Unless required by applicable law or agreed to in writing, software
17distributed under the License is distributed on an "AS IS" BASIS,
18WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19See the License for the specific language governing permissions and
20limitations under the License.
21*/
22#include <trex_stateless_dp_core.h>
23#include <trex_stateless_messaging.h>
24#include <trex_streams_compiler.h>
25#include <trex_stream_node.h>
26#include <trex_stream.h>
27
28#include <bp_sim.h>
29
30
31void CDpOneStream::Delete(CFlowGenListPerThread   * core){
32    assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE);
33    core->free_node((CGenNode *)m_node);
34    delete m_dp_stream;
35    m_node=0;
36    m_dp_stream=0;
37}
38
39void CDpOneStream::DeleteOnlyStream(){
40    assert(m_dp_stream);
41    delete m_dp_stream;
42    m_dp_stream=0;
43}
44
45int CGenNodeStateless::get_stream_id(){
46    if (m_state ==CGenNodeStateless::ss_FREE_RESUSE) {
47        return (-1); // not valid
48    }
49    assert(m_ref_stream_info);
50    return ((int)m_ref_stream_info->m_stream_id);
51}
52
53
54void CGenNodeStateless::DumpHeader(FILE *fd){
55    fprintf(fd," pkt_id, time, port , action , state, stream_id , stype , m-burst# , burst# \n");
56
57}
58void CGenNodeStateless::Dump(FILE *fd){
59    fprintf(fd," %2.4f, %3lu, %s,%s, %3d, %s, %3lu, %3lu  \n",
60            m_time,
61            (ulong)m_port_id,
62            "s-pkt", //action
63            get_stream_state_str(m_state ).c_str(),
64            get_stream_id(),   //stream_id
65            TrexStream::get_stream_type_str(m_stream_type).c_str(), //stype
66            (ulong)m_multi_bursts,
67            (ulong)m_single_burst
68            );
69}
70
71
72/**
73 * this function called when stream restart after it was inactive
74 */
75void CGenNodeStateless::refresh(){
76
77    /* refill the stream info */
78    m_single_burst    = m_single_burst_refill;
79    m_multi_bursts    = m_ref_stream_info->m_num_bursts;
80    m_state           = CGenNodeStateless::ss_ACTIVE;
81}
82
83
84void CGenNodeCommand::free_command(){
85
86    assert(m_cmd);
87    m_cmd->on_node_remove();
88    delete m_cmd;
89}
90
91
92std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state){
93    std::string res;
94
95    switch (stream_state) {
96    case CGenNodeStateless::ss_FREE_RESUSE :
97         res="FREE    ";
98        break;
99    case CGenNodeStateless::ss_INACTIVE :
100        res="INACTIVE ";
101        break;
102    case CGenNodeStateless::ss_ACTIVE :
103        res="ACTIVE   ";
104        break;
105    default:
106        res="Unknow   ";
107    };
108    return(res);
109}
110
111
112rte_mbuf_t   * CGenNodeStateless::alloc_node_with_vm(){
113
114    rte_mbuf_t        * m;
115    /* alloc small packet buffer*/
116    uint16_t prefix_size = prefix_header_size();
117    m = CGlobalInfo::pktmbuf_alloc( get_socket_id(), prefix_size );
118    if (m==0) {
119        return (m);
120    }
121    /* TBD remove this, should handle cases of error */
122    assert(m);
123    char *p=rte_pktmbuf_append(m, prefix_size);
124    memcpy( p ,m_original_packet_data_prefix, prefix_size);
125
126
127    /* run the VM program */
128    StreamDPVmInstructionsRunner runner;
129
130    runner.run( (uint32_t*)m_vm_flow_var,
131                m_vm_program_size,
132                m_vm_program,
133                m_vm_flow_var,
134                (uint8_t*)p);
135
136
137    rte_mbuf_t * m_const = get_const_mbuf();
138    if (  m_const != NULL) {
139        utl_rte_pktmbuf_add_after(m,m_const);
140    }
141    return (m);
142}
143
144
145void CGenNodeStateless::free_stl_node(){
146    /* if we have cache mbuf free it */
147    rte_mbuf_t * m=get_cache_mbuf();
148    if (m) {
149        rte_pktmbuf_free(m);
150        m_cache_mbuf=0;
151    }else{
152        /* non cache - must have an header */
153         m=get_const_mbuf();
154         if (m) {
155             rte_pktmbuf_free(m); /* reduce the ref counter */
156         }
157         free_prefix_header();
158    }
159    if (m_vm_flow_var) {
160        /* free flow var */
161        free(m_vm_flow_var);
162        m_vm_flow_var=0;
163    }
164}
165
166
167bool TrexStatelessDpPerPort::update_number_of_active_streams(uint32_t d){
168    m_active_streams-=d; /* reduce the number of streams */
169    if (m_active_streams == 0) {
170        return (true);
171    }
172    return (false);
173}
174
175bool TrexStatelessDpPerPort::resume_traffic(uint8_t port_id){
176
177    /* we are working with continues streams so we must be in transmit mode */
178    assert(m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE);
179
180    for (auto dp_stream : m_active_nodes) {
181        CGenNodeStateless * node =dp_stream.m_node;
182        assert(node->get_port_id() == port_id);
183        assert(node->is_pause() == true);
184        node->set_pause(false);
185    }
186    m_state = TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
187    return (true);
188}
189
190bool TrexStatelessDpPerPort::update_traffic(uint8_t port_id, double factor) {
191
192    assert( (m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING ||
193            (m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE)) );
194
195    for (auto dp_stream : m_active_nodes) {
196        CGenNodeStateless * node = dp_stream.m_node;
197        assert(node->get_port_id() == port_id);
198
199        node->update_rate(factor);
200    }
201
202    return (true);
203}
204
205bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){
206
207    /* we are working with continues streams so we must be in transmit mode */
208    assert(m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING);
209
210    for (auto dp_stream : m_active_nodes) {
211        CGenNodeStateless * node =dp_stream.m_node;
212        assert(node->get_port_id() == port_id);
213        assert(node->is_pause() == false);
214        node->set_pause(true);
215    }
216    m_state = TrexStatelessDpPerPort::ppSTATE_PAUSE;
217    return (true);
218}
219
220
221bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id,
222                                          bool stop_on_id,
223                                          int event_id){
224
225
226    if (m_state == TrexStatelessDpPerPort::ppSTATE_IDLE) {
227        assert(m_active_streams==0);
228        return false;
229    }
230
231    /* there could be race of stop after stop */
232    if ( stop_on_id ) {
233        if (event_id != m_event_id){
234            /* we can't stop it is an old message */
235            return false;
236        }
237    }
238
239    for (auto dp_stream : m_active_nodes) {
240        CGenNodeStateless * node =dp_stream.m_node;
241        assert(node->get_port_id() == port_id);
242        if ( node->get_state() == CGenNodeStateless::ss_ACTIVE) {
243            node->mark_for_free();
244            m_active_streams--;
245            dp_stream.DeleteOnlyStream();
246
247        }else{
248            dp_stream.Delete(m_core);
249        }
250    }
251
252    /* active stream should be zero */
253    assert(m_active_streams==0);
254    m_active_nodes.clear();
255    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
256    return (true);
257}
258
259
260void TrexStatelessDpPerPort::create(CFlowGenListPerThread   *  core){
261    m_core=core;
262    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
263    m_port_id=0;
264    m_active_streams=0;
265    m_active_nodes.clear();
266}
267
268
269
270void
271TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) {
272    m_thread_id = thread_id;
273    m_core = core;
274    m_local_port_offset = 2*core->getDualPortId();
275
276    CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();
277
278    m_ring_from_cp = cp_dp->getRingCpToDp(thread_id);
279    m_ring_to_cp   = cp_dp->getRingDpToCp(thread_id);
280
281    m_state = STATE_IDLE;
282
283    int i;
284    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
285        m_ports[i].create(core);
286    }
287}
288
289
290/* move to the next stream, old stream move to INACTIVE */
291bool TrexStatelessDpCore::set_stateless_next_node(CGenNodeStateless * cur_node,
292                                                  CGenNodeStateless * next_node){
293
294    assert(cur_node);
295    TrexStatelessDpPerPort * lp_port = get_port_db(cur_node->m_port_id);
296    bool schedule =false;
297
298    bool to_stop_port=false;
299
300    if (next_node == NULL) {
301        /* there is no next stream , reduce the number of active streams*/
302        to_stop_port = lp_port->update_number_of_active_streams(1);
303
304    }else{
305        uint8_t state=next_node->get_state();
306
307        /* can't be FREE_RESUSE */
308        assert(state != CGenNodeStateless::ss_FREE_RESUSE);
309        if (state == CGenNodeStateless::ss_INACTIVE ) {
310
311            /* refill start info and scedule, no update in active streams  */
312            next_node->refresh();
313            schedule = true;
314
315        }else{
316            to_stop_port = lp_port->update_number_of_active_streams(1);
317        }
318    }
319
320    if ( to_stop_port ) {
321        /* call stop port explictly to move the state */
322        stop_traffic(cur_node->m_port_id,false,0);
323    }
324
325    return ( schedule );
326}
327
328
329
330/**
331 * in idle state loop, the processor most of the time sleeps
332 * and periodically checks for messages
333 *
334 * @author imarom (01-Nov-15)
335 */
336void
337TrexStatelessDpCore::idle_state_loop() {
338
339    while (m_state == STATE_IDLE) {
340        periodic_check_for_cp_messages();
341        delay(200);
342    }
343}
344
345
346
347void TrexStatelessDpCore::quit_main_loop(){
348    m_core->set_terminate_mode(true); /* mark it as terminated */
349    m_state = STATE_TERMINATE;
350    add_global_duration(0.0001);
351}
352
353
354/**
355 * scehduler runs when traffic exists
356 * it will return when no more transmitting is done on this
357 * core
358 *
359 * @author imarom (01-Nov-15)
360 */
361void
362TrexStatelessDpCore::start_scheduler() {
363    /* creates a maintenace job using the scheduler */
364    CGenNode * node_sync = m_core->create_node() ;
365    node_sync->m_type = CGenNode::FLOW_SYNC;
366    node_sync->m_time = m_core->m_cur_time_sec + SYNC_TIME_OUT;
367    m_core->m_node_gen.add_node(node_sync);
368
369    double old_offset = 0.0;
370    m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset);
371    /* bail out in case of terminate */
372    if (m_state != TrexStatelessDpCore::STATE_TERMINATE) {
373        m_core->m_node_gen.close_file(m_core);
374        m_state = STATE_IDLE; /* we exit from all ports and we have nothing to do, we move to IDLE state */
375    }
376}
377
378
379void
380TrexStatelessDpCore::run_once(){
381
382    idle_state_loop();
383
384    if ( m_state == STATE_TERMINATE ){
385        return;
386    }
387
388    start_scheduler();
389}
390
391
392
393
394void
395TrexStatelessDpCore::start() {
396
397    while (true) {
398        run_once();
399
400        if ( m_core->is_terminated_by_master() ) {
401            break;
402        }
403    }
404}
405
406/* only if both port are idle we can exit */
407void
408TrexStatelessDpCore::schedule_exit(){
409
410    CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
411
412    node->m_type = CGenNode::COMMAND;
413
414    node->m_cmd = new TrexStatelessDpCanQuit();
415
416    /* make sure it will be scheduled after the current node */
417    node->m_time = m_core->m_cur_time_sec ;
418
419    m_core->m_node_gen.add_node((CGenNode *)node);
420}
421
422
423void
424TrexStatelessDpCore::add_global_duration(double duration){
425    if (duration > 0.0) {
426        CGenNode *node = m_core->create_node() ;
427
428        node->m_type = CGenNode::EXIT_SCHED;
429
430        /* make sure it will be scheduled after the current node */
431        node->m_time = m_core->m_cur_time_sec + duration ;
432
433        m_core->m_node_gen.add_node(node);
434    }
435}
436
437/* add per port exit */
438void
439TrexStatelessDpCore::add_port_duration(double duration,
440                                       uint8_t port_id,
441                                       int event_id){
442    if (duration > 0.0) {
443        CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
444
445        node->m_type = CGenNode::COMMAND;
446
447        /* make sure it will be scheduled after the current node */
448        node->m_time = m_core->m_cur_time_sec + duration ;
449
450        TrexStatelessDpStop * cmd=new TrexStatelessDpStop(port_id);
451
452
453        /* test this */
454        m_core->m_non_active_nodes++;
455        cmd->set_core_ptr(m_core);
456        cmd->set_event_id(event_id);
457        cmd->set_wait_for_event_id(true);
458
459        node->m_cmd = cmd;
460
461        m_core->m_node_gen.add_node((CGenNode *)node);
462    }
463}
464
465
/**
 * Create the scheduler node for one stream and register it on the port.
 *
 * The node gets its own DP copy of the stream (clone_as_dp), its timing
 * and burst counters, and the packet buffers:
 *  - stream without VM: the whole packet is copied into one mbuf that is
 *    stored with set_cache_mbuf();
 *  - stream with VM: flow variables are cloned, the program is referenced,
 *    the bytes after the VM prefix go into a const mbuf and the prefix
 *    itself is copied into the node's prefix header buffer.
 *
 * The node/stream pair is appended to lp_port->m_active_nodes; the node is
 * handed to the scheduler only when it starts in ss_ACTIVE state.
 *
 * @param lp_port  port database the node is registered on
 * @param stream   stream definition; note that m_vm_prefix_size may be
 *                 clamped in place (see below)
 * @param comp     compiled object the stream came from; not referenced in
 *                 this function body
 */
void
TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port,
                                TrexStream * stream,
                                TrexStreamsCompiledObj *comp) {

    CGenNodeStateless *node = m_core->create_node_sl();

    /* basic node identity */
    node->m_cache_mbuf=0;
    node->m_type = CGenNode::STATELESS_PKT;

    /* the node keeps its own DP copy of the stream */
    node->m_ref_stream_info  =   stream->clone_as_dp();

    node->m_next_stream=0; /* will be fixed later */


    if ( stream->m_self_start ){
        /* if self start it is in active mode */
        node->m_state =CGenNodeStateless::ss_ACTIVE;
        lp_port->m_active_streams++;
    }else{
        node->m_state =CGenNodeStateless::ss_INACTIVE;
    }

    /* first transmit time: now plus the stream's inter-stream gap */
    node->m_time = m_core->m_cur_time_sec + usec_to_sec(stream->m_isg_usec);

    pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(stream->m_port_id);
    node->m_flags = 0;
    node->m_src_port =0;
    node->m_original_packet_data_prefix = 0;



    /* set socket id */
    node->set_socket_id(m_core->m_node_gen.m_socket_id);

    /* build a mbuf from a packet */

    uint16_t pkt_size = stream->m_pkt.len;
    const uint8_t *stream_pkt = stream->m_pkt.binary;

    node->m_pause =0;
    node->m_stream_type = stream->m_type;
    /* NOTE(review): no guard here against get_pps() returning 0 - confirm
       it is validated before this point */
    node->m_next_time_offset = 1.0 / stream->get_pps();

    /* stateless specific fields */
    switch ( stream->m_type ) {

    case TrexStream::stCONTINUOUS :
        node->m_single_burst=0;
        node->m_single_burst_refill=0;
        node->m_multi_bursts=0;
        node->m_ibg_sec                 = 0.0;
        break;

    case TrexStream::stSINGLE_BURST :
        /* stored on the node as a multi burst stream with one burst */
        node->m_stream_type             = TrexStream::stMULTI_BURST;
        node->m_single_burst            = stream->m_burst_total_pkts;
        node->m_single_burst_refill     = stream->m_burst_total_pkts;
        node->m_multi_bursts            = 1;  /* single burst in multi burst of 1 */
        node->m_ibg_sec                 = 0.0;
        break;

    case TrexStream::stMULTI_BURST :
        node->m_single_burst        = stream->m_burst_total_pkts;
        node->m_single_burst_refill = stream->m_burst_total_pkts;
        node->m_multi_bursts        = stream->m_num_bursts;
        node->m_ibg_sec             = usec_to_sec( stream->m_ibg_usec );
        break;
    default:
        /* unknown stream type */
        assert(0);
    };

    node->m_port_id = stream->m_port_id;

    /* set dir 0 or 1 client or server */
    node->set_mbuf_cache_dir(dir);


    if (stream->is_vm()  == false ) {
        /* no VM */

        node->m_vm_flow_var =  NULL;
        node->m_vm_program  =  NULL;
        node->m_vm_program_size =0;

        /* allocate an mbuf that holds the whole packet */
        rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size);
        assert(m);

        char *p = rte_pktmbuf_append(m, pkt_size);
        assert(p);
        /* copy the packet */
        memcpy(p,stream_pkt,pkt_size);

        /* TBD replace the mac if required - we should add a flag */
        m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*) p);

        /* set the packet as a readonly */
        node->set_cache_mbuf(m);

        node->m_original_packet_data_prefix =0;
    }else{

        /* set the program - taken from the node's own stream copy */
        TrexStream * local_mem_stream = node->m_ref_stream_info;

        StreamVmDp  * lpDpVm = local_mem_stream->getDpVm();

        node->m_vm_flow_var =  lpDpVm->clone_bss(); /* clone the flow var */
        node->m_vm_program  =  lpDpVm->get_program(); /* same ref to the program */
        node->m_vm_program_size =lpDpVm->get_program_size();


        /* bytes after the VM prefix, if any, go into a const mbuf */
        if ( pkt_size > stream->m_vm_prefix_size  ) {
            /* we need const packet */
            uint16_t const_pkt_size  = pkt_size - stream->m_vm_prefix_size ;
            rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), const_pkt_size );
            assert(m);

            char *p = rte_pktmbuf_append(m, const_pkt_size);
            assert(p);

            /* copy packet data */
            memcpy(p,(stream_pkt+ stream->m_vm_prefix_size),const_pkt_size);

            node->set_const_mbuf(m);
        }


        /* the prefix can't be longer than the packet; this writes back
           into the caller's stream object */
        if (stream->m_vm_prefix_size > pkt_size ) {
            stream->m_vm_prefix_size = pkt_size;
        }
        /* copy the header (VM prefix) into the node's own buffer */
        uint16_t header_size = stream->m_vm_prefix_size;
        assert(header_size);
        node->alloc_prefix_header(header_size);
        uint8_t *p=node->m_original_packet_data_prefix;
        assert(p);

        memcpy(p,stream_pkt , header_size);
        /* TBD replace the mac if required - we should add a flag */
        m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, p);
    }


    /* register the node together with its stream copy on the port */
    CDpOneStream one_stream;

    one_stream.m_dp_stream = node->m_ref_stream_info;
    one_stream.m_node =node;

    lp_port->m_active_nodes.push_back(one_stream);

    /* schedule only if active */
    if (node->m_state == CGenNodeStateless::ss_ACTIVE) {
        m_core->m_node_gen.add_node((CGenNode *)node);
    }
}
626
627void
628TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj,
629                                   double duration,
630                                   int event_id) {
631
632
633    TrexStatelessDpPerPort * lp_port=get_port_db(obj->get_port_id());
634    lp_port->m_active_streams = 0;
635    lp_port->set_event_id(event_id);
636
637    /* no nodes in the list */
638    assert(lp_port->m_active_nodes.size()==0);
639
640    for (auto single_stream : obj->get_objects()) {
641        /* all commands should be for the same port */
642        assert(obj->get_port_id() == single_stream.m_stream->m_port_id);
643        add_stream(lp_port,single_stream.m_stream,obj);
644    }
645
646    uint32_t nodes = lp_port->m_active_nodes.size();
647    /* find next stream */
648    assert(nodes == obj->get_objects().size());
649
650    int cnt=0;
651
652    /* set the next_stream pointer  */
653    for (auto single_stream : obj->get_objects()) {
654
655        if (single_stream.m_stream->is_dp_next_stream() ) {
656            int stream_id = single_stream.m_stream->m_next_stream_id;
657            assert(stream_id<nodes);
658            /* point to the next stream , stream_id is fixed */
659            lp_port->m_active_nodes[cnt].m_node->m_next_stream = lp_port->m_active_nodes[stream_id].m_node ;
660        }
661        cnt++;
662    }
663
664    lp_port->m_state =TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
665    m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
666
667
668    if ( duration > 0.0 ){
669        add_port_duration( duration ,obj->get_port_id(),event_id );
670    }
671
672}
673
674
675bool TrexStatelessDpCore::are_all_ports_idle(){
676
677    bool res=true;
678    int i;
679    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
680        if ( m_ports[i].m_state != TrexStatelessDpPerPort::ppSTATE_IDLE ){
681            res=false;
682        }
683    }
684    return (res);
685}
686
687
688void
689TrexStatelessDpCore::resume_traffic(uint8_t port_id){
690
691    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
692
693    lp_port->resume_traffic(port_id);
694}
695
696
697void
698TrexStatelessDpCore::pause_traffic(uint8_t port_id){
699
700    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
701
702    lp_port->pause_traffic(port_id);
703}
704
705void
706TrexStatelessDpCore::update_traffic(uint8_t port_id, double factor) {
707
708    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
709
710    lp_port->update_traffic(port_id, factor);
711}
712
713
714void
715TrexStatelessDpCore::stop_traffic(uint8_t port_id,
716                                  bool stop_on_id,
717                                  int event_id) {
718    /* we cannot remove nodes not from the top of the queue so
719       for every active node - make sure next time
720       the scheduler invokes it, it will be free */
721
722    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
723
724    if ( lp_port->stop_traffic(port_id,stop_on_id,event_id) == false){
725        /* nothing to do ! already stopped */
726        //printf(" skip .. %f\n",m_core->m_cur_time_sec);
727        return;
728    }
729
730#if 0
731    if ( are_all_ports_idle() ) {
732        /* just a place holder if we will need to do somthing in that case */
733    }
734#endif
735
736    /* inform the control plane we stopped - this might be a async stop
737       (streams ended)
738     */
739    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
740    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
741                                                                   port_id,
742                                                                   TrexDpPortEvent::EVENT_STOP,
743                                                                   lp_port->get_event_id());
744    ring->Enqueue((CGenNode *)event_msg);
745
746}
747
748/**
749 * handle a message from CP to DP
750 *
751 */
752void
753TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) {
754    msg->handle(this);
755    delete msg;
756}
757
758