trex_stateless_dp_core.cpp revision d9a11302
1/*
2 Itay Marom
3 Hanoch Haim
4 Cisco Systems, Inc.
5*/
6
7/*
8Copyright (c) 2015-2015 Cisco Systems, Inc.
9
10Licensed under the Apache License, Version 2.0 (the "License");
11you may not use this file except in compliance with the License.
12You may obtain a copy of the License at
13
14    http://www.apache.org/licenses/LICENSE-2.0
15
16Unless required by applicable law or agreed to in writing, software
17distributed under the License is distributed on an "AS IS" BASIS,
18WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19See the License for the specific language governing permissions and
20limitations under the License.
21*/
22#include <trex_stateless_dp_core.h>
23#include <trex_stateless_messaging.h>
24#include <trex_streams_compiler.h>
25#include <trex_stream_node.h>
26#include <trex_stream.h>
27
28#include <bp_sim.h>
29
30
31void CDpOneStream::Delete(CFlowGenListPerThread   * core){
32    assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE);
33    core->free_node((CGenNode *)m_node);
34    delete m_dp_stream;
35    m_node=0;
36    m_dp_stream=0;
37}
38
39void CDpOneStream::DeleteOnlyStream(){
40    assert(m_dp_stream);
41    delete m_dp_stream;
42    m_dp_stream=0;
43}
44
45int CGenNodeStateless::get_stream_id(){
46    if (m_state ==CGenNodeStateless::ss_FREE_RESUSE) {
47        return (-1); // not valid
48    }
49    assert(m_ref_stream_info);
50    return ((int)m_ref_stream_info->m_stream_id);
51}
52
53
54void CGenNodeStateless::DumpHeader(FILE *fd){
55    fprintf(fd," pkt_id, time, port , action , state, stream_id , stype , m-burst# , burst# \n");
56
57}
58void CGenNodeStateless::Dump(FILE *fd){
59    fprintf(fd," %2.4f, %3lu, %s,%s, %3d, %s, %3lu, %3lu  \n",
60            m_time,
61            (ulong)m_port_id,
62            "s-pkt", //action
63            get_stream_state_str(m_state ).c_str(),
64            get_stream_id(),   //stream_id
65            TrexStream::get_stream_type_str(m_stream_type).c_str(), //stype
66            (ulong)m_multi_bursts,
67            (ulong)m_single_burst
68            );
69}
70
71
72void CGenNodeStateless::refresh(){
73
74    /* refill the stream info */
75    m_single_burst    = m_single_burst_refill;
76    m_multi_bursts    = m_ref_stream_info->m_num_bursts;
77    m_state           = CGenNodeStateless::ss_ACTIVE;
78}
79
80
81void CGenNodeCommand::free_command(){
82
83    assert(m_cmd);
84    m_cmd->on_node_remove();
85    delete m_cmd;
86}
87
88
89std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state){
90    std::string res;
91
92    switch (stream_state) {
93    case CGenNodeStateless::ss_FREE_RESUSE :
94         res="FREE    ";
95        break;
96    case CGenNodeStateless::ss_INACTIVE :
97        res="INACTIVE ";
98        break;
99    case CGenNodeStateless::ss_ACTIVE :
100        res="ACTIVE   ";
101        break;
102    default:
103        res="Unknow   ";
104    };
105    return(res);
106}
107
108
109void CGenNodeStateless::free_stl_node(){
110    /* if we have cache mbuf free it */
111    rte_mbuf_t * m=get_cache_mbuf();
112    if (m) {
113        rte_pktmbuf_free(m);
114        m_cache_mbuf=0;
115    }
116}
117
118
119bool TrexStatelessDpPerPort::update_number_of_active_streams(uint32_t d){
120    m_active_streams-=d; /* reduce the number of streams */
121    if (m_active_streams == 0) {
122        return (true);
123    }
124    return (false);
125}
126
127bool TrexStatelessDpPerPort::resume_traffic(uint8_t port_id){
128
129    /* we are working with continues streams so we must be in transmit mode */
130    assert(m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE);
131
132    for (auto dp_stream : m_active_nodes) {
133        CGenNodeStateless * node =dp_stream.m_node;
134        assert(node->get_port_id() == port_id);
135        assert(node->is_pause() == true);
136        node->set_pause(false);
137    }
138    m_state = TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
139    return (true);
140}
141
142bool TrexStatelessDpPerPort::update_traffic(uint8_t port_id, double mul) {
143
144    assert( (m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING ||
145            (m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE)) );
146
147    for (auto dp_stream : m_active_nodes) {
148        CGenNodeStateless * node = dp_stream.m_node;
149        assert(node->get_port_id() == port_id);
150
151        node->set_multiplier(mul);
152    }
153
154    return (true);
155}
156
157bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){
158
159    /* we are working with continues streams so we must be in transmit mode */
160    assert(m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING);
161
162    for (auto dp_stream : m_active_nodes) {
163        CGenNodeStateless * node =dp_stream.m_node;
164        assert(node->get_port_id() == port_id);
165        assert(node->is_pause() == false);
166        node->set_pause(true);
167    }
168    m_state = TrexStatelessDpPerPort::ppSTATE_PAUSE;
169    return (true);
170}
171
172
173bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id,
174                                          bool stop_on_id,
175                                          int event_id){
176
177
178    if (m_state == TrexStatelessDpPerPort::ppSTATE_IDLE) {
179        assert(m_active_streams==0);
180        return false;
181    }
182
183    /* there could be race of stop after stop */
184    if ( stop_on_id ) {
185        if (event_id != m_event_id){
186            /* we can't stop it is an old message */
187            return false;
188        }
189    }
190
191    for (auto dp_stream : m_active_nodes) {
192        CGenNodeStateless * node =dp_stream.m_node;
193        assert(node->get_port_id() == port_id);
194        if ( node->get_state() == CGenNodeStateless::ss_ACTIVE) {
195            node->mark_for_free();
196            m_active_streams--;
197            dp_stream.DeleteOnlyStream();
198
199        }else{
200            dp_stream.Delete(m_core);
201        }
202    }
203
204    /* active stream should be zero */
205    assert(m_active_streams==0);
206    m_active_nodes.clear();
207    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
208    return (true);
209}
210
211
212void TrexStatelessDpPerPort::create(CFlowGenListPerThread   *  core){
213    m_core=core;
214    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
215    m_port_id=0;
216    m_active_streams=0;
217    m_active_nodes.clear();
218}
219
220
221
222void
223TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) {
224    m_thread_id = thread_id;
225    m_core = core;
226    m_local_port_offset = 2*core->getDualPortId();
227
228    CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();
229
230    m_ring_from_cp = cp_dp->getRingCpToDp(thread_id);
231    m_ring_to_cp   = cp_dp->getRingDpToCp(thread_id);
232
233    m_state = STATE_IDLE;
234
235    int i;
236    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
237        m_ports[i].create(core);
238    }
239}
240
241
242/* move to the next stream, old stream move to INACTIVE */
243bool TrexStatelessDpCore::set_stateless_next_node(CGenNodeStateless * cur_node,
244                                                  CGenNodeStateless * next_node){
245
246    assert(cur_node);
247    TrexStatelessDpPerPort * lp_port = get_port_db(cur_node->m_port_id);
248    bool schedule =false;
249
250    bool to_stop_port=false;
251
252    if (next_node == NULL) {
253        /* there is no next stream , reduce the number of active streams*/
254        to_stop_port = lp_port->update_number_of_active_streams(1);
255
256    }else{
257        uint8_t state=next_node->get_state();
258
259        /* can't be FREE_RESUSE */
260        assert(state != CGenNodeStateless::ss_FREE_RESUSE);
261        if (next_node->get_state() == CGenNodeStateless::ss_INACTIVE ) {
262
263            /* refill start info and scedule, no update in active streams  */
264            next_node->refresh();
265            schedule = true;
266
267        }else{
268            to_stop_port = lp_port->update_number_of_active_streams(1);
269        }
270    }
271
272    if ( to_stop_port ) {
273        /* call stop port explictly to move the state */
274        stop_traffic(cur_node->m_port_id,false,0);
275    }
276
277    return ( schedule );
278}
279
280
281
282/**
283 * in idle state loop, the processor most of the time sleeps
284 * and periodically checks for messages
285 *
286 * @author imarom (01-Nov-15)
287 */
288void
289TrexStatelessDpCore::idle_state_loop() {
290
291    while (m_state == STATE_IDLE) {
292        periodic_check_for_cp_messages();
293        delay(200);
294    }
295}
296
297
298
299void TrexStatelessDpCore::quit_main_loop(){
300    m_core->set_terminate_mode(true); /* mark it as terminated */
301    m_state = STATE_TERMINATE;
302    add_global_duration(0.0001);
303}
304
305
306/**
307 * scehduler runs when traffic exists
308 * it will return when no more transmitting is done on this
309 * core
310 *
311 * @author imarom (01-Nov-15)
312 */
313void
314TrexStatelessDpCore::start_scheduler() {
315    /* creates a maintenace job using the scheduler */
316    CGenNode * node_sync = m_core->create_node() ;
317    node_sync->m_type = CGenNode::FLOW_SYNC;
318    node_sync->m_time = m_core->m_cur_time_sec + SYNC_TIME_OUT;
319    m_core->m_node_gen.add_node(node_sync);
320
321    double old_offset = 0.0;
322    m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset);
323    /* bail out in case of terminate */
324    if (m_state != TrexStatelessDpCore::STATE_TERMINATE) {
325        m_core->m_node_gen.close_file(m_core);
326        m_state = STATE_IDLE; /* we exit from all ports and we have nothing to do, we move to IDLE state */
327    }
328}
329
330
331void
332TrexStatelessDpCore::run_once(){
333
334    idle_state_loop();
335
336    if ( m_state == STATE_TERMINATE ){
337        return;
338    }
339
340    start_scheduler();
341}
342
343
344
345
346void
347TrexStatelessDpCore::start() {
348
349    while (true) {
350        run_once();
351
352        if ( m_core->is_terminated_by_master() ) {
353            break;
354        }
355    }
356}
357
358/* only if both port are idle we can exit */
359void
360TrexStatelessDpCore::schedule_exit(){
361
362    CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
363
364    node->m_type = CGenNode::COMMAND;
365
366    node->m_cmd = new TrexStatelessDpCanQuit();
367
368    /* make sure it will be scheduled after the current node */
369    node->m_time = m_core->m_cur_time_sec ;
370
371    m_core->m_node_gen.add_node((CGenNode *)node);
372}
373
374
375void
376TrexStatelessDpCore::add_global_duration(double duration){
377    if (duration > 0.0) {
378        CGenNode *node = m_core->create_node() ;
379
380        node->m_type = CGenNode::EXIT_SCHED;
381
382        /* make sure it will be scheduled after the current node */
383        node->m_time = m_core->m_cur_time_sec + duration ;
384
385        m_core->m_node_gen.add_node(node);
386    }
387}
388
389/* add per port exit */
390void
391TrexStatelessDpCore::add_port_duration(double duration,
392                                       uint8_t port_id,
393                                       int event_id){
394    if (duration > 0.0) {
395        CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
396
397        node->m_type = CGenNode::COMMAND;
398
399        /* make sure it will be scheduled after the current node */
400        node->m_time = m_core->m_cur_time_sec + duration ;
401
402        TrexStatelessDpStop * cmd=new TrexStatelessDpStop(port_id);
403
404
405        /* test this */
406        m_core->m_non_active_nodes++;
407        cmd->set_core_ptr(m_core);
408        cmd->set_event_id(event_id);
409        cmd->set_wait_for_event_id(true);
410
411        node->m_cmd = cmd;
412
413        m_core->m_node_gen.add_node((CGenNode *)node);
414    }
415}
416
417
418void
419TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port,
420                                TrexStream * stream,
421                                TrexStreamsCompiledObj *comp) {
422
423    CGenNodeStateless *node = m_core->create_node_sl();
424
425    /* add periodic */
426    node->m_type = CGenNode::STATELESS_PKT;
427
428    node->m_ref_stream_info  =   stream->clone_as_dp();
429
430    node->m_next_stream=0; /* will be fixed later */
431
432
433    if ( stream->m_self_start ){
434        /* if self start it is in active mode */
435        node->m_state =CGenNodeStateless::ss_ACTIVE;
436        lp_port->m_active_streams++;
437    }else{
438        node->m_state =CGenNodeStateless::ss_INACTIVE;
439    }
440
441    node->m_time = m_core->m_cur_time_sec + usec_to_sec(stream->m_isg_usec);
442
443    pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(stream->m_port_id);
444    node->m_flags = 0;
445
446    /* set socket id */
447    node->set_socket_id(m_core->m_node_gen.m_socket_id);
448
449    /* build a mbuf from a packet */
450
451    uint16_t pkt_size = stream->m_pkt.len;
452    const uint8_t *stream_pkt = stream->m_pkt.binary;
453
454    node->m_pause =0;
455    node->m_stream_type = stream->m_type;
456    node->m_base_pps = stream->get_pps();
457    node->set_multiplier(comp->get_multiplier());
458
459    /* stateless specific fields */
460    switch ( stream->m_type ) {
461
462    case TrexStream::stCONTINUOUS :
463        node->m_single_burst=0;
464        node->m_single_burst_refill=0;
465        node->m_multi_bursts=0;
466        node->m_ibg_sec                 = 0.0;
467        break;
468
469    case TrexStream::stSINGLE_BURST :
470        node->m_stream_type             = TrexStream::stMULTI_BURST;
471        node->m_single_burst            = stream->m_burst_total_pkts;
472        node->m_single_burst_refill     = stream->m_burst_total_pkts;
473        node->m_multi_bursts            = 1;  /* single burst in multi burst of 1 */
474        node->m_ibg_sec                 = 0.0;
475        break;
476
477    case TrexStream::stMULTI_BURST :
478        node->m_single_burst        = stream->m_burst_total_pkts;
479        node->m_single_burst_refill = stream->m_burst_total_pkts;
480        node->m_multi_bursts        = stream->m_num_bursts;
481        node->m_ibg_sec             = usec_to_sec( stream->m_ibg_usec );
482        break;
483    default:
484
485        assert(0);
486    };
487
488    node->m_port_id = stream->m_port_id;
489
490    /* allocate const mbuf */
491    rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size);
492    assert(m);
493
494    char *p = rte_pktmbuf_append(m, pkt_size);
495    assert(p);
496    /* copy the packet */
497    memcpy(p,stream_pkt,pkt_size);
498
499    /* set dir 0 or 1 client or server */
500    node->set_mbuf_cache_dir(dir);
501
502    /* TBD repace the mac if req we should add flag  */
503    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, m);
504
505    /* set the packet as a readonly */
506    node->set_cache_mbuf(m);
507
508    CDpOneStream one_stream;
509
510    one_stream.m_dp_stream = node->m_ref_stream_info;
511    one_stream.m_node =node;
512
513    lp_port->m_active_nodes.push_back(one_stream);
514
515    /* schedule only if active */
516    if (node->m_state == CGenNodeStateless::ss_ACTIVE) {
517        m_core->m_node_gen.add_node((CGenNode *)node);
518    }
519}
520
521void
522TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj,
523                                   double duration,
524                                   int event_id) {
525
526
527    TrexStatelessDpPerPort * lp_port=get_port_db(obj->get_port_id());
528    lp_port->m_active_streams = 0;
529    lp_port->set_event_id(event_id);
530
531    /* no nodes in the list */
532    assert(lp_port->m_active_nodes.size()==0);
533
534    for (auto single_stream : obj->get_objects()) {
535        /* all commands should be for the same port */
536        assert(obj->get_port_id() == single_stream.m_stream->m_port_id);
537        add_stream(lp_port,single_stream.m_stream,obj);
538    }
539
540    uint32_t nodes = lp_port->m_active_nodes.size();
541    /* find next stream */
542    assert(nodes == obj->get_objects().size());
543
544    int cnt=0;
545
546    /* set the next_stream pointer  */
547    for (auto single_stream : obj->get_objects()) {
548
549        if (single_stream.m_stream->is_dp_next_stream() ) {
550            int stream_id = single_stream.m_stream->m_next_stream_id;
551            assert(stream_id<nodes);
552            /* point to the next stream , stream_id is fixed */
553            lp_port->m_active_nodes[cnt].m_node->m_next_stream = lp_port->m_active_nodes[stream_id].m_node ;
554        }
555        cnt++;
556    }
557
558    lp_port->m_state =TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
559    m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
560
561
562    if ( duration > 0.0 ){
563        add_port_duration( duration ,obj->get_port_id(),event_id );
564    }
565
566}
567
568
569bool TrexStatelessDpCore::are_all_ports_idle(){
570
571    bool res=true;
572    int i;
573    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
574        if ( m_ports[i].m_state != TrexStatelessDpPerPort::ppSTATE_IDLE ){
575            res=false;
576        }
577    }
578    return (res);
579}
580
581
582void
583TrexStatelessDpCore::resume_traffic(uint8_t port_id){
584
585    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
586
587    lp_port->resume_traffic(port_id);
588}
589
590
591void
592TrexStatelessDpCore::pause_traffic(uint8_t port_id){
593
594    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
595
596    lp_port->pause_traffic(port_id);
597}
598
599void
600TrexStatelessDpCore::update_traffic(uint8_t port_id, double mul) {
601
602    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
603
604    lp_port->update_traffic(port_id, mul);
605}
606
607
608void
609TrexStatelessDpCore::stop_traffic(uint8_t port_id,
610                                  bool stop_on_id,
611                                  int event_id) {
612    /* we cannot remove nodes not from the top of the queue so
613       for every active node - make sure next time
614       the scheduler invokes it, it will be free */
615
616    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
617
618    if ( lp_port->stop_traffic(port_id,stop_on_id,event_id) == false){
619        /* nothing to do ! already stopped */
620        //printf(" skip .. %f\n",m_core->m_cur_time_sec);
621        return;
622    }
623
624#if 0
625    if ( are_all_ports_idle() ) {
626        /* just a place holder if we will need to do somthing in that case */
627    }
628#endif
629
630    /* inform the control plane we stopped - this might be a async stop
631       (streams ended)
632     */
633    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
634    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
635                                                                   port_id,
636                                                                   TrexDpPortEvent::EVENT_STOP,
637                                                                   lp_port->get_event_id());
638    ring->Enqueue((CGenNode *)event_msg);
639
640}
641
642/**
643 * handle a message from CP to DP
644 *
645 */
646void
647TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) {
648    msg->handle(this);
649    delete msg;
650}
651
652