trex_stateless_dp_core.cpp revision aae09645
1/*
2 Itay Marom
3 Hanoch Haim
4 Cisco Systems, Inc.
5*/
6
7/*
8Copyright (c) 2015-2015 Cisco Systems, Inc.
9
10Licensed under the Apache License, Version 2.0 (the "License");
11you may not use this file except in compliance with the License.
12You may obtain a copy of the License at
13
14    http://www.apache.org/licenses/LICENSE-2.0
15
16Unless required by applicable law or agreed to in writing, software
17distributed under the License is distributed on an "AS IS" BASIS,
18WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19See the License for the specific language governing permissions and
20limitations under the License.
21*/
22#include <trex_stateless_dp_core.h>
23#include <trex_stateless_messaging.h>
24#include <trex_streams_compiler.h>
25#include <trex_stream_node.h>
26#include <trex_stream.h>
27
28#include <bp_sim.h>
29
30
31void CDpOneStream::Delete(CFlowGenListPerThread   * core){
32    assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE);
33    core->free_node((CGenNode *)m_node);
34    delete m_dp_stream;
35    m_node=0;
36    m_dp_stream=0;
37}
38
39void CDpOneStream::DeleteOnlyStream(){
40    assert(m_dp_stream);
41    delete m_dp_stream;
42    m_dp_stream=0;
43}
44
45int CGenNodeStateless::get_stream_id(){
46    if (m_state ==CGenNodeStateless::ss_FREE_RESUSE) {
47        return (-1); // not valid
48    }
49    assert(m_ref_stream_info);
50    return ((int)m_ref_stream_info->m_stream_id);
51}
52
53
54void CGenNodeStateless::DumpHeader(FILE *fd){
55    fprintf(fd," pkt_id, time, port , action , state, stream_id , stype , m-burst# , burst# \n");
56
57}
58void CGenNodeStateless::Dump(FILE *fd){
59    fprintf(fd," %2.4f, %3lu, %s,%s, %3d, %s, %3lu, %3lu  \n",
60            m_time,
61            (ulong)m_port_id,
62            "s-pkt", //action
63            get_stream_state_str(m_state ).c_str(),
64            get_stream_id(),   //stream_id
65            TrexStream::get_stream_type_str(m_stream_type).c_str(), //stype
66            (ulong)m_multi_bursts,
67            (ulong)m_single_burst
68            );
69}
70
71
72void CGenNodeStateless::refresh(){
73
74    /* refill the stream info */
75    m_single_burst    = m_single_burst_refill;
76    m_multi_bursts    = m_ref_stream_info->m_num_bursts;
77    m_state           = CGenNodeStateless::ss_ACTIVE;
78}
79
80
81void CGenNodeCommand::free_command(){
82
83    assert(m_cmd);
84    m_cmd->on_node_remove();
85    delete m_cmd;
86}
87
88
89std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state){
90    std::string res;
91
92    switch (stream_state) {
93    case CGenNodeStateless::ss_FREE_RESUSE :
94         res="FREE    ";
95        break;
96    case CGenNodeStateless::ss_INACTIVE :
97        res="INACTIVE ";
98        break;
99    case CGenNodeStateless::ss_ACTIVE :
100        res="ACTIVE   ";
101        break;
102    default:
103        res="Unknow   ";
104    };
105    return(res);
106}
107
108
109rte_mbuf_t   * CGenNodeStateless::alloc_node_with_vm(){
110
111    rte_mbuf_t        * m;
112    /* alloc small packet buffer*/
113    uint16_t prefix_size = prefix_header_size();
114    m = CGlobalInfo::pktmbuf_alloc( get_socket_id(), prefix_size );
115    if (m==0) {
116        return (m);
117    }
118    /* TBD remove this, should handle cases of error */
119    assert(m);
120    char *p=rte_pktmbuf_append(m, prefix_size);
121    memcpy( p ,m_original_packet_data_prefix, prefix_size);
122
123    /* TBD run VM on the pointer p */
124
125    rte_mbuf_t * m_const = get_const_mbuf();
126    if (  m_const != NULL) {
127        utl_rte_pktmbuf_add_after(m,m_const);
128    }
129    return (m);
130}
131
132
133void CGenNodeStateless::free_stl_node(){
134    /* if we have cache mbuf free it */
135    rte_mbuf_t * m=get_cache_mbuf();
136    if (m) {
137        rte_pktmbuf_free(m);
138        m_cache_mbuf=0;
139    }else{
140        /* non cache - must have an header */
141         m=get_const_mbuf();
142         if (m) {
143             rte_pktmbuf_free(m); /* reduce the ref counter */
144         }
145         free_prefix_header();
146    }
147
148}
149
150
151bool TrexStatelessDpPerPort::update_number_of_active_streams(uint32_t d){
152    m_active_streams-=d; /* reduce the number of streams */
153    if (m_active_streams == 0) {
154        return (true);
155    }
156    return (false);
157}
158
159bool TrexStatelessDpPerPort::resume_traffic(uint8_t port_id){
160
161    /* we are working with continues streams so we must be in transmit mode */
162    assert(m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE);
163
164    for (auto dp_stream : m_active_nodes) {
165        CGenNodeStateless * node =dp_stream.m_node;
166        assert(node->get_port_id() == port_id);
167        assert(node->is_pause() == true);
168        node->set_pause(false);
169    }
170    m_state = TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
171    return (true);
172}
173
174bool TrexStatelessDpPerPort::update_traffic(uint8_t port_id, double factor) {
175
176    assert( (m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING ||
177            (m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE)) );
178
179    for (auto dp_stream : m_active_nodes) {
180        CGenNodeStateless * node = dp_stream.m_node;
181        assert(node->get_port_id() == port_id);
182
183        node->update_rate(factor);
184    }
185
186    return (true);
187}
188
189bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){
190
191    /* we are working with continues streams so we must be in transmit mode */
192    assert(m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING);
193
194    for (auto dp_stream : m_active_nodes) {
195        CGenNodeStateless * node =dp_stream.m_node;
196        assert(node->get_port_id() == port_id);
197        assert(node->is_pause() == false);
198        node->set_pause(true);
199    }
200    m_state = TrexStatelessDpPerPort::ppSTATE_PAUSE;
201    return (true);
202}
203
204
205bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id,
206                                          bool stop_on_id,
207                                          int event_id){
208
209
210    if (m_state == TrexStatelessDpPerPort::ppSTATE_IDLE) {
211        assert(m_active_streams==0);
212        return false;
213    }
214
215    /* there could be race of stop after stop */
216    if ( stop_on_id ) {
217        if (event_id != m_event_id){
218            /* we can't stop it is an old message */
219            return false;
220        }
221    }
222
223    for (auto dp_stream : m_active_nodes) {
224        CGenNodeStateless * node =dp_stream.m_node;
225        assert(node->get_port_id() == port_id);
226        if ( node->get_state() == CGenNodeStateless::ss_ACTIVE) {
227            node->mark_for_free();
228            m_active_streams--;
229            dp_stream.DeleteOnlyStream();
230
231        }else{
232            dp_stream.Delete(m_core);
233        }
234    }
235
236    /* active stream should be zero */
237    assert(m_active_streams==0);
238    m_active_nodes.clear();
239    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
240    return (true);
241}
242
243
244void TrexStatelessDpPerPort::create(CFlowGenListPerThread   *  core){
245    m_core=core;
246    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
247    m_port_id=0;
248    m_active_streams=0;
249    m_active_nodes.clear();
250}
251
252
253
254void
255TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) {
256    m_thread_id = thread_id;
257    m_core = core;
258    m_local_port_offset = 2*core->getDualPortId();
259
260    CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();
261
262    m_ring_from_cp = cp_dp->getRingCpToDp(thread_id);
263    m_ring_to_cp   = cp_dp->getRingDpToCp(thread_id);
264
265    m_state = STATE_IDLE;
266
267    int i;
268    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
269        m_ports[i].create(core);
270    }
271}
272
273
274/* move to the next stream, old stream move to INACTIVE */
275bool TrexStatelessDpCore::set_stateless_next_node(CGenNodeStateless * cur_node,
276                                                  CGenNodeStateless * next_node){
277
278    assert(cur_node);
279    TrexStatelessDpPerPort * lp_port = get_port_db(cur_node->m_port_id);
280    bool schedule =false;
281
282    bool to_stop_port=false;
283
284    if (next_node == NULL) {
285        /* there is no next stream , reduce the number of active streams*/
286        to_stop_port = lp_port->update_number_of_active_streams(1);
287
288    }else{
289        uint8_t state=next_node->get_state();
290
291        /* can't be FREE_RESUSE */
292        assert(state != CGenNodeStateless::ss_FREE_RESUSE);
293        if (next_node->get_state() == CGenNodeStateless::ss_INACTIVE ) {
294
295            /* refill start info and scedule, no update in active streams  */
296            next_node->refresh();
297            schedule = true;
298
299        }else{
300            to_stop_port = lp_port->update_number_of_active_streams(1);
301        }
302    }
303
304    if ( to_stop_port ) {
305        /* call stop port explictly to move the state */
306        stop_traffic(cur_node->m_port_id,false,0);
307    }
308
309    return ( schedule );
310}
311
312
313
314/**
315 * in idle state loop, the processor most of the time sleeps
316 * and periodically checks for messages
317 *
318 * @author imarom (01-Nov-15)
319 */
320void
321TrexStatelessDpCore::idle_state_loop() {
322
323    while (m_state == STATE_IDLE) {
324        periodic_check_for_cp_messages();
325        delay(200);
326    }
327}
328
329
330
331void TrexStatelessDpCore::quit_main_loop(){
332    m_core->set_terminate_mode(true); /* mark it as terminated */
333    m_state = STATE_TERMINATE;
334    add_global_duration(0.0001);
335}
336
337
338/**
339 * scehduler runs when traffic exists
340 * it will return when no more transmitting is done on this
341 * core
342 *
343 * @author imarom (01-Nov-15)
344 */
345void
346TrexStatelessDpCore::start_scheduler() {
347    /* creates a maintenace job using the scheduler */
348    CGenNode * node_sync = m_core->create_node() ;
349    node_sync->m_type = CGenNode::FLOW_SYNC;
350    node_sync->m_time = m_core->m_cur_time_sec + SYNC_TIME_OUT;
351    m_core->m_node_gen.add_node(node_sync);
352
353    double old_offset = 0.0;
354    m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset);
355    /* bail out in case of terminate */
356    if (m_state != TrexStatelessDpCore::STATE_TERMINATE) {
357        m_core->m_node_gen.close_file(m_core);
358        m_state = STATE_IDLE; /* we exit from all ports and we have nothing to do, we move to IDLE state */
359    }
360}
361
362
363void
364TrexStatelessDpCore::run_once(){
365
366    idle_state_loop();
367
368    if ( m_state == STATE_TERMINATE ){
369        return;
370    }
371
372    start_scheduler();
373}
374
375
376
377
378void
379TrexStatelessDpCore::start() {
380
381    while (true) {
382        run_once();
383
384        if ( m_core->is_terminated_by_master() ) {
385            break;
386        }
387    }
388}
389
390/* only if both port are idle we can exit */
391void
392TrexStatelessDpCore::schedule_exit(){
393
394    CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
395
396    node->m_type = CGenNode::COMMAND;
397
398    node->m_cmd = new TrexStatelessDpCanQuit();
399
400    /* make sure it will be scheduled after the current node */
401    node->m_time = m_core->m_cur_time_sec ;
402
403    m_core->m_node_gen.add_node((CGenNode *)node);
404}
405
406
407void
408TrexStatelessDpCore::add_global_duration(double duration){
409    if (duration > 0.0) {
410        CGenNode *node = m_core->create_node() ;
411
412        node->m_type = CGenNode::EXIT_SCHED;
413
414        /* make sure it will be scheduled after the current node */
415        node->m_time = m_core->m_cur_time_sec + duration ;
416
417        m_core->m_node_gen.add_node(node);
418    }
419}
420
421/* add per port exit */
422void
423TrexStatelessDpCore::add_port_duration(double duration,
424                                       uint8_t port_id,
425                                       int event_id){
426    if (duration > 0.0) {
427        CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
428
429        node->m_type = CGenNode::COMMAND;
430
431        /* make sure it will be scheduled after the current node */
432        node->m_time = m_core->m_cur_time_sec + duration ;
433
434        TrexStatelessDpStop * cmd=new TrexStatelessDpStop(port_id);
435
436
437        /* test this */
438        m_core->m_non_active_nodes++;
439        cmd->set_core_ptr(m_core);
440        cmd->set_event_id(event_id);
441        cmd->set_wait_for_event_id(true);
442
443        node->m_cmd = cmd;
444
445        m_core->m_node_gen.add_node((CGenNode *)node);
446    }
447}
448
449
450void
451TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port,
452                                TrexStream * stream,
453                                TrexStreamsCompiledObj *comp) {
454
455    CGenNodeStateless *node = m_core->create_node_sl();
456
457    /* add periodic */
458    node->m_cache_mbuf=0;
459    node->m_type = CGenNode::STATELESS_PKT;
460
461    node->m_ref_stream_info  =   stream->clone_as_dp();
462
463    node->m_next_stream=0; /* will be fixed later */
464
465
466    if ( stream->m_self_start ){
467        /* if self start it is in active mode */
468        node->m_state =CGenNodeStateless::ss_ACTIVE;
469        lp_port->m_active_streams++;
470    }else{
471        node->m_state =CGenNodeStateless::ss_INACTIVE;
472    }
473
474    node->m_time = m_core->m_cur_time_sec + usec_to_sec(stream->m_isg_usec);
475
476    pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(stream->m_port_id);
477    node->m_flags = 0;
478    node->m_src_port =0;
479    node->m_original_packet_data_prefix = 0;
480
481
482
483    /* set socket id */
484    node->set_socket_id(m_core->m_node_gen.m_socket_id);
485
486    /* build a mbuf from a packet */
487
488    uint16_t pkt_size = stream->m_pkt.len;
489    const uint8_t *stream_pkt = stream->m_pkt.binary;
490
491    node->m_pause =0;
492    node->m_stream_type = stream->m_type;
493    node->m_next_time_offset = 1.0 / (stream->get_pps() * comp->get_multiplier());
494
495    /* stateless specific fields */
496    switch ( stream->m_type ) {
497
498    case TrexStream::stCONTINUOUS :
499        node->m_single_burst=0;
500        node->m_single_burst_refill=0;
501        node->m_multi_bursts=0;
502        node->m_ibg_sec                 = 0.0;
503        break;
504
505    case TrexStream::stSINGLE_BURST :
506        node->m_stream_type             = TrexStream::stMULTI_BURST;
507        node->m_single_burst            = stream->m_burst_total_pkts;
508        node->m_single_burst_refill     = stream->m_burst_total_pkts;
509        node->m_multi_bursts            = 1;  /* single burst in multi burst of 1 */
510        node->m_ibg_sec                 = 0.0;
511        break;
512
513    case TrexStream::stMULTI_BURST :
514        node->m_single_burst        = stream->m_burst_total_pkts;
515        node->m_single_burst_refill = stream->m_burst_total_pkts;
516        node->m_multi_bursts        = stream->m_num_bursts;
517        node->m_ibg_sec             = usec_to_sec( stream->m_ibg_usec );
518        break;
519    default:
520
521        assert(0);
522    };
523
524    node->m_port_id = stream->m_port_id;
525
526    /* set dir 0 or 1 client or server */
527    node->set_mbuf_cache_dir(dir);
528
529
530    if (stream->m_has_vm  == false ) {
531                /* allocate const mbuf */
532        rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size);
533        assert(m);
534
535        char *p = rte_pktmbuf_append(m, pkt_size);
536        assert(p);
537        /* copy the packet */
538        memcpy(p,stream_pkt,pkt_size);
539
540        /* TBD repace the mac if req we should add flag  */
541        m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*) p);
542
543        /* set the packet as a readonly */
544        node->set_cache_mbuf(m);
545
546        node->m_original_packet_data_prefix =0;
547    }else{
548        /* we need to copy the object */
549
550        if ( pkt_size > stream->m_vm_prefix_size  ) {
551            /* we need const packet */
552            uint16_t const_pkt_size  = pkt_size - stream->m_vm_prefix_size ;
553            rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), const_pkt_size );
554            assert(m);
555
556            char *p = rte_pktmbuf_append(m, const_pkt_size);
557            assert(p);
558
559            /* copy packet data */
560            memcpy(p,(stream_pkt+ stream->m_vm_prefix_size),const_pkt_size);
561
562            node->set_const_mbuf(m);
563        }
564
565
566        if (stream->m_vm_prefix_size > pkt_size ) {
567            stream->m_vm_prefix_size = pkt_size;
568        }
569        /* copy the headr */
570        uint16_t header_size = stream->m_vm_prefix_size;
571        assert(header_size);
572        node->alloc_prefix_header(header_size);
573        uint8_t *p=node->m_original_packet_data_prefix;
574        assert(p);
575
576        memcpy(p,stream_pkt , header_size);
577        /* TBD repace the mac if req we should add flag  */
578        m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, p);
579    }
580
581
582    CDpOneStream one_stream;
583
584    one_stream.m_dp_stream = node->m_ref_stream_info;
585    one_stream.m_node =node;
586
587    lp_port->m_active_nodes.push_back(one_stream);
588
589    /* schedule only if active */
590    if (node->m_state == CGenNodeStateless::ss_ACTIVE) {
591        m_core->m_node_gen.add_node((CGenNode *)node);
592    }
593}
594
595void
596TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj,
597                                   double duration,
598                                   int event_id) {
599
600
601    TrexStatelessDpPerPort * lp_port=get_port_db(obj->get_port_id());
602    lp_port->m_active_streams = 0;
603    lp_port->set_event_id(event_id);
604
605    /* no nodes in the list */
606    assert(lp_port->m_active_nodes.size()==0);
607
608    for (auto single_stream : obj->get_objects()) {
609        /* all commands should be for the same port */
610        assert(obj->get_port_id() == single_stream.m_stream->m_port_id);
611        add_stream(lp_port,single_stream.m_stream,obj);
612    }
613
614    uint32_t nodes = lp_port->m_active_nodes.size();
615    /* find next stream */
616    assert(nodes == obj->get_objects().size());
617
618    int cnt=0;
619
620    /* set the next_stream pointer  */
621    for (auto single_stream : obj->get_objects()) {
622
623        if (single_stream.m_stream->is_dp_next_stream() ) {
624            int stream_id = single_stream.m_stream->m_next_stream_id;
625            assert(stream_id<nodes);
626            /* point to the next stream , stream_id is fixed */
627            lp_port->m_active_nodes[cnt].m_node->m_next_stream = lp_port->m_active_nodes[stream_id].m_node ;
628        }
629        cnt++;
630    }
631
632    lp_port->m_state =TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
633    m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
634
635
636    if ( duration > 0.0 ){
637        add_port_duration( duration ,obj->get_port_id(),event_id );
638    }
639
640}
641
642
643bool TrexStatelessDpCore::are_all_ports_idle(){
644
645    bool res=true;
646    int i;
647    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
648        if ( m_ports[i].m_state != TrexStatelessDpPerPort::ppSTATE_IDLE ){
649            res=false;
650        }
651    }
652    return (res);
653}
654
655
656void
657TrexStatelessDpCore::resume_traffic(uint8_t port_id){
658
659    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
660
661    lp_port->resume_traffic(port_id);
662}
663
664
665void
666TrexStatelessDpCore::pause_traffic(uint8_t port_id){
667
668    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
669
670    lp_port->pause_traffic(port_id);
671}
672
673void
674TrexStatelessDpCore::update_traffic(uint8_t port_id, double factor) {
675
676    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
677
678    lp_port->update_traffic(port_id, factor);
679}
680
681
682void
683TrexStatelessDpCore::stop_traffic(uint8_t port_id,
684                                  bool stop_on_id,
685                                  int event_id) {
686    /* we cannot remove nodes not from the top of the queue so
687       for every active node - make sure next time
688       the scheduler invokes it, it will be free */
689
690    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
691
692    if ( lp_port->stop_traffic(port_id,stop_on_id,event_id) == false){
693        /* nothing to do ! already stopped */
694        //printf(" skip .. %f\n",m_core->m_cur_time_sec);
695        return;
696    }
697
698#if 0
699    if ( are_all_ports_idle() ) {
700        /* just a place holder if we will need to do somthing in that case */
701    }
702#endif
703
704    /* inform the control plane we stopped - this might be a async stop
705       (streams ended)
706     */
707    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
708    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
709                                                                   port_id,
710                                                                   TrexDpPortEvent::EVENT_STOP,
711                                                                   lp_port->get_event_id());
712    ring->Enqueue((CGenNode *)event_msg);
713
714}
715
716/**
717 * handle a message from CP to DP
718 *
719 */
720void
721TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) {
722    msg->handle(this);
723    delete msg;
724}
725
726