trex_stateless_dp_core.cpp revision 0bde21ac
1/*
2 Itay Marom
3 Hanoch Haim
4 Cisco Systems, Inc.
5*/
6
7/*
8Copyright (c) 2015-2015 Cisco Systems, Inc.
9
10Licensed under the Apache License, Version 2.0 (the "License");
11you may not use this file except in compliance with the License.
12You may obtain a copy of the License at
13
14    http://www.apache.org/licenses/LICENSE-2.0
15
16Unless required by applicable law or agreed to in writing, software
17distributed under the License is distributed on an "AS IS" BASIS,
18WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19See the License for the specific language governing permissions and
20limitations under the License.
21*/
22#include <trex_stateless_dp_core.h>
23#include <trex_stateless_messaging.h>
24#include <trex_streams_compiler.h>
25#include <trex_stream_node.h>
26#include <trex_stream.h>
27
28#include <bp_sim.h>
29
30
31void CDpOneStream::Delete(CFlowGenListPerThread   * core){
32    assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE);
33    core->free_node((CGenNode *)m_node);
34    delete m_dp_stream;
35    m_node=0;
36    m_dp_stream=0;
37}
38
39void CDpOneStream::DeleteOnlyStream(){
40    assert(m_dp_stream);
41    delete m_dp_stream;
42    m_dp_stream=0;
43}
44
45int CGenNodeStateless::get_stream_id(){
46    if (m_state ==CGenNodeStateless::ss_FREE_RESUSE) {
47        return (-1); // not valid
48    }
49    assert(m_ref_stream_info);
50    return ((int)m_ref_stream_info->m_stream_id);
51}
52
53
54void CGenNodeStateless::DumpHeader(FILE *fd){
55    fprintf(fd," pkt_id, time, port , action , state, stream_id , stype , m-burst# , burst# \n");
56
57}
58void CGenNodeStateless::Dump(FILE *fd){
59    fprintf(fd," %2.4f, %3lu, %s,%s, %3d, %s, %3lu, %3lu  \n",
60            m_time,
61            (ulong)m_port_id,
62            "s-pkt", //action
63            get_stream_state_str(m_state ).c_str(),
64            get_stream_id(),   //stream_id
65            TrexStream::get_stream_type_str(m_stream_type).c_str(), //stype
66            (ulong)m_multi_bursts,
67            (ulong)m_single_burst
68            );
69}
70
71
72void CGenNodeStateless::refresh(){
73
74    /* refill the stream info */
75    m_single_burst    = m_single_burst_refill;
76    m_multi_bursts    = m_ref_stream_info->m_num_bursts;
77    m_state           = CGenNodeStateless::ss_ACTIVE;
78}
79
80
81void CGenNodeCommand::free_command(){
82
83    assert(m_cmd);
84    m_cmd->on_node_remove();
85    delete m_cmd;
86}
87
88
89std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state){
90    std::string res;
91
92    switch (stream_state) {
93    case CGenNodeStateless::ss_FREE_RESUSE :
94         res="FREE    ";
95        break;
96    case CGenNodeStateless::ss_INACTIVE :
97        res="INACTIVE ";
98        break;
99    case CGenNodeStateless::ss_ACTIVE :
100        res="ACTIVE   ";
101        break;
102    default:
103        res="Unknow   ";
104    };
105    return(res);
106}
107
108
109rte_mbuf_t   * CGenNodeStateless::alloc_node_with_vm(){
110
111    rte_mbuf_t        * m;
112    /* alloc small packet buffer*/
113    uint16_t prefix_size = prefix_header_size();
114    m = CGlobalInfo::pktmbuf_alloc( get_socket_id(), prefix_size );
115    if (m==0) {
116        return (m);
117    }
118    /* TBD remove this, should handle cases of error */
119    assert(m);
120    char *p=rte_pktmbuf_append(m, prefix_size);
121    memcpy( p ,m_original_packet_data_prefix, prefix_size);
122
123
124    /* run the VM program */
125    StreamDPVmInstructionsRunner runner;
126
127    runner.run( m_vm_program_size,
128                m_vm_program,
129                m_vm_flow_var,
130                (uint8_t*)p);
131
132
133    rte_mbuf_t * m_const = get_const_mbuf();
134    if (  m_const != NULL) {
135        utl_rte_pktmbuf_add_after(m,m_const);
136    }
137    return (m);
138}
139
140
141void CGenNodeStateless::free_stl_node(){
142    /* if we have cache mbuf free it */
143    rte_mbuf_t * m=get_cache_mbuf();
144    if (m) {
145        rte_pktmbuf_free(m);
146        m_cache_mbuf=0;
147    }else{
148        /* non cache - must have an header */
149         m=get_const_mbuf();
150         if (m) {
151             rte_pktmbuf_free(m); /* reduce the ref counter */
152         }
153         free_prefix_header();
154    }
155    if (m_vm_flow_var) {
156        /* free flow var */
157        free(m_vm_flow_var);
158        m_vm_flow_var=0;
159    }
160}
161
162
163bool TrexStatelessDpPerPort::update_number_of_active_streams(uint32_t d){
164    m_active_streams-=d; /* reduce the number of streams */
165    if (m_active_streams == 0) {
166        return (true);
167    }
168    return (false);
169}
170
171bool TrexStatelessDpPerPort::resume_traffic(uint8_t port_id){
172
173    /* we are working with continues streams so we must be in transmit mode */
174    assert(m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE);
175
176    for (auto dp_stream : m_active_nodes) {
177        CGenNodeStateless * node =dp_stream.m_node;
178        assert(node->get_port_id() == port_id);
179        assert(node->is_pause() == true);
180        node->set_pause(false);
181    }
182    m_state = TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
183    return (true);
184}
185
186bool TrexStatelessDpPerPort::update_traffic(uint8_t port_id, double factor) {
187
188    assert( (m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING ||
189            (m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE)) );
190
191    for (auto dp_stream : m_active_nodes) {
192        CGenNodeStateless * node = dp_stream.m_node;
193        assert(node->get_port_id() == port_id);
194
195        node->update_rate(factor);
196    }
197
198    return (true);
199}
200
201bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){
202
203    /* we are working with continues streams so we must be in transmit mode */
204    assert(m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING);
205
206    for (auto dp_stream : m_active_nodes) {
207        CGenNodeStateless * node =dp_stream.m_node;
208        assert(node->get_port_id() == port_id);
209        assert(node->is_pause() == false);
210        node->set_pause(true);
211    }
212    m_state = TrexStatelessDpPerPort::ppSTATE_PAUSE;
213    return (true);
214}
215
216
217bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id,
218                                          bool stop_on_id,
219                                          int event_id){
220
221
222    if (m_state == TrexStatelessDpPerPort::ppSTATE_IDLE) {
223        assert(m_active_streams==0);
224        return false;
225    }
226
227    /* there could be race of stop after stop */
228    if ( stop_on_id ) {
229        if (event_id != m_event_id){
230            /* we can't stop it is an old message */
231            return false;
232        }
233    }
234
235    for (auto dp_stream : m_active_nodes) {
236        CGenNodeStateless * node =dp_stream.m_node;
237        assert(node->get_port_id() == port_id);
238        if ( node->get_state() == CGenNodeStateless::ss_ACTIVE) {
239            node->mark_for_free();
240            m_active_streams--;
241            dp_stream.DeleteOnlyStream();
242
243        }else{
244            dp_stream.Delete(m_core);
245        }
246    }
247
248    /* active stream should be zero */
249    assert(m_active_streams==0);
250    m_active_nodes.clear();
251    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
252    return (true);
253}
254
255
256void TrexStatelessDpPerPort::create(CFlowGenListPerThread   *  core){
257    m_core=core;
258    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
259    m_port_id=0;
260    m_active_streams=0;
261    m_active_nodes.clear();
262}
263
264
265
266void
267TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) {
268    m_thread_id = thread_id;
269    m_core = core;
270    m_local_port_offset = 2*core->getDualPortId();
271
272    CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();
273
274    m_ring_from_cp = cp_dp->getRingCpToDp(thread_id);
275    m_ring_to_cp   = cp_dp->getRingDpToCp(thread_id);
276
277    m_state = STATE_IDLE;
278
279    int i;
280    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
281        m_ports[i].create(core);
282    }
283}
284
285
286/* move to the next stream, old stream move to INACTIVE */
287bool TrexStatelessDpCore::set_stateless_next_node(CGenNodeStateless * cur_node,
288                                                  CGenNodeStateless * next_node){
289
290    assert(cur_node);
291    TrexStatelessDpPerPort * lp_port = get_port_db(cur_node->m_port_id);
292    bool schedule =false;
293
294    bool to_stop_port=false;
295
296    if (next_node == NULL) {
297        /* there is no next stream , reduce the number of active streams*/
298        to_stop_port = lp_port->update_number_of_active_streams(1);
299
300    }else{
301        uint8_t state=next_node->get_state();
302
303        /* can't be FREE_RESUSE */
304        assert(state != CGenNodeStateless::ss_FREE_RESUSE);
305        if (next_node->get_state() == CGenNodeStateless::ss_INACTIVE ) {
306
307            /* refill start info and scedule, no update in active streams  */
308            next_node->refresh();
309            schedule = true;
310
311        }else{
312            to_stop_port = lp_port->update_number_of_active_streams(1);
313        }
314    }
315
316    if ( to_stop_port ) {
317        /* call stop port explictly to move the state */
318        stop_traffic(cur_node->m_port_id,false,0);
319    }
320
321    return ( schedule );
322}
323
324
325
326/**
327 * in idle state loop, the processor most of the time sleeps
328 * and periodically checks for messages
329 *
330 * @author imarom (01-Nov-15)
331 */
332void
333TrexStatelessDpCore::idle_state_loop() {
334
335    while (m_state == STATE_IDLE) {
336        periodic_check_for_cp_messages();
337        delay(200);
338    }
339}
340
341
342
343void TrexStatelessDpCore::quit_main_loop(){
344    m_core->set_terminate_mode(true); /* mark it as terminated */
345    m_state = STATE_TERMINATE;
346    add_global_duration(0.0001);
347}
348
349
350/**
351 * scehduler runs when traffic exists
352 * it will return when no more transmitting is done on this
353 * core
354 *
355 * @author imarom (01-Nov-15)
356 */
357void
358TrexStatelessDpCore::start_scheduler() {
359    /* creates a maintenace job using the scheduler */
360    CGenNode * node_sync = m_core->create_node() ;
361    node_sync->m_type = CGenNode::FLOW_SYNC;
362    node_sync->m_time = m_core->m_cur_time_sec + SYNC_TIME_OUT;
363    m_core->m_node_gen.add_node(node_sync);
364
365    double old_offset = 0.0;
366    m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset);
367    /* bail out in case of terminate */
368    if (m_state != TrexStatelessDpCore::STATE_TERMINATE) {
369        m_core->m_node_gen.close_file(m_core);
370        m_state = STATE_IDLE; /* we exit from all ports and we have nothing to do, we move to IDLE state */
371    }
372}
373
374
375void
376TrexStatelessDpCore::run_once(){
377
378    idle_state_loop();
379
380    if ( m_state == STATE_TERMINATE ){
381        return;
382    }
383
384    start_scheduler();
385}
386
387
388
389
390void
391TrexStatelessDpCore::start() {
392
393    while (true) {
394        run_once();
395
396        if ( m_core->is_terminated_by_master() ) {
397            break;
398        }
399    }
400}
401
402/* only if both port are idle we can exit */
403void
404TrexStatelessDpCore::schedule_exit(){
405
406    CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
407
408    node->m_type = CGenNode::COMMAND;
409
410    node->m_cmd = new TrexStatelessDpCanQuit();
411
412    /* make sure it will be scheduled after the current node */
413    node->m_time = m_core->m_cur_time_sec ;
414
415    m_core->m_node_gen.add_node((CGenNode *)node);
416}
417
418
419void
420TrexStatelessDpCore::add_global_duration(double duration){
421    if (duration > 0.0) {
422        CGenNode *node = m_core->create_node() ;
423
424        node->m_type = CGenNode::EXIT_SCHED;
425
426        /* make sure it will be scheduled after the current node */
427        node->m_time = m_core->m_cur_time_sec + duration ;
428
429        m_core->m_node_gen.add_node(node);
430    }
431}
432
433/* add per port exit */
434void
435TrexStatelessDpCore::add_port_duration(double duration,
436                                       uint8_t port_id,
437                                       int event_id){
438    if (duration > 0.0) {
439        CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
440
441        node->m_type = CGenNode::COMMAND;
442
443        /* make sure it will be scheduled after the current node */
444        node->m_time = m_core->m_cur_time_sec + duration ;
445
446        TrexStatelessDpStop * cmd=new TrexStatelessDpStop(port_id);
447
448
449        /* test this */
450        m_core->m_non_active_nodes++;
451        cmd->set_core_ptr(m_core);
452        cmd->set_event_id(event_id);
453        cmd->set_wait_for_event_id(true);
454
455        node->m_cmd = cmd;
456
457        m_core->m_node_gen.add_node((CGenNode *)node);
458    }
459}
460
461
/**
 * Build a scheduler node for one stream and register it in the port's list.
 *
 * The stream is cloned into the node (m_ref_stream_info), timing and burst
 * counters are filled in from the stream, and the packet data is prepared in
 * one of two ways:
 *   - no VM:   the whole packet is copied into a single mbuf that is stored
 *              with set_cache_mbuf()
 *   - with VM: the first get_prefix_size() bytes are kept in a prefix buffer
 *              and the rest (if any) is copied into a mbuf stored with
 *              set_const_mbuf()
 * The node is always appended to lp_port->m_active_nodes, but it is handed
 * to the scheduler only when it starts in the ss_ACTIVE state.
 *
 * @param lp_port  per-port object that receives the node; its active-stream
 *                 counter is incremented for self-start streams
 * @param stream   stream to clone and build the node from
 * @param comp     compiled object passed by the caller; not used here
 */
void
TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port,
                                TrexStream * stream,
                                TrexStreamsCompiledObj *comp) {

    CGenNodeStateless *node = m_core->create_node_sl();

    /* add periodic */
    node->m_cache_mbuf=0;
    node->m_type = CGenNode::STATELESS_PKT;

    /* clone the stream from control plane memory to DP memory */
    node->m_ref_stream_info = stream->clone();
    /* no need for this memory anymore on the control plane memory */
    stream->release_dp_object();

    node->m_next_stream=0; /* will be fixed later (see start_traffic) */


    if ( stream->m_self_start ){
        /* if self start it is in active mode and counts as an active stream */
        node->m_state =CGenNodeStateless::ss_ACTIVE;
        lp_port->m_active_streams++;
    }else{
        node->m_state =CGenNodeStateless::ss_INACTIVE;
    }

    /* first transmit time: now plus the stream's m_isg_usec converted to seconds */
    node->m_time = m_core->m_cur_time_sec + usec_to_sec(stream->m_isg_usec);

    pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(stream->m_port_id);
    node->m_flags = 0;
    node->m_src_port =0;
    node->m_original_packet_data_prefix = 0;



    /* set socket id - must happen before get_socket_id() is used below */
    node->set_socket_id(m_core->m_node_gen.m_socket_id);

    /* build a mbuf from a packet */

    uint16_t pkt_size = stream->m_pkt.len;
    const uint8_t *stream_pkt = stream->m_pkt.binary;

    node->m_pause =0;
    node->m_stream_type = stream->m_type;
    /* NOTE(review): a get_pps() of 0 would divide by zero here - confirm it is always positive */
    node->m_next_time_offset = 1.0 / stream->get_pps();

    /* stateless specific fields */
    switch ( stream->m_type ) {

    case TrexStream::stCONTINUOUS :
        node->m_single_burst=0;
        node->m_single_burst_refill=0;
        node->m_multi_bursts=0;
        node->m_ibg_sec                 = 0.0;
        break;

    case TrexStream::stSINGLE_BURST :
        /* handled as a multi burst stream with exactly one burst */
        node->m_stream_type             = TrexStream::stMULTI_BURST;
        node->m_single_burst            = stream->m_burst_total_pkts;
        node->m_single_burst_refill     = stream->m_burst_total_pkts;
        node->m_multi_bursts            = 1;  /* single burst in multi burst of 1 */
        node->m_ibg_sec                 = 0.0;
        break;

    case TrexStream::stMULTI_BURST :
        node->m_single_burst        = stream->m_burst_total_pkts;
        node->m_single_burst_refill = stream->m_burst_total_pkts;
        node->m_multi_bursts        = stream->m_num_bursts;
        node->m_ibg_sec             = usec_to_sec( stream->m_ibg_usec );
        break;
    default:

        /* unknown stream type */
        assert(0);
    };

    node->m_port_id = stream->m_port_id;

    /* set dir 0 or 1 client or server */
    node->set_mbuf_cache_dir(dir);


    if (node->m_ref_stream_info->getDpVm() == NULL) {
        /* no VM */

        node->m_vm_flow_var =  NULL;
        node->m_vm_program  =  NULL;
        node->m_vm_program_size =0;

        /* allocate a mbuf that holds the whole packet */
        rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size);
        assert(m);

        char *p = rte_pktmbuf_append(m, pkt_size);
        assert(p);
        /* copy the packet */
        memcpy(p,stream_pkt,pkt_size);

        /* TBD replace the mac if required, we should add a flag */
        m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*) p);

        /* set the packet as a readonly */
        node->set_cache_mbuf(m);

        node->m_original_packet_data_prefix =0;
    }else{

        /* set the program - taken from the node's own (cloned) stream */
        TrexStream * local_mem_stream = node->m_ref_stream_info;

        StreamVmDp  * lpDpVm = local_mem_stream->getDpVm();

        node->m_vm_flow_var      = lpDpVm->clone_bss(); /* clone the flow var */
        node->m_vm_program       = lpDpVm->get_program(); /* same ref to the program */
        node->m_vm_program_size  = lpDpVm->get_program_size();


        /* the part of the packet after the prefix goes into its own mbuf */
        if ( pkt_size > lpDpVm->get_prefix_size() ) {
            /* we need const packet */
            uint16_t const_pkt_size  = pkt_size - lpDpVm->get_prefix_size() ;
            rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), const_pkt_size );
            assert(m);

            char *p = rte_pktmbuf_append(m, const_pkt_size);
            assert(p);

            /* copy packet data */
            memcpy(p,(stream_pkt + lpDpVm->get_prefix_size()),const_pkt_size);

            node->set_const_mbuf(m);
        }


        /* clamp the prefix to the packet size */
        if (lpDpVm->get_prefix_size() > pkt_size ) {
            lpDpVm->set_prefix_size(pkt_size);
        }

        /* copy the header */
        uint16_t header_size = lpDpVm->get_prefix_size();
        assert(header_size);
        node->alloc_prefix_header(header_size);
        uint8_t *p=node->m_original_packet_data_prefix;
        assert(p);

        memcpy(p,stream_pkt , header_size);
        /* TBD replace the mac if required, we should add a flag */
        m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, p);
    }


    /* register the node/stream pair in the port's list (stored by value) */
    CDpOneStream one_stream;

    one_stream.m_dp_stream = node->m_ref_stream_info;
    one_stream.m_node =node;

    lp_port->m_active_nodes.push_back(one_stream);

    /* schedule only if active */
    if (node->m_state == CGenNodeStateless::ss_ACTIVE) {
        m_core->m_node_gen.add_node((CGenNode *)node);
    }
}
626
627void
628TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj,
629                                   double duration,
630                                   int event_id) {
631
632
633    TrexStatelessDpPerPort * lp_port=get_port_db(obj->get_port_id());
634    lp_port->m_active_streams = 0;
635    lp_port->set_event_id(event_id);
636
637    /* no nodes in the list */
638    assert(lp_port->m_active_nodes.size()==0);
639
640    for (auto single_stream : obj->get_objects()) {
641        /* all commands should be for the same port */
642        assert(obj->get_port_id() == single_stream.m_stream->m_port_id);
643        add_stream(lp_port,single_stream.m_stream,obj);
644    }
645
646    uint32_t nodes = lp_port->m_active_nodes.size();
647    /* find next stream */
648    assert(nodes == obj->get_objects().size());
649
650    int cnt=0;
651
652    /* set the next_stream pointer  */
653    for (auto single_stream : obj->get_objects()) {
654
655        if (single_stream.m_stream->is_dp_next_stream() ) {
656            int stream_id = single_stream.m_stream->m_next_stream_id;
657            assert(stream_id<nodes);
658            /* point to the next stream , stream_id is fixed */
659            lp_port->m_active_nodes[cnt].m_node->m_next_stream = lp_port->m_active_nodes[stream_id].m_node ;
660        }
661        cnt++;
662    }
663
664    lp_port->m_state =TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
665    m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
666
667
668    if ( duration > 0.0 ){
669        add_port_duration( duration ,obj->get_port_id(),event_id );
670    }
671
672}
673
674
675bool TrexStatelessDpCore::are_all_ports_idle(){
676
677    bool res=true;
678    int i;
679    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
680        if ( m_ports[i].m_state != TrexStatelessDpPerPort::ppSTATE_IDLE ){
681            res=false;
682        }
683    }
684    return (res);
685}
686
687
688void
689TrexStatelessDpCore::resume_traffic(uint8_t port_id){
690
691    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
692
693    lp_port->resume_traffic(port_id);
694}
695
696
697void
698TrexStatelessDpCore::pause_traffic(uint8_t port_id){
699
700    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
701
702    lp_port->pause_traffic(port_id);
703}
704
705void
706TrexStatelessDpCore::update_traffic(uint8_t port_id, double factor) {
707
708    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
709
710    lp_port->update_traffic(port_id, factor);
711}
712
713
714void
715TrexStatelessDpCore::stop_traffic(uint8_t port_id,
716                                  bool stop_on_id,
717                                  int event_id) {
718    /* we cannot remove nodes not from the top of the queue so
719       for every active node - make sure next time
720       the scheduler invokes it, it will be free */
721
722    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
723
724    if ( lp_port->stop_traffic(port_id,stop_on_id,event_id) == false){
725        /* nothing to do ! already stopped */
726        //printf(" skip .. %f\n",m_core->m_cur_time_sec);
727        return;
728    }
729
730#if 0
731    if ( are_all_ports_idle() ) {
732        /* just a place holder if we will need to do somthing in that case */
733    }
734#endif
735
736    /* inform the control plane we stopped - this might be a async stop
737       (streams ended)
738     */
739    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
740    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
741                                                                   port_id,
742                                                                   TrexDpPortEvent::EVENT_STOP,
743                                                                   lp_port->get_event_id());
744    ring->Enqueue((CGenNode *)event_msg);
745
746}
747
/**
 * handle a message from CP to DP
 *
 * The message is dispatched through its own handle() with this core as the
 * argument and is deleted afterwards, so the caller must not touch it again.
 *
 * @param msg  message to handle; destroyed before this function returns
 */
void
TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) {
    msg->handle(this);
    delete msg;
}
757
758