trex_stateless_dp_core.cpp revision 0901331f
1/*
2 Itay Marom
3 Hanoch Haim
4 Cisco Systems, Inc.
5*/
6
7/*
8Copyright (c) 2015-2015 Cisco Systems, Inc.
9
10Licensed under the Apache License, Version 2.0 (the "License");
11you may not use this file except in compliance with the License.
12You may obtain a copy of the License at
13
14    http://www.apache.org/licenses/LICENSE-2.0
15
16Unless required by applicable law or agreed to in writing, software
17distributed under the License is distributed on an "AS IS" BASIS,
18WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19See the License for the specific language governing permissions and
20limitations under the License.
21*/
22#include <trex_stateless_dp_core.h>
23#include <trex_stateless_messaging.h>
24#include <trex_streams_compiler.h>
25#include <trex_stream_node.h>
26#include <trex_stream.h>
27
28#include <bp_sim.h>
29
30
31void CDpOneStream::Delete(CFlowGenListPerThread   * core){
32    assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE);
33    core->free_node((CGenNode *)m_node);
34    delete m_dp_stream;
35    m_node=0;
36    m_dp_stream=0;
37}
38
39void CDpOneStream::DeleteOnlyStream(){
40    assert(m_dp_stream);
41    delete m_dp_stream;
42    m_dp_stream=0;
43}
44
45int CGenNodeStateless::get_stream_id(){
46    if (m_state ==CGenNodeStateless::ss_FREE_RESUSE) {
47        return (-1); // not valid
48    }
49    assert(m_ref_stream_info);
50    return ((int)m_ref_stream_info->m_stream_id);
51}
52
53
54void CGenNodeStateless::DumpHeader(FILE *fd){
55    fprintf(fd," pkt_id, time, port , action , state, stream_id , stype , m-burst# , burst# \n");
56
57}
58void CGenNodeStateless::Dump(FILE *fd){
59    fprintf(fd," %2.4f, %3lu, %s,%s, %3d, %s, %3lu, %3lu  \n",
60            m_time,
61            (ulong)m_port_id,
62            "s-pkt", //action
63            get_stream_state_str(m_state ).c_str(),
64            get_stream_id(),   //stream_id
65            TrexStream::get_stream_type_str(m_stream_type).c_str(), //stype
66            (ulong)m_multi_bursts,
67            (ulong)m_single_burst
68            );
69}
70
71
72void CGenNodeStateless::refresh(){
73
74    /* refill the stream info */
75    m_single_burst    = m_single_burst_refill;
76    m_multi_bursts    = m_ref_stream_info->m_num_bursts;
77    m_state           = CGenNodeStateless::ss_ACTIVE;
78}
79
80
81void CGenNodeCommand::free_command(){
82
83    assert(m_cmd);
84    m_cmd->on_node_remove();
85    delete m_cmd;
86}
87
88
89std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state){
90    std::string res;
91
92    switch (stream_state) {
93    case CGenNodeStateless::ss_FREE_RESUSE :
94         res="FREE    ";
95        break;
96    case CGenNodeStateless::ss_INACTIVE :
97        res="INACTIVE ";
98        break;
99    case CGenNodeStateless::ss_ACTIVE :
100        res="ACTIVE   ";
101        break;
102    default:
103        res="Unknow   ";
104    };
105    return(res);
106}
107
108
109rte_mbuf_t   * CGenNodeStateless::alloc_node_with_vm(){
110
111    rte_mbuf_t        * m;
112    /* alloc small packet buffer*/
113    uint16_t prefix_size = prefix_header_size();
114    m = CGlobalInfo::pktmbuf_alloc( get_socket_id(), prefix_size );
115    if (m==0) {
116        return (m);
117    }
118    /* TBD remove this, should handle cases of error */
119    assert(m);
120    char *p=rte_pktmbuf_append(m, prefix_size);
121    memcpy( p ,m_original_packet_data_prefix, prefix_size);
122
123
124    /* run the VM program */
125    StreamDPVmInstructionsRunner runner;
126
127    runner.run( m_vm_program_size,
128                m_vm_program,
129                m_vm_flow_var,
130                (uint8_t*)p);
131
132
133    rte_mbuf_t * m_const = get_const_mbuf();
134    if (  m_const != NULL) {
135        utl_rte_pktmbuf_add_after(m,m_const);
136    }
137    return (m);
138}
139
140
141void CGenNodeStateless::free_stl_node(){
142    /* if we have cache mbuf free it */
143    rte_mbuf_t * m=get_cache_mbuf();
144    if (m) {
145        rte_pktmbuf_free(m);
146        m_cache_mbuf=0;
147    }else{
148        /* non cache - must have an header */
149         m=get_const_mbuf();
150         if (m) {
151             rte_pktmbuf_free(m); /* reduce the ref counter */
152         }
153         free_prefix_header();
154    }
155    if (m_vm_flow_var) {
156        /* free flow var */
157        free(m_vm_flow_var);
158        m_vm_flow_var=0;
159    }
160}
161
162
163bool TrexStatelessDpPerPort::update_number_of_active_streams(uint32_t d){
164    m_active_streams-=d; /* reduce the number of streams */
165    if (m_active_streams == 0) {
166        return (true);
167    }
168    return (false);
169}
170
171bool TrexStatelessDpPerPort::resume_traffic(uint8_t port_id){
172
173    /* we are working with continues streams so we must be in transmit mode */
174    assert(m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE);
175
176    for (auto dp_stream : m_active_nodes) {
177        CGenNodeStateless * node =dp_stream.m_node;
178        assert(node->get_port_id() == port_id);
179        assert(node->is_pause() == true);
180        node->set_pause(false);
181    }
182    m_state = TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
183    return (true);
184}
185
186bool TrexStatelessDpPerPort::update_traffic(uint8_t port_id, double factor) {
187
188    assert( (m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING ||
189            (m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE)) );
190
191    for (auto dp_stream : m_active_nodes) {
192        CGenNodeStateless * node = dp_stream.m_node;
193        assert(node->get_port_id() == port_id);
194
195        node->update_rate(factor);
196    }
197
198    return (true);
199}
200
201bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){
202
203    /* we are working with continues streams so we must be in transmit mode */
204    assert(m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING);
205
206    for (auto dp_stream : m_active_nodes) {
207        CGenNodeStateless * node =dp_stream.m_node;
208        assert(node->get_port_id() == port_id);
209        assert(node->is_pause() == false);
210        node->set_pause(true);
211    }
212    m_state = TrexStatelessDpPerPort::ppSTATE_PAUSE;
213    return (true);
214}
215
216
217bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id,
218                                          bool stop_on_id,
219                                          int event_id){
220
221
222    if (m_state == TrexStatelessDpPerPort::ppSTATE_IDLE) {
223        assert(m_active_streams==0);
224        return false;
225    }
226
227    /* there could be race of stop after stop */
228    if ( stop_on_id ) {
229        if (event_id != m_event_id){
230            /* we can't stop it is an old message */
231            return false;
232        }
233    }
234
235    for (auto dp_stream : m_active_nodes) {
236        CGenNodeStateless * node =dp_stream.m_node;
237        assert(node->get_port_id() == port_id);
238        if ( node->get_state() == CGenNodeStateless::ss_ACTIVE) {
239            node->mark_for_free();
240            m_active_streams--;
241            dp_stream.DeleteOnlyStream();
242
243        }else{
244            dp_stream.Delete(m_core);
245        }
246    }
247
248    /* active stream should be zero */
249    assert(m_active_streams==0);
250    m_active_nodes.clear();
251    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
252    return (true);
253}
254
255
256void TrexStatelessDpPerPort::create(CFlowGenListPerThread   *  core){
257    m_core=core;
258    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
259    m_port_id=0;
260    m_active_streams=0;
261    m_active_nodes.clear();
262}
263
264
265
266void
267TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) {
268    m_thread_id = thread_id;
269    m_core = core;
270    m_local_port_offset = 2*core->getDualPortId();
271
272    CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();
273
274    m_ring_from_cp = cp_dp->getRingCpToDp(thread_id);
275    m_ring_to_cp   = cp_dp->getRingDpToCp(thread_id);
276
277    m_state = STATE_IDLE;
278
279    int i;
280    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
281        m_ports[i].create(core);
282    }
283}
284
285
286/* move to the next stream, old stream move to INACTIVE */
287bool TrexStatelessDpCore::set_stateless_next_node(CGenNodeStateless * cur_node,
288                                                  CGenNodeStateless * next_node){
289
290    assert(cur_node);
291    TrexStatelessDpPerPort * lp_port = get_port_db(cur_node->m_port_id);
292    bool schedule =false;
293
294    bool to_stop_port=false;
295
296    if (next_node == NULL) {
297        /* there is no next stream , reduce the number of active streams*/
298        to_stop_port = lp_port->update_number_of_active_streams(1);
299
300    }else{
301        uint8_t state=next_node->get_state();
302
303        /* can't be FREE_RESUSE */
304        assert(state != CGenNodeStateless::ss_FREE_RESUSE);
305        if (next_node->get_state() == CGenNodeStateless::ss_INACTIVE ) {
306
307            /* refill start info and scedule, no update in active streams  */
308            next_node->refresh();
309            schedule = true;
310
311        }else{
312            to_stop_port = lp_port->update_number_of_active_streams(1);
313        }
314    }
315
316    if ( to_stop_port ) {
317        /* call stop port explictly to move the state */
318        stop_traffic(cur_node->m_port_id,false,0);
319    }
320
321    return ( schedule );
322}
323
324
325
326/**
327 * in idle state loop, the processor most of the time sleeps
328 * and periodically checks for messages
329 *
330 * @author imarom (01-Nov-15)
331 */
332void
333TrexStatelessDpCore::idle_state_loop() {
334
335    while (m_state == STATE_IDLE) {
336        periodic_check_for_cp_messages();
337        delay(200);
338    }
339}
340
341
342
343void TrexStatelessDpCore::quit_main_loop(){
344    m_core->set_terminate_mode(true); /* mark it as terminated */
345    m_state = STATE_TERMINATE;
346    add_global_duration(0.0001);
347}
348
349
350/**
351 * scehduler runs when traffic exists
352 * it will return when no more transmitting is done on this
353 * core
354 *
355 * @author imarom (01-Nov-15)
356 */
357void
358TrexStatelessDpCore::start_scheduler() {
359    /* creates a maintenace job using the scheduler */
360    CGenNode * node_sync = m_core->create_node() ;
361    node_sync->m_type = CGenNode::FLOW_SYNC;
362    node_sync->m_time = m_core->m_cur_time_sec + SYNC_TIME_OUT;
363    m_core->m_node_gen.add_node(node_sync);
364
365    double old_offset = 0.0;
366    m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset);
367    /* bail out in case of terminate */
368    if (m_state != TrexStatelessDpCore::STATE_TERMINATE) {
369        m_core->m_node_gen.close_file(m_core);
370        m_state = STATE_IDLE; /* we exit from all ports and we have nothing to do, we move to IDLE state */
371    }
372}
373
374
375void
376TrexStatelessDpCore::run_once(){
377
378    idle_state_loop();
379
380    if ( m_state == STATE_TERMINATE ){
381        return;
382    }
383
384    start_scheduler();
385}
386
387
388
389
390void
391TrexStatelessDpCore::start() {
392
393    while (true) {
394        run_once();
395
396        if ( m_core->is_terminated_by_master() ) {
397            break;
398        }
399    }
400}
401
402/* only if both port are idle we can exit */
403void
404TrexStatelessDpCore::schedule_exit(){
405
406    CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
407
408    node->m_type = CGenNode::COMMAND;
409
410    node->m_cmd = new TrexStatelessDpCanQuit();
411
412    /* make sure it will be scheduled after the current node */
413    node->m_time = m_core->m_cur_time_sec ;
414
415    m_core->m_node_gen.add_node((CGenNode *)node);
416}
417
418
419void
420TrexStatelessDpCore::add_global_duration(double duration){
421    if (duration > 0.0) {
422        CGenNode *node = m_core->create_node() ;
423
424        node->m_type = CGenNode::EXIT_SCHED;
425
426        /* make sure it will be scheduled after the current node */
427        node->m_time = m_core->m_cur_time_sec + duration ;
428
429        m_core->m_node_gen.add_node(node);
430    }
431}
432
433/* add per port exit */
434void
435TrexStatelessDpCore::add_port_duration(double duration,
436                                       uint8_t port_id,
437                                       int event_id){
438    if (duration > 0.0) {
439        CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
440
441        node->m_type = CGenNode::COMMAND;
442
443        /* make sure it will be scheduled after the current node */
444        node->m_time = m_core->m_cur_time_sec + duration ;
445
446        TrexStatelessDpStop * cmd=new TrexStatelessDpStop(port_id);
447
448
449        /* test this */
450        m_core->m_non_active_nodes++;
451        cmd->set_core_ptr(m_core);
452        cmd->set_event_id(event_id);
453        cmd->set_wait_for_event_id(true);
454
455        node->m_cmd = cmd;
456
457        m_core->m_node_gen.add_node((CGenNode *)node);
458    }
459}
460
461
462void
463TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port,
464                                TrexStream * stream,
465                                TrexStreamsCompiledObj *comp) {
466
467    CGenNodeStateless *node = m_core->create_node_sl();
468
469    /* add periodic */
470    node->m_cache_mbuf=0;
471    node->m_type = CGenNode::STATELESS_PKT;
472
473    /* clone the stream from control plane memory to DP memory */
474    node->m_ref_stream_info = stream->clone();
475
476    node->m_next_stream=0; /* will be fixed later */
477
478
479    if ( stream->m_self_start ){
480        /* if self start it is in active mode */
481        node->m_state =CGenNodeStateless::ss_ACTIVE;
482        lp_port->m_active_streams++;
483    }else{
484        node->m_state =CGenNodeStateless::ss_INACTIVE;
485    }
486
487    node->m_time = m_core->m_cur_time_sec + usec_to_sec(stream->m_isg_usec);
488
489    pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(stream->m_port_id);
490    node->m_flags = 0;
491    node->m_src_port =0;
492    node->m_original_packet_data_prefix = 0;
493
494
495
496    /* set socket id */
497    node->set_socket_id(m_core->m_node_gen.m_socket_id);
498
499    /* build a mbuf from a packet */
500
501    uint16_t pkt_size = stream->m_pkt.len;
502    const uint8_t *stream_pkt = stream->m_pkt.binary;
503
504    node->m_pause =0;
505    node->m_stream_type = stream->m_type;
506    node->m_next_time_offset = 1.0 / stream->get_pps();
507
508    /* stateless specific fields */
509    switch ( stream->m_type ) {
510
511    case TrexStream::stCONTINUOUS :
512        node->m_single_burst=0;
513        node->m_single_burst_refill=0;
514        node->m_multi_bursts=0;
515        node->m_ibg_sec                 = 0.0;
516        break;
517
518    case TrexStream::stSINGLE_BURST :
519        node->m_stream_type             = TrexStream::stMULTI_BURST;
520        node->m_single_burst            = stream->m_burst_total_pkts;
521        node->m_single_burst_refill     = stream->m_burst_total_pkts;
522        node->m_multi_bursts            = 1;  /* single burst in multi burst of 1 */
523        node->m_ibg_sec                 = 0.0;
524        break;
525
526    case TrexStream::stMULTI_BURST :
527        node->m_single_burst        = stream->m_burst_total_pkts;
528        node->m_single_burst_refill = stream->m_burst_total_pkts;
529        node->m_multi_bursts        = stream->m_num_bursts;
530        node->m_ibg_sec             = usec_to_sec( stream->m_ibg_usec );
531        break;
532    default:
533
534        assert(0);
535    };
536
537    node->m_port_id = stream->m_port_id;
538
539    /* set dir 0 or 1 client or server */
540    node->set_mbuf_cache_dir(dir);
541
542
543    if (node->m_ref_stream_info->getDpVm() == NULL) {
544        /* no VM */
545
546        node->m_vm_flow_var =  NULL;
547        node->m_vm_program  =  NULL;
548        node->m_vm_program_size =0;
549
550                /* allocate const mbuf */
551        rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size);
552        assert(m);
553
554        char *p = rte_pktmbuf_append(m, pkt_size);
555        assert(p);
556        /* copy the packet */
557        memcpy(p,stream_pkt,pkt_size);
558
559        /* TBD repace the mac if req we should add flag  */
560        m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*) p);
561
562        /* set the packet as a readonly */
563        node->set_cache_mbuf(m);
564
565        node->m_original_packet_data_prefix =0;
566    }else{
567
568        /* set the program */
569        TrexStream * local_mem_stream = node->m_ref_stream_info;
570
571        StreamVmDp  * lpDpVm = local_mem_stream->getDpVm();
572
573        node->m_vm_flow_var      = lpDpVm->clone_bss(); /* clone the flow var */
574        node->m_vm_program       = lpDpVm->get_program(); /* same ref to the program */
575        node->m_vm_program_size  = lpDpVm->get_program_size();
576
577
578        /* we need to copy the object */
579        if ( pkt_size > lpDpVm->get_prefix_size() ) {
580            /* we need const packet */
581            uint16_t const_pkt_size  = pkt_size - lpDpVm->get_prefix_size() ;
582            rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), const_pkt_size );
583            assert(m);
584
585            char *p = rte_pktmbuf_append(m, const_pkt_size);
586            assert(p);
587
588            /* copy packet data */
589            memcpy(p,(stream_pkt + lpDpVm->get_prefix_size()),const_pkt_size);
590
591            node->set_const_mbuf(m);
592        }
593
594
595        if (lpDpVm->get_prefix_size() > pkt_size ) {
596            lpDpVm->set_prefix_size(pkt_size);
597        }
598
599        /* copy the headr */
600        uint16_t header_size = lpDpVm->get_prefix_size();
601        assert(header_size);
602        node->alloc_prefix_header(header_size);
603        uint8_t *p=node->m_original_packet_data_prefix;
604        assert(p);
605
606        memcpy(p,stream_pkt , header_size);
607        /* TBD repace the mac if req we should add flag  */
608        m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, p);
609    }
610
611
612    CDpOneStream one_stream;
613
614    one_stream.m_dp_stream = node->m_ref_stream_info;
615    one_stream.m_node =node;
616
617    lp_port->m_active_nodes.push_back(one_stream);
618
619    /* schedule only if active */
620    if (node->m_state == CGenNodeStateless::ss_ACTIVE) {
621        m_core->m_node_gen.add_node((CGenNode *)node);
622    }
623}
624
625void
626TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj,
627                                   double duration,
628                                   int event_id) {
629
630
631    TrexStatelessDpPerPort * lp_port=get_port_db(obj->get_port_id());
632    lp_port->m_active_streams = 0;
633    lp_port->set_event_id(event_id);
634
635    /* no nodes in the list */
636    assert(lp_port->m_active_nodes.size()==0);
637
638    for (auto single_stream : obj->get_objects()) {
639        /* all commands should be for the same port */
640        assert(obj->get_port_id() == single_stream.m_stream->m_port_id);
641        add_stream(lp_port,single_stream.m_stream,obj);
642    }
643
644    uint32_t nodes = lp_port->m_active_nodes.size();
645    /* find next stream */
646    assert(nodes == obj->get_objects().size());
647
648    int cnt=0;
649
650    /* set the next_stream pointer  */
651    for (auto single_stream : obj->get_objects()) {
652
653        if (single_stream.m_stream->is_dp_next_stream() ) {
654            int stream_id = single_stream.m_stream->m_next_stream_id;
655            assert(stream_id<nodes);
656            /* point to the next stream , stream_id is fixed */
657            lp_port->m_active_nodes[cnt].m_node->m_next_stream = lp_port->m_active_nodes[stream_id].m_node ;
658        }
659        cnt++;
660    }
661
662    lp_port->m_state =TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
663    m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
664
665
666    if ( duration > 0.0 ){
667        add_port_duration( duration ,obj->get_port_id(),event_id );
668    }
669
670}
671
672
673bool TrexStatelessDpCore::are_all_ports_idle(){
674
675    bool res=true;
676    int i;
677    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
678        if ( m_ports[i].m_state != TrexStatelessDpPerPort::ppSTATE_IDLE ){
679            res=false;
680        }
681    }
682    return (res);
683}
684
685
686void
687TrexStatelessDpCore::resume_traffic(uint8_t port_id){
688
689    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
690
691    lp_port->resume_traffic(port_id);
692}
693
694
695void
696TrexStatelessDpCore::pause_traffic(uint8_t port_id){
697
698    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
699
700    lp_port->pause_traffic(port_id);
701}
702
703void
704TrexStatelessDpCore::update_traffic(uint8_t port_id, double factor) {
705
706    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
707
708    lp_port->update_traffic(port_id, factor);
709}
710
711
712void
713TrexStatelessDpCore::stop_traffic(uint8_t port_id,
714                                  bool stop_on_id,
715                                  int event_id) {
716    /* we cannot remove nodes not from the top of the queue so
717       for every active node - make sure next time
718       the scheduler invokes it, it will be free */
719
720    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
721
722    if ( lp_port->stop_traffic(port_id,stop_on_id,event_id) == false){
723        /* nothing to do ! already stopped */
724        //printf(" skip .. %f\n",m_core->m_cur_time_sec);
725        return;
726    }
727
728#if 0
729    if ( are_all_ports_idle() ) {
730        /* just a place holder if we will need to do somthing in that case */
731    }
732#endif
733
734    /* inform the control plane we stopped - this might be a async stop
735       (streams ended)
736     */
737    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
738    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
739                                                                   port_id,
740                                                                   TrexDpPortEvent::EVENT_STOP,
741                                                                   lp_port->get_event_id());
742    ring->Enqueue((CGenNode *)event_msg);
743
744}
745
746/**
747 * handle a message from CP to DP
748 *
749 */
750void
751TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) {
752    msg->handle(this);
753    delete msg;
754}
755
756