trex_stateless_dp_core.cpp revision 12a19244
1/*
2 Itay Marom
3 Hanoch Haim
4 Cisco Systems, Inc.
5*/
6
7/*
8Copyright (c) 2015-2015 Cisco Systems, Inc.
9
10Licensed under the Apache License, Version 2.0 (the "License");
11you may not use this file except in compliance with the License.
12You may obtain a copy of the License at
13
14    http://www.apache.org/licenses/LICENSE-2.0
15
16Unless required by applicable law or agreed to in writing, software
17distributed under the License is distributed on an "AS IS" BASIS,
18WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19See the License for the specific language governing permissions and
20limitations under the License.
21*/
22#include <trex_stateless_dp_core.h>
23#include <trex_stateless_messaging.h>
24#include <trex_streams_compiler.h>
25#include <trex_stream_node.h>
26#include <trex_stream.h>
27
28#include <bp_sim.h>
29
30
31void CDpOneStream::Delete(CFlowGenListPerThread   * core){
32    assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE);
33    core->free_node((CGenNode *)m_node);
34    delete m_dp_stream;
35    m_node=0;
36    m_dp_stream=0;
37}
38
39void CDpOneStream::DeleteOnlyStream(){
40    assert(m_dp_stream);
41    delete m_dp_stream;
42    m_dp_stream=0;
43}
44
45int CGenNodeStateless::get_stream_id(){
46    if (m_state ==CGenNodeStateless::ss_FREE_RESUSE) {
47        return (-1); // not valid
48    }
49    assert(m_ref_stream_info);
50    return ((int)m_ref_stream_info->m_stream_id);
51}
52
53
54void CGenNodeStateless::DumpHeader(FILE *fd){
55    fprintf(fd," pkt_id, time, port , action , state, stream_id , stype , m-burst# , burst# \n");
56
57}
58void CGenNodeStateless::Dump(FILE *fd){
59    fprintf(fd," %2.4f, %3lu, %s,%s, %3d, %s, %3lu, %3lu  \n",
60            m_time,
61            (ulong)m_port_id,
62            "s-pkt", //action
63            get_stream_state_str(m_state ).c_str(),
64            get_stream_id(),   //stream_id
65            TrexStream::get_stream_type_str(m_stream_type).c_str(), //stype
66            (ulong)m_multi_bursts,
67            (ulong)m_single_burst
68            );
69}
70
71
72void CGenNodeStateless::refresh(){
73
74    /* refill the stream info */
75    m_single_burst    = m_single_burst_refill;
76    m_multi_bursts    = m_ref_stream_info->m_num_bursts;
77    m_state           = CGenNodeStateless::ss_ACTIVE;
78}
79
80
81void CGenNodeCommand::free_command(){
82
83    assert(m_cmd);
84    m_cmd->on_node_remove();
85    delete m_cmd;
86}
87
88
89std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state){
90    std::string res;
91
92    switch (stream_state) {
93    case CGenNodeStateless::ss_FREE_RESUSE :
94         res="FREE    ";
95        break;
96    case CGenNodeStateless::ss_INACTIVE :
97        res="INACTIVE ";
98        break;
99    case CGenNodeStateless::ss_ACTIVE :
100        res="ACTIVE   ";
101        break;
102    default:
103        res="Unknow   ";
104    };
105    return(res);
106}
107
108
109void CGenNodeStateless::free_stl_node(){
110    /* if we have cache mbuf free it */
111    rte_mbuf_t * m=get_cache_mbuf();
112    if (m) {
113        rte_pktmbuf_free(m);
114        m_cache_mbuf=0;
115    }
116}
117
118
119bool TrexStatelessDpPerPort::update_number_of_active_streams(uint32_t d){
120    m_active_streams-=d; /* reduce the number of streams */
121    if (m_active_streams == 0) {
122        return (true);
123    }
124    return (false);
125}
126
127bool TrexStatelessDpPerPort::resume_traffic(uint8_t port_id){
128
129    /* we are working with continues streams so we must be in transmit mode */
130    assert(m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE);
131
132    for (auto dp_stream : m_active_nodes) {
133        CGenNodeStateless * node =dp_stream.m_node;
134        assert(node->get_port_id() == port_id);
135        assert(node->is_pause() == true);
136        node->set_pause(false);
137    }
138    m_state = TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
139    return (true);
140}
141
142
143bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){
144
145    /* we are working with continues streams so we must be in transmit mode */
146    assert(m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING);
147
148    for (auto dp_stream : m_active_nodes) {
149        CGenNodeStateless * node =dp_stream.m_node;
150        assert(node->get_port_id() == port_id);
151        assert(node->is_pause() == false);
152        node->set_pause(true);
153    }
154    m_state = TrexStatelessDpPerPort::ppSTATE_PAUSE;
155    return (true);
156}
157
158
159bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id,
160                                          bool stop_on_id,
161                                          int event_id){
162
163
164    if (m_state == TrexStatelessDpPerPort::ppSTATE_IDLE) {
165        assert(m_active_streams==0);
166        return false;
167    }
168
169    /* there could be race of stop after stop */
170    if ( stop_on_id ) {
171        if (event_id != m_event_id){
172            /* we can't stop it is an old message */
173            return false;
174        }
175    }
176
177    for (auto dp_stream : m_active_nodes) {
178        CGenNodeStateless * node =dp_stream.m_node;
179        assert(node->get_port_id() == port_id);
180        if ( node->get_state() == CGenNodeStateless::ss_ACTIVE) {
181            node->mark_for_free();
182            m_active_streams--;
183            dp_stream.DeleteOnlyStream();
184
185        }else{
186            dp_stream.Delete(m_core);
187        }
188    }
189
190    /* active stream should be zero */
191    assert(m_active_streams==0);
192    m_active_nodes.clear();
193    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
194    return (true);
195}
196
197
198void TrexStatelessDpPerPort::create(CFlowGenListPerThread   *  core){
199    m_core=core;
200    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
201    m_port_id=0;
202    m_active_streams=0;
203    m_active_nodes.clear();
204}
205
206
207
208void
209TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) {
210    m_thread_id = thread_id;
211    m_core = core;
212    m_local_port_offset = 2*core->getDualPortId();
213
214    CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();
215
216    m_ring_from_cp = cp_dp->getRingCpToDp(thread_id);
217    m_ring_to_cp   = cp_dp->getRingDpToCp(thread_id);
218
219    m_state = STATE_IDLE;
220
221    int i;
222    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
223        m_ports[i].create(core);
224    }
225}
226
227
228/* move to the next stream, old stream move to INACTIVE */
229bool TrexStatelessDpCore::set_stateless_next_node(CGenNodeStateless * cur_node,
230                                                  CGenNodeStateless * next_node){
231
232    assert(cur_node);
233    TrexStatelessDpPerPort * lp_port = get_port_db(cur_node->m_port_id);
234    bool schedule =false;
235
236    bool to_stop_port=false;
237
238    if (next_node == NULL) {
239        /* there is no next stream , reduce the number of active streams*/
240        to_stop_port = lp_port->update_number_of_active_streams(1);
241
242    }else{
243        uint8_t state=next_node->get_state();
244
245        /* can't be FREE_RESUSE */
246        assert(state != CGenNodeStateless::ss_FREE_RESUSE);
247        if (next_node->get_state() == CGenNodeStateless::ss_INACTIVE ) {
248
249            /* refill start info and scedule, no update in active streams  */
250            next_node->refresh();
251            schedule = true;
252
253        }else{
254            to_stop_port = lp_port->update_number_of_active_streams(1);
255        }
256    }
257
258    if ( to_stop_port ) {
259        /* call stop port explictly to move the state */
260        stop_traffic(cur_node->m_port_id,false,0);
261    }
262
263    return ( schedule );
264}
265
266
267
268/**
269 * in idle state loop, the processor most of the time sleeps
270 * and periodically checks for messages
271 *
272 * @author imarom (01-Nov-15)
273 */
274void
275TrexStatelessDpCore::idle_state_loop() {
276
277    while (m_state == STATE_IDLE) {
278        periodic_check_for_cp_messages();
279        delay(200);
280    }
281}
282
283
284
285void TrexStatelessDpCore::quit_main_loop(){
286    m_core->set_terminate_mode(true); /* mark it as terminated */
287    m_state = STATE_TERMINATE;
288    add_global_duration(0.0001);
289}
290
291
292/**
293 * scehduler runs when traffic exists
294 * it will return when no more transmitting is done on this
295 * core
296 *
297 * @author imarom (01-Nov-15)
298 */
299void
300TrexStatelessDpCore::start_scheduler() {
301    /* creates a maintenace job using the scheduler */
302    CGenNode * node_sync = m_core->create_node() ;
303    node_sync->m_type = CGenNode::FLOW_SYNC;
304    node_sync->m_time = m_core->m_cur_time_sec + SYNC_TIME_OUT;
305    m_core->m_node_gen.add_node(node_sync);
306
307    double old_offset = 0.0;
308    m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset);
309    /* bail out in case of terminate */
310    if (m_state != TrexStatelessDpCore::STATE_TERMINATE) {
311        m_core->m_node_gen.close_file(m_core);
312        m_state = STATE_IDLE; /* we exit from all ports and we have nothing to do, we move to IDLE state */
313    }
314}
315
316
317void
318TrexStatelessDpCore::run_once(){
319
320    idle_state_loop();
321
322    if ( m_state == STATE_TERMINATE ){
323        return;
324    }
325
326    start_scheduler();
327}
328
329
330
331
332void
333TrexStatelessDpCore::start() {
334
335    while (true) {
336        run_once();
337
338        if ( m_core->is_terminated_by_master() ) {
339            break;
340        }
341    }
342}
343
344/* only if both port are idle we can exit */
345void
346TrexStatelessDpCore::schedule_exit(){
347
348    CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
349
350    node->m_type = CGenNode::COMMAND;
351
352    node->m_cmd = new TrexStatelessDpCanQuit();
353
354    /* make sure it will be scheduled after the current node */
355    node->m_time = m_core->m_cur_time_sec ;
356
357    m_core->m_node_gen.add_node((CGenNode *)node);
358}
359
360
361void
362TrexStatelessDpCore::add_global_duration(double duration){
363    if (duration > 0.0) {
364        CGenNode *node = m_core->create_node() ;
365
366        node->m_type = CGenNode::EXIT_SCHED;
367
368        /* make sure it will be scheduled after the current node */
369        node->m_time = m_core->m_cur_time_sec + duration ;
370
371        m_core->m_node_gen.add_node(node);
372    }
373}
374
375/* add per port exit */
376void
377TrexStatelessDpCore::add_port_duration(double duration,
378                                       uint8_t port_id,
379                                       int event_id){
380    if (duration > 0.0) {
381        CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
382
383        node->m_type = CGenNode::COMMAND;
384
385        /* make sure it will be scheduled after the current node */
386        node->m_time = m_core->m_cur_time_sec + duration ;
387
388        TrexStatelessDpStop * cmd=new TrexStatelessDpStop(port_id);
389
390
391        /* test this */
392        m_core->m_non_active_nodes++;
393        cmd->set_core_ptr(m_core);
394        cmd->set_event_id(event_id);
395        cmd->set_wait_for_event_id(true);
396
397        node->m_cmd = cmd;
398
399        m_core->m_node_gen.add_node((CGenNode *)node);
400    }
401}
402
403
404void
405TrexStatelessDpCore::add_cont_stream(TrexStatelessDpPerPort * lp_port,
406                                     TrexStream * stream,
407                                     TrexStreamsCompiledObj *comp) {
408
409    CGenNodeStateless *node = m_core->create_node_sl();
410
411    /* add periodic */
412    node->m_type = CGenNode::STATELESS_PKT;
413
414    node->m_ref_stream_info  =   stream->clone_as_dp();
415
416    node->m_next_stream=0; /* will be fixed later */
417
418
419    if ( stream->m_self_start ){
420        /* if self start it is in active mode */
421        node->m_state =CGenNodeStateless::ss_ACTIVE;
422        lp_port->m_active_streams++;
423    }else{
424        node->m_state =CGenNodeStateless::ss_INACTIVE;
425    }
426
427    node->m_time = m_core->m_cur_time_sec + usec_to_sec(stream->m_isg_usec);
428
429    pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(stream->m_port_id);
430    node->m_flags = 0;
431
432    /* set socket id */
433    node->set_socket_id(m_core->m_node_gen.m_socket_id);
434
435    /* build a mbuf from a packet */
436
437    uint16_t pkt_size = stream->m_pkt.len;
438    const uint8_t *stream_pkt = stream->m_pkt.binary;
439
440    node->m_pause =0;
441    node->m_stream_type = stream->m_type;
442    node->m_next_time_offset =  1.0 / (stream->get_pps() * comp->get_multiplier()) ;
443
444
445    /* stateless specific fields */
446    switch ( stream->m_type ) {
447
448    case TrexStream::stCONTINUOUS :
449        node->m_single_burst=0;
450        node->m_single_burst_refill=0;
451        node->m_multi_bursts=0;
452        node->m_ibg_sec                 = 0.0;
453        break;
454
455    case TrexStream::stSINGLE_BURST :
456        node->m_stream_type             = TrexStream::stMULTI_BURST;
457        node->m_single_burst            = stream->m_burst_total_pkts;
458        node->m_single_burst_refill     = stream->m_burst_total_pkts;
459        node->m_multi_bursts            = 1;  /* single burst in multi burst of 1 */
460        node->m_ibg_sec                 = 0.0;
461        break;
462
463    case TrexStream::stMULTI_BURST :
464        node->m_single_burst        = stream->m_burst_total_pkts;
465        node->m_single_burst_refill = stream->m_burst_total_pkts;
466        node->m_multi_bursts        = stream->m_num_bursts;
467        node->m_ibg_sec             = usec_to_sec( stream->m_ibg_usec );
468        break;
469    default:
470
471        assert(0);
472    };
473
474    node->m_port_id = stream->m_port_id;
475
476    /* allocate const mbuf */
477    rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size);
478    assert(m);
479
480    char *p = rte_pktmbuf_append(m, pkt_size);
481    assert(p);
482    /* copy the packet */
483    memcpy(p,stream_pkt,pkt_size);
484
485    /* set dir 0 or 1 client or server */
486    node->set_mbuf_cache_dir(dir);
487
488    /* TBD repace the mac if req we should add flag  */
489    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, m);
490
491    /* set the packet as a readonly */
492    node->set_cache_mbuf(m);
493
494    CDpOneStream one_stream;
495
496    one_stream.m_dp_stream = node->m_ref_stream_info;
497    one_stream.m_node =node;
498
499    lp_port->m_active_nodes.push_back(one_stream);
500
501    /* schedule only if active */
502    if (node->m_state == CGenNodeStateless::ss_ACTIVE) {
503        m_core->m_node_gen.add_node((CGenNode *)node);
504    }
505}
506
507void
508TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj,
509                                   double duration,
510                                   int event_id) {
511
512
513    TrexStatelessDpPerPort * lp_port=get_port_db(obj->get_port_id());
514    lp_port->m_active_streams = 0;
515    lp_port->set_event_id(event_id);
516
517    /* no nodes in the list */
518    assert(lp_port->m_active_nodes.size()==0);
519
520    for (auto single_stream : obj->get_objects()) {
521        /* all commands should be for the same port */
522        assert(obj->get_port_id() == single_stream.m_stream->m_port_id);
523        add_cont_stream(lp_port,single_stream.m_stream,obj);
524    }
525
526    uint32_t nodes = lp_port->m_active_nodes.size();
527    /* find next stream */
528    assert(nodes == obj->get_objects().size());
529
530    int cnt=0;
531
532    /* set the next_stream pointer  */
533    for (auto single_stream : obj->get_objects()) {
534
535        if (single_stream.m_stream->is_dp_next_stream() ) {
536            int stream_id = single_stream.m_stream->m_next_stream_id;
537            assert(stream_id<nodes);
538            /* point to the next stream , stream_id is fixed */
539            lp_port->m_active_nodes[cnt].m_node->m_next_stream = lp_port->m_active_nodes[stream_id].m_node ;
540        }
541        cnt++;
542    }
543
544    lp_port->m_state =TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
545    m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
546
547
548    if ( duration > 0.0 ){
549        add_port_duration( duration ,obj->get_port_id(),event_id );
550    }
551
552}
553
554
555bool TrexStatelessDpCore::are_all_ports_idle(){
556
557    bool res=true;
558    int i;
559    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
560        if ( m_ports[i].m_state != TrexStatelessDpPerPort::ppSTATE_IDLE ){
561            res=false;
562        }
563    }
564    return (res);
565}
566
567
568void
569TrexStatelessDpCore::resume_traffic(uint8_t port_id){
570
571    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
572
573    lp_port->resume_traffic(port_id);
574}
575
576
577void
578TrexStatelessDpCore::pause_traffic(uint8_t port_id){
579
580    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
581
582    lp_port->pause_traffic(port_id);
583}
584
585
586void
587TrexStatelessDpCore::stop_traffic(uint8_t port_id,
588                                  bool stop_on_id,
589                                  int event_id) {
590    /* we cannot remove nodes not from the top of the queue so
591       for every active node - make sure next time
592       the scheduler invokes it, it will be free */
593
594    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
595
596    if ( lp_port->stop_traffic(port_id,stop_on_id,event_id) == false){
597        /* nothing to do ! already stopped */
598        //printf(" skip .. %f\n",m_core->m_cur_time_sec);
599        return;
600    }
601
602#if 0
603    if ( are_all_ports_idle() ) {
604        /* just a place holder if we will need to do somthing in that case */
605    }
606#endif
607
608    /* inform the control plane we stopped - this might be a async stop
609       (streams ended)
610     */
611    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
612    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
613                                                                   port_id,
614                                                                   TrexDpPortEvent::EVENT_STOP,
615                                                                   lp_port->get_event_id());
616    ring->Enqueue((CGenNode *)event_msg);
617
618}
619
620/**
621 * handle a message from CP to DP
622 *
623 */
624void
625TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) {
626    msg->handle(this);
627    delete msg;
628}
629
630