trex_stateless_dp_core.cpp revision 59548ae8
1/*
2 Itay Marom
3 Hanoch Haim
4 Cisco Systems, Inc.
5*/
6
7/*
8Copyright (c) 2015-2015 Cisco Systems, Inc.
9
10Licensed under the Apache License, Version 2.0 (the "License");
11you may not use this file except in compliance with the License.
12You may obtain a copy of the License at
13
14    http://www.apache.org/licenses/LICENSE-2.0
15
16Unless required by applicable law or agreed to in writing, software
17distributed under the License is distributed on an "AS IS" BASIS,
18WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19See the License for the specific language governing permissions and
20limitations under the License.
21*/
22#include <trex_stateless_dp_core.h>
23#include <trex_stateless_messaging.h>
24#include <trex_streams_compiler.h>
25#include <trex_stream_node.h>
26#include <trex_stream.h>
27
28#include <bp_sim.h>
29
30
31void CDpOneStream::Delete(CFlowGenListPerThread   * core){
32    assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE);
33    core->free_node((CGenNode *)m_node);
34    delete m_dp_stream;
35    m_node=0;
36    m_dp_stream=0;
37}
38
39void CDpOneStream::DeleteOnlyStream(){
40    assert(m_dp_stream);
41    delete m_dp_stream;
42    m_dp_stream=0;
43}
44
45int CGenNodeStateless::get_stream_id(){
46    if (m_state ==CGenNodeStateless::ss_FREE_RESUSE) {
47        return (-1); // not valid
48    }
49    assert(m_ref_stream_info);
50    return ((int)m_ref_stream_info->m_stream_id);
51}
52
53
54void CGenNodeStateless::DumpHeader(FILE *fd){
55    fprintf(fd," pkt_id, time, port , action , state, stream_id , stype , m-burst# , burst# \n");
56
57}
58void CGenNodeStateless::Dump(FILE *fd){
59    fprintf(fd," %2.4f, %3lu, %s,%s, %3d, %s, %3lu, %3lu  \n",
60            m_time,
61            (ulong)m_port_id,
62            "s-pkt", //action
63            get_stream_state_str(m_state ).c_str(),
64            get_stream_id(),   //stream_id
65            TrexStream::get_stream_type_str(m_stream_type).c_str(), //stype
66            (ulong)m_multi_bursts,
67            (ulong)m_single_burst
68            );
69}
70
71
72void CGenNodeStateless::refresh(){
73
74    /* refill the stream info */
75    m_single_burst    = m_single_burst_refill;
76    m_multi_bursts    = m_ref_stream_info->m_num_bursts;
77    m_state           = CGenNodeStateless::ss_ACTIVE;
78}
79
80
81void CGenNodeCommand::free_command(){
82
83    assert(m_cmd);
84    m_cmd->on_node_remove();
85    delete m_cmd;
86}
87
88
89std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state){
90    std::string res;
91
92    switch (stream_state) {
93    case CGenNodeStateless::ss_FREE_RESUSE :
94         res="FREE    ";
95        break;
96    case CGenNodeStateless::ss_INACTIVE :
97        res="INACTIVE ";
98        break;
99    case CGenNodeStateless::ss_ACTIVE :
100        res="ACTIVE   ";
101        break;
102    default:
103        res="Unknow   ";
104    };
105    return(res);
106}
107
108
109void CGenNodeStateless::free_stl_node(){
110    /* if we have cache mbuf free it */
111    rte_mbuf_t * m=get_cache_mbuf();
112    if (m) {
113        rte_pktmbuf_free(m);
114        m_cache_mbuf=0;
115    }
116}
117
118
119bool TrexStatelessDpPerPort::update_number_of_active_streams(uint32_t d){
120    m_active_streams-=d; /* reduce the number of streams */
121    if (m_active_streams == 0) {
122        return (true);
123    }
124    return (false);
125}
126
127bool TrexStatelessDpPerPort::resume_traffic(uint8_t port_id){
128
129    /* we are working with continues streams so we must be in transmit mode */
130    assert(m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE);
131
132    for (auto dp_stream : m_active_nodes) {
133        CGenNodeStateless * node =dp_stream.m_node;
134        assert(node->get_port_id() == port_id);
135        assert(node->is_pause() == true);
136        node->set_pause(false);
137    }
138    m_state = TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
139    return (true);
140}
141
142bool TrexStatelessDpPerPort::update_traffic(uint8_t port_id, double factor) {
143
144    assert( (m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING ||
145            (m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE)) );
146
147    for (auto dp_stream : m_active_nodes) {
148        CGenNodeStateless * node = dp_stream.m_node;
149        assert(node->get_port_id() == port_id);
150
151        node->update_rate(factor);
152    }
153
154    return (true);
155}
156
157bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){
158
159    /* we are working with continues streams so we must be in transmit mode */
160    assert(m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING);
161
162    for (auto dp_stream : m_active_nodes) {
163        CGenNodeStateless * node =dp_stream.m_node;
164        assert(node->get_port_id() == port_id);
165        assert(node->is_pause() == false);
166        node->set_pause(true);
167    }
168    m_state = TrexStatelessDpPerPort::ppSTATE_PAUSE;
169    return (true);
170}
171
172
173bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id,
174                                          bool stop_on_id,
175                                          int event_id){
176
177
178    if (m_state == TrexStatelessDpPerPort::ppSTATE_IDLE) {
179        assert(m_active_streams==0);
180        return false;
181    }
182
183    /* there could be race of stop after stop */
184    if ( stop_on_id ) {
185        if (event_id != m_event_id){
186            /* we can't stop it is an old message */
187            return false;
188        }
189    }
190
191    for (auto dp_stream : m_active_nodes) {
192        CGenNodeStateless * node =dp_stream.m_node;
193        assert(node->get_port_id() == port_id);
194        if ( node->get_state() == CGenNodeStateless::ss_ACTIVE) {
195            node->mark_for_free();
196            m_active_streams--;
197            dp_stream.DeleteOnlyStream();
198
199        }else{
200            dp_stream.Delete(m_core);
201        }
202    }
203
204    /* active stream should be zero */
205    assert(m_active_streams==0);
206    m_active_nodes.clear();
207    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
208    return (true);
209}
210
211
212void TrexStatelessDpPerPort::create(CFlowGenListPerThread   *  core){
213    m_core=core;
214    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
215    m_port_id=0;
216    m_active_streams=0;
217    m_active_nodes.clear();
218}
219
220
221
222void
223TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) {
224    m_thread_id = thread_id;
225    m_core = core;
226    m_local_port_offset = 2*core->getDualPortId();
227
228    CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();
229
230    m_ring_from_cp = cp_dp->getRingCpToDp(thread_id);
231    m_ring_to_cp   = cp_dp->getRingDpToCp(thread_id);
232
233    m_state = STATE_IDLE;
234
235    int i;
236    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
237        m_ports[i].create(core);
238    }
239}
240
241
242/* move to the next stream, old stream move to INACTIVE */
243bool TrexStatelessDpCore::set_stateless_next_node(CGenNodeStateless * cur_node,
244                                                  CGenNodeStateless * next_node){
245
246    assert(cur_node);
247    TrexStatelessDpPerPort * lp_port = get_port_db(cur_node->m_port_id);
248    bool schedule =false;
249
250    bool to_stop_port=false;
251
252    if (next_node == NULL) {
253        /* there is no next stream , reduce the number of active streams*/
254        to_stop_port = lp_port->update_number_of_active_streams(1);
255
256    }else{
257        uint8_t state=next_node->get_state();
258
259        /* can't be FREE_RESUSE */
260        assert(state != CGenNodeStateless::ss_FREE_RESUSE);
261        if (next_node->get_state() == CGenNodeStateless::ss_INACTIVE ) {
262
263            /* refill start info and scedule, no update in active streams  */
264            next_node->refresh();
265            schedule = true;
266
267        }else{
268            to_stop_port = lp_port->update_number_of_active_streams(1);
269        }
270    }
271
272    if ( to_stop_port ) {
273        /* call stop port explictly to move the state */
274        stop_traffic(cur_node->m_port_id,false,0);
275    }
276
277    return ( schedule );
278}
279
280
281
282/**
283 * in idle state loop, the processor most of the time sleeps
284 * and periodically checks for messages
285 *
286 * @author imarom (01-Nov-15)
287 */
288void
289TrexStatelessDpCore::idle_state_loop() {
290
291    while (m_state == STATE_IDLE) {
292        periodic_check_for_cp_messages();
293        delay(200);
294    }
295}
296
297
298
299void TrexStatelessDpCore::quit_main_loop(){
300    m_core->set_terminate_mode(true); /* mark it as terminated */
301    m_state = STATE_TERMINATE;
302    add_global_duration(0.0001);
303}
304
305
306/**
307 * scehduler runs when traffic exists
308 * it will return when no more transmitting is done on this
309 * core
310 *
311 * @author imarom (01-Nov-15)
312 */
313void
314TrexStatelessDpCore::start_scheduler() {
315    /* creates a maintenace job using the scheduler */
316    CGenNode * node_sync = m_core->create_node() ;
317    node_sync->m_type = CGenNode::FLOW_SYNC;
318    node_sync->m_time = m_core->m_cur_time_sec + SYNC_TIME_OUT;
319    m_core->m_node_gen.add_node(node_sync);
320
321    double old_offset = 0.0;
322    m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset);
323    /* bail out in case of terminate */
324    if (m_state != TrexStatelessDpCore::STATE_TERMINATE) {
325        m_core->m_node_gen.close_file(m_core);
326        m_state = STATE_IDLE; /* we exit from all ports and we have nothing to do, we move to IDLE state */
327    }
328}
329
330
331void
332TrexStatelessDpCore::run_once(){
333
334    idle_state_loop();
335
336    if ( m_state == STATE_TERMINATE ){
337        return;
338    }
339
340    start_scheduler();
341}
342
343
344
345
346void
347TrexStatelessDpCore::start() {
348
349    while (true) {
350        run_once();
351
352        if ( m_core->is_terminated_by_master() ) {
353            break;
354        }
355    }
356}
357
358/* only if both port are idle we can exit */
359void
360TrexStatelessDpCore::schedule_exit(){
361
362    CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
363
364    node->m_type = CGenNode::COMMAND;
365
366    node->m_cmd = new TrexStatelessDpCanQuit();
367
368    /* make sure it will be scheduled after the current node */
369    node->m_time = m_core->m_cur_time_sec ;
370
371    m_core->m_node_gen.add_node((CGenNode *)node);
372}
373
374
375void
376TrexStatelessDpCore::add_global_duration(double duration){
377    if (duration > 0.0) {
378        CGenNode *node = m_core->create_node() ;
379
380        node->m_type = CGenNode::EXIT_SCHED;
381
382        /* make sure it will be scheduled after the current node */
383        node->m_time = m_core->m_cur_time_sec + duration ;
384
385        m_core->m_node_gen.add_node(node);
386    }
387}
388
389/* add per port exit */
390void
391TrexStatelessDpCore::add_port_duration(double duration,
392                                       uint8_t port_id,
393                                       int event_id){
394    if (duration > 0.0) {
395        CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
396
397        node->m_type = CGenNode::COMMAND;
398
399        /* make sure it will be scheduled after the current node */
400        node->m_time = m_core->m_cur_time_sec + duration ;
401
402        TrexStatelessDpStop * cmd=new TrexStatelessDpStop(port_id);
403
404
405        /* test this */
406        m_core->m_non_active_nodes++;
407        cmd->set_core_ptr(m_core);
408        cmd->set_event_id(event_id);
409        cmd->set_wait_for_event_id(true);
410
411        node->m_cmd = cmd;
412
413        m_core->m_node_gen.add_node((CGenNode *)node);
414    }
415}
416
417
418void
419TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port,
420                                TrexStream * stream,
421                                TrexStreamsCompiledObj *comp) {
422
423    CGenNodeStateless *node = m_core->create_node_sl();
424
425    /* add periodic */
426    node->m_type = CGenNode::STATELESS_PKT;
427
428    node->m_ref_stream_info  =   stream->clone_as_dp();
429
430    node->m_next_stream=0; /* will be fixed later */
431
432
433    if ( stream->m_self_start ){
434        /* if self start it is in active mode */
435        node->m_state =CGenNodeStateless::ss_ACTIVE;
436        lp_port->m_active_streams++;
437    }else{
438        node->m_state =CGenNodeStateless::ss_INACTIVE;
439    }
440
441    node->m_time = m_core->m_cur_time_sec + usec_to_sec(stream->m_isg_usec);
442
443    pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(stream->m_port_id);
444    node->m_flags = 0;
445
446    /* set socket id */
447    node->set_socket_id(m_core->m_node_gen.m_socket_id);
448
449    /* build a mbuf from a packet */
450
451    uint16_t pkt_size = stream->m_pkt.len;
452    const uint8_t *stream_pkt = stream->m_pkt.binary;
453
454    node->m_pause =0;
455    node->m_stream_type = stream->m_type;
456    node->m_next_time_offset = 1.0 / (stream->get_pps() * comp->get_multiplier());
457
458    /* stateless specific fields */
459    switch ( stream->m_type ) {
460
461    case TrexStream::stCONTINUOUS :
462        node->m_single_burst=0;
463        node->m_single_burst_refill=0;
464        node->m_multi_bursts=0;
465        node->m_ibg_sec                 = 0.0;
466        break;
467
468    case TrexStream::stSINGLE_BURST :
469        node->m_stream_type             = TrexStream::stMULTI_BURST;
470        node->m_single_burst            = stream->m_burst_total_pkts;
471        node->m_single_burst_refill     = stream->m_burst_total_pkts;
472        node->m_multi_bursts            = 1;  /* single burst in multi burst of 1 */
473        node->m_ibg_sec                 = 0.0;
474        break;
475
476    case TrexStream::stMULTI_BURST :
477        node->m_single_burst        = stream->m_burst_total_pkts;
478        node->m_single_burst_refill = stream->m_burst_total_pkts;
479        node->m_multi_bursts        = stream->m_num_bursts;
480        node->m_ibg_sec             = usec_to_sec( stream->m_ibg_usec );
481        break;
482    default:
483
484        assert(0);
485    };
486
487    node->m_port_id = stream->m_port_id;
488
489    /* allocate const mbuf */
490    rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size);
491    assert(m);
492
493    char *p = rte_pktmbuf_append(m, pkt_size);
494    assert(p);
495    /* copy the packet */
496    memcpy(p,stream_pkt,pkt_size);
497
498    /* set dir 0 or 1 client or server */
499    node->set_mbuf_cache_dir(dir);
500
501    /* TBD repace the mac if req we should add flag  */
502    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, m);
503
504    /* set the packet as a readonly */
505    node->set_cache_mbuf(m);
506
507    CDpOneStream one_stream;
508
509    one_stream.m_dp_stream = node->m_ref_stream_info;
510    one_stream.m_node =node;
511
512    lp_port->m_active_nodes.push_back(one_stream);
513
514    /* schedule only if active */
515    if (node->m_state == CGenNodeStateless::ss_ACTIVE) {
516        m_core->m_node_gen.add_node((CGenNode *)node);
517    }
518}
519
520void
521TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj,
522                                   double duration,
523                                   int event_id) {
524
525
526    TrexStatelessDpPerPort * lp_port=get_port_db(obj->get_port_id());
527    lp_port->m_active_streams = 0;
528    lp_port->set_event_id(event_id);
529
530    /* no nodes in the list */
531    assert(lp_port->m_active_nodes.size()==0);
532
533    for (auto single_stream : obj->get_objects()) {
534        /* all commands should be for the same port */
535        assert(obj->get_port_id() == single_stream.m_stream->m_port_id);
536        add_stream(lp_port,single_stream.m_stream,obj);
537    }
538
539    uint32_t nodes = lp_port->m_active_nodes.size();
540    /* find next stream */
541    assert(nodes == obj->get_objects().size());
542
543    int cnt=0;
544
545    /* set the next_stream pointer  */
546    for (auto single_stream : obj->get_objects()) {
547
548        if (single_stream.m_stream->is_dp_next_stream() ) {
549            int stream_id = single_stream.m_stream->m_next_stream_id;
550            assert(stream_id<nodes);
551            /* point to the next stream , stream_id is fixed */
552            lp_port->m_active_nodes[cnt].m_node->m_next_stream = lp_port->m_active_nodes[stream_id].m_node ;
553        }
554        cnt++;
555    }
556
557    lp_port->m_state =TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
558    m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
559
560
561    if ( duration > 0.0 ){
562        add_port_duration( duration ,obj->get_port_id(),event_id );
563    }
564
565}
566
567
568bool TrexStatelessDpCore::are_all_ports_idle(){
569
570    bool res=true;
571    int i;
572    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
573        if ( m_ports[i].m_state != TrexStatelessDpPerPort::ppSTATE_IDLE ){
574            res=false;
575        }
576    }
577    return (res);
578}
579
580
581void
582TrexStatelessDpCore::resume_traffic(uint8_t port_id){
583
584    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
585
586    lp_port->resume_traffic(port_id);
587}
588
589
590void
591TrexStatelessDpCore::pause_traffic(uint8_t port_id){
592
593    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
594
595    lp_port->pause_traffic(port_id);
596}
597
598void
599TrexStatelessDpCore::update_traffic(uint8_t port_id, double factor) {
600
601    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
602
603    lp_port->update_traffic(port_id, factor);
604}
605
606
607void
608TrexStatelessDpCore::stop_traffic(uint8_t port_id,
609                                  bool stop_on_id,
610                                  int event_id) {
611    /* we cannot remove nodes not from the top of the queue so
612       for every active node - make sure next time
613       the scheduler invokes it, it will be free */
614
615    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
616
617    if ( lp_port->stop_traffic(port_id,stop_on_id,event_id) == false){
618        /* nothing to do ! already stopped */
619        //printf(" skip .. %f\n",m_core->m_cur_time_sec);
620        return;
621    }
622
623#if 0
624    if ( are_all_ports_idle() ) {
625        /* just a place holder if we will need to do somthing in that case */
626    }
627#endif
628
629    /* inform the control plane we stopped - this might be a async stop
630       (streams ended)
631     */
632    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
633    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
634                                                                   port_id,
635                                                                   TrexDpPortEvent::EVENT_STOP,
636                                                                   lp_port->get_event_id());
637    ring->Enqueue((CGenNode *)event_msg);
638
639}
640
641/**
642 * handle a message from CP to DP
643 *
644 */
645void
646TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) {
647    msg->handle(this);
648    delete msg;
649}
650
651