trex_stateless_dp_core.cpp revision 36dc8ea5
1/*
2 Itay Marom
3 Hanoch Haim
4 Cisco Systems, Inc.
5*/
6
7/*
8Copyright (c) 2015-2015 Cisco Systems, Inc.
9
10Licensed under the Apache License, Version 2.0 (the "License");
11you may not use this file except in compliance with the License.
12You may obtain a copy of the License at
13
14    http://www.apache.org/licenses/LICENSE-2.0
15
16Unless required by applicable law or agreed to in writing, software
17distributed under the License is distributed on an "AS IS" BASIS,
18WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19See the License for the specific language governing permissions and
20limitations under the License.
21*/
22#include <trex_stateless_dp_core.h>
23#include <trex_stateless_messaging.h>
24#include <trex_streams_compiler.h>
25#include <trex_stream_node.h>
26#include <trex_stream.h>
27
28#include <bp_sim.h>
29
30
31void CDpOneStream::Delete(CFlowGenListPerThread   * core){
32    assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE);
33    core->free_node((CGenNode *)m_node);
34    delete m_dp_stream;
35    m_node=0;
36    m_dp_stream=0;
37}
38
39void CDpOneStream::DeleteOnlyStream(){
40    assert(m_dp_stream);
41    delete m_dp_stream;
42    m_dp_stream=0;
43}
44
45int CGenNodeStateless::get_stream_id(){
46    if (m_state ==CGenNodeStateless::ss_FREE_RESUSE) {
47        return (-1); // not valid
48    }
49    assert(m_ref_stream_info);
50    return ((int)m_ref_stream_info->m_stream_id);
51}
52
53
54void CGenNodeStateless::DumpHeader(FILE *fd){
55    fprintf(fd," pkt_id, time, port , action , state, stream_id , stype , m-burst# , burst# \n");
56
57}
58void CGenNodeStateless::Dump(FILE *fd){
59    fprintf(fd," %2.4f, %3lu, %s,%s, %3d, %s, %3lu, %3lu  \n",
60            m_time,
61            (ulong)m_port_id,
62            "s-pkt", //action
63            get_stream_state_str(m_state ).c_str(),
64            get_stream_id(),   //stream_id
65            TrexStream::get_stream_type_str(m_stream_type).c_str(), //stype
66            (ulong)m_multi_bursts,
67            (ulong)m_single_burst
68            );
69}
70
71
72void CGenNodeStateless::refresh(){
73
74    /* refill the stream info */
75    m_single_burst    = m_single_burst_refill;
76    m_multi_bursts    = m_ref_stream_info->m_num_bursts;
77    m_state           = CGenNodeStateless::ss_ACTIVE;
78}
79
80
81
82void CGenNodeCommand::free_command(){
83    assert(m_cmd);
84    delete m_cmd;
85}
86
87
88std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state){
89    std::string res;
90
91    switch (stream_state) {
92    case CGenNodeStateless::ss_FREE_RESUSE :
93         res="FREE    ";
94        break;
95    case CGenNodeStateless::ss_INACTIVE :
96        res="INACTIVE ";
97        break;
98    case CGenNodeStateless::ss_ACTIVE :
99        res="ACTIVE   ";
100        break;
101    default:
102        res="Unknow   ";
103    };
104    return(res);
105}
106
107
108void CGenNodeStateless::free_stl_node(){
109    /* if we have cache mbuf free it */
110    rte_mbuf_t * m=get_cache_mbuf();
111    if (m) {
112        rte_pktmbuf_free(m);
113        m_cache_mbuf=0;
114    }
115}
116
117
118bool TrexStatelessDpPerPort::update_number_of_active_streams(uint32_t d){
119    m_active_streams-=d; /* reduce the number of streams */
120    if (m_active_streams == 0) {
121        return (true);
122    }
123    return (false);
124}
125
126
127bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id){
128
129    /* there could be race of stop after stop */
130    if (m_state == TrexStatelessDpPerPort::ppSTATE_IDLE) {
131        assert(m_active_streams==0);
132        return false;
133    }
134
135    for (auto dp_stream : m_active_nodes) {
136        CGenNodeStateless * node =dp_stream.m_node;
137        assert(node->get_port_id() == port_id);
138        if ( node->get_state() == CGenNodeStateless::ss_ACTIVE) {
139            node->mark_for_free();
140            m_active_streams--;
141            dp_stream.DeleteOnlyStream();
142
143        }else{
144            dp_stream.Delete(m_core);
145        }
146    }
147
148    /* active stream should be zero */
149    assert(m_active_streams==0);
150    m_active_nodes.clear();
151    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
152    return (true);
153}
154
155
156void TrexStatelessDpPerPort::create(CFlowGenListPerThread   *  core){
157    m_core=core;
158    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
159    m_port_id=0;
160    m_active_streams=0;
161    m_active_nodes.clear();
162}
163
164
165
166void
167TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) {
168    m_thread_id = thread_id;
169    m_core = core;
170    m_local_port_offset = 2*core->getDualPortId();
171
172    CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();
173
174    m_ring_from_cp = cp_dp->getRingCpToDp(thread_id);
175    m_ring_to_cp   = cp_dp->getRingDpToCp(thread_id);
176
177    m_state = STATE_IDLE;
178
179    int i;
180    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
181        m_ports[i].create(core);
182    }
183}
184
185
186/* move to the next stream, old stream move to INACTIVE */
187bool TrexStatelessDpCore::set_stateless_next_node(CGenNodeStateless * cur_node,
188                                                  CGenNodeStateless * next_node){
189
190    assert(cur_node);
191    TrexStatelessDpPerPort * lp_port = get_port_db(cur_node->m_port_id);
192    bool schedule =false;
193
194    bool to_stop_port=false;
195
196    if (next_node == NULL) {
197        /* there is no next stream , reduce the number of active streams*/
198        to_stop_port = lp_port->update_number_of_active_streams(1);
199
200    }else{
201        uint8_t state=next_node->get_state();
202
203        /* can't be FREE_RESUSE */
204        assert(state != CGenNodeStateless::ss_FREE_RESUSE);
205        if (next_node->get_state() == CGenNodeStateless::ss_INACTIVE ) {
206
207            /* refill start info and scedule, no update in active streams  */
208            next_node->refresh();
209            schedule = true;
210
211        }else{
212            to_stop_port = lp_port->update_number_of_active_streams(1);
213        }
214    }
215
216    if ( to_stop_port ) {
217        /* call stop port explictly to move the state */
218        stop_traffic(cur_node->m_port_id);
219    }
220
221    return ( schedule );
222}
223
224
225
226/**
227 * in idle state loop, the processor most of the time sleeps
228 * and periodically checks for messages
229 *
230 * @author imarom (01-Nov-15)
231 */
232void
233TrexStatelessDpCore::idle_state_loop() {
234
235    while (m_state == STATE_IDLE) {
236        periodic_check_for_cp_messages();
237        delay(200);
238    }
239}
240
241
242
243void TrexStatelessDpCore::quit_main_loop(){
244    m_core->set_terminate_mode(true); /* mark it as terminated */
245    m_state = STATE_TERMINATE;
246    add_global_duration(0.0001);
247}
248
249
250/**
251 * scehduler runs when traffic exists
252 * it will return when no more transmitting is done on this
253 * core
254 *
255 * @author imarom (01-Nov-15)
256 */
257void
258TrexStatelessDpCore::start_scheduler() {
259    /* creates a maintenace job using the scheduler */
260    CGenNode * node_sync = m_core->create_node() ;
261    node_sync->m_type = CGenNode::FLOW_SYNC;
262    node_sync->m_time = m_core->m_cur_time_sec + SYNC_TIME_OUT;
263    m_core->m_node_gen.add_node(node_sync);
264
265    double old_offset = 0.0;
266    m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset);
267    /* bail out in case of terminate */
268    if (m_state != TrexStatelessDpCore::STATE_TERMINATE) {
269        m_core->m_node_gen.close_file(m_core);
270    }
271}
272
273
274void
275TrexStatelessDpCore::run_once(){
276
277    idle_state_loop();
278
279    if ( m_state == STATE_TERMINATE ){
280        return;
281    }
282
283    start_scheduler();
284}
285
286
287void
288TrexStatelessDpCore::start() {
289
290    while (true) {
291        run_once();
292
293        if ( m_core->is_terminated_by_master() ) {
294            break;
295        }
296    }
297}
298
299/* only if both port are idle we can exit */
300void
301TrexStatelessDpCore::schedule_exit(){
302
303    CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
304
305    node->m_type = CGenNode::COMMAND;
306
307    node->m_cmd = new TrexStatelessDpCanQuit();
308
309    /* make sure it will be scheduled after the current node */
310    node->m_time = m_core->m_cur_time_sec ;
311
312    m_core->m_node_gen.add_node((CGenNode *)node);
313}
314
315
316void
317TrexStatelessDpCore::add_global_duration(double duration){
318    if (duration > 0.0) {
319        CGenNode *node = m_core->create_node() ;
320
321        node->m_type = CGenNode::EXIT_SCHED;
322
323        /* make sure it will be scheduled after the current node */
324        node->m_time = m_core->m_cur_time_sec + duration ;
325
326        m_core->m_node_gen.add_node(node);
327    }
328}
329
330/* add per port exit */
331void
332TrexStatelessDpCore::add_port_duration(double duration,
333                                  uint8_t port_id){
334    if (duration > 0.0) {
335        CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
336
337        node->m_type = CGenNode::COMMAND;
338
339        /* make sure it will be scheduled after the current node */
340        node->m_time = m_core->m_cur_time_sec + duration ;
341
342        node->m_cmd = new TrexStatelessDpStop(port_id);
343
344        m_core->m_node_gen.add_node((CGenNode *)node);
345    }
346}
347
348
349void
350TrexStatelessDpCore::add_cont_stream(TrexStatelessDpPerPort * lp_port,
351                                     TrexStream * stream,
352                                     TrexStreamsCompiledObj *comp) {
353
354    CGenNodeStateless *node = m_core->create_node_sl();
355
356    /* add periodic */
357    node->m_type = CGenNode::STATELESS_PKT;
358
359    node->m_ref_stream_info  =   stream->clone_as_dp();
360
361    node->m_next_stream=0; /* will be fixed later */
362
363
364    if ( stream->m_self_start ){
365        /* if self start it is in active mode */
366        node->m_state =CGenNodeStateless::ss_ACTIVE;
367        lp_port->m_active_streams++;
368    }else{
369        node->m_state =CGenNodeStateless::ss_INACTIVE;
370    }
371
372    node->m_time = m_core->m_cur_time_sec + usec_to_sec(stream->m_isg_usec);
373
374    pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(stream->m_port_id);
375    node->m_flags = 0;
376
377    /* set socket id */
378    node->set_socket_id(m_core->m_node_gen.m_socket_id);
379
380    /* build a mbuf from a packet */
381
382    uint16_t pkt_size = stream->m_pkt.len;
383    const uint8_t *stream_pkt = stream->m_pkt.binary;
384
385    node->m_stream_type = stream->m_type;
386    node->m_next_time_offset =  1.0 / (stream->get_pps() * comp->get_multiplier()) ;
387
388
389    /* stateless specific fields */
390    switch ( stream->m_type ) {
391
392    case TrexStream::stCONTINUOUS :
393        node->m_single_burst=0;
394        node->m_single_burst_refill=0;
395        node->m_multi_bursts=0;
396        node->m_ibg_sec                 = 0.0;
397        break;
398
399    case TrexStream::stSINGLE_BURST :
400        node->m_stream_type             = TrexStream::stMULTI_BURST;
401        node->m_single_burst            = stream->m_burst_total_pkts;
402        node->m_single_burst_refill     = stream->m_burst_total_pkts;
403        node->m_multi_bursts            = 1;  /* single burst in multi burst of 1 */
404        node->m_ibg_sec                 = 0.0;
405        break;
406
407    case TrexStream::stMULTI_BURST :
408        node->m_single_burst        = stream->m_burst_total_pkts;
409        node->m_single_burst_refill = stream->m_burst_total_pkts;
410        node->m_multi_bursts        = stream->m_num_bursts;
411        node->m_ibg_sec             = usec_to_sec( stream->m_ibg_usec );
412        break;
413    default:
414
415        assert(0);
416    };
417
418    node->m_port_id = stream->m_port_id;
419
420    /* allocate const mbuf */
421    rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size);
422    assert(m);
423
424    char *p = rte_pktmbuf_append(m, pkt_size);
425    assert(p);
426    /* copy the packet */
427    memcpy(p,stream_pkt,pkt_size);
428
429    /* set dir 0 or 1 client or server */
430    node->set_mbuf_cache_dir(dir);
431
432    /* TBD repace the mac if req we should add flag  */
433    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, m);
434
435    /* set the packet as a readonly */
436    node->set_cache_mbuf(m);
437
438    CDpOneStream one_stream;
439
440    one_stream.m_dp_stream = node->m_ref_stream_info;
441    one_stream.m_node =node;
442
443    lp_port->m_active_nodes.push_back(one_stream);
444
445    /* schedule only if active */
446    if (node->m_state == CGenNodeStateless::ss_ACTIVE) {
447        m_core->m_node_gen.add_node((CGenNode *)node);
448    }
449}
450
451void
452TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj,
453                                   double duration) {
454
455#if 0
456    /* TBD to remove ! */
457    obj->Dump(stdout);
458#endif
459
460    TrexStatelessDpPerPort * lp_port=get_port_db(obj->get_port_id());
461    lp_port->m_active_streams = 0;
462    /* no nodes in the list */
463    assert(lp_port->m_active_nodes.size()==0);
464
465    for (auto single_stream : obj->get_objects()) {
466        /* all commands should be for the same port */
467        assert(obj->get_port_id() == single_stream.m_stream->m_port_id);
468        add_cont_stream(lp_port,single_stream.m_stream,obj);
469    }
470
471    uint32_t nodes = lp_port->m_active_nodes.size();
472    /* find next stream */
473    assert(nodes == obj->get_objects().size());
474
475    int cnt=0;
476
477    /* set the next_stream pointer  */
478    for (auto single_stream : obj->get_objects()) {
479
480        if (single_stream.m_stream->is_dp_next_stream() ) {
481            int stream_id = single_stream.m_stream->m_next_stream_id;
482            assert(stream_id<nodes);
483            /* point to the next stream , stream_id is fixed */
484            lp_port->m_active_nodes[cnt].m_node->m_next_stream = lp_port->m_active_nodes[stream_id].m_node ;
485        }
486        cnt++;
487    }
488
489    lp_port->m_state =TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
490    m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
491
492
493    if ( duration > 0.0 ){
494        add_port_duration( duration ,obj->get_port_id() );
495    }
496
497}
498
499
500bool TrexStatelessDpCore::are_all_ports_idle(){
501
502    bool res=true;
503    int i;
504    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
505        if ( m_ports[i].m_state != TrexStatelessDpPerPort::ppSTATE_IDLE ){
506            res=false;
507        }
508    }
509    return (res);
510}
511
512
513void
514TrexStatelessDpCore::stop_traffic(uint8_t port_id) {
515    /* we cannot remove nodes not from the top of the queue so
516       for every active node - make sure next time
517       the scheduler invokes it, it will be free */
518
519    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
520
521    if ( lp_port->stop_traffic(port_id) == false){
522        /* nothing to do ! already stopped */
523        return;
524    }
525
526
527    if ( are_all_ports_idle() ) {
528
529        schedule_exit();
530    }
531
532    /* inform the control plane we stopped - this might be a async stop
533       (streams ended)
534     */
535    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
536    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
537                                                                   port_id,
538                                                                   TrexDpPortEvent::EVENT_STOP,
539                                                                   lp_port->get_event_id());
540    ring->Enqueue((CGenNode *)event_msg);
541
542}
543
544/**
545 * handle a message from CP to DP
546 *
547 */
548void
549TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) {
550    msg->handle(this);
551    delete msg;
552}
553
554