trex_stateless_dp_core.cpp revision 3408c030
1/*
2 Itay Marom
3 Hanoch Haim
4 Cisco Systems, Inc.
5*/
6
7/*
8Copyright (c) 2015-2015 Cisco Systems, Inc.
9
10Licensed under the Apache License, Version 2.0 (the "License");
11you may not use this file except in compliance with the License.
12You may obtain a copy of the License at
13
14    http://www.apache.org/licenses/LICENSE-2.0
15
16Unless required by applicable law or agreed to in writing, software
17distributed under the License is distributed on an "AS IS" BASIS,
18WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19See the License for the specific language governing permissions and
20limitations under the License.
21*/
22#include <trex_stateless_dp_core.h>
23#include <trex_stateless_messaging.h>
24#include <trex_streams_compiler.h>
25#include <trex_stream_node.h>
26#include <trex_stream.h>
27
28#include <bp_sim.h>
29
30
31void CDpOneStream::Delete(CFlowGenListPerThread   * core){
32    assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE);
33    core->free_node((CGenNode *)m_node);
34    delete m_dp_stream;
35    m_node=0;
36    m_dp_stream=0;
37}
38
39void CDpOneStream::DeleteOnlyStream(){
40    assert(m_dp_stream);
41    delete m_dp_stream;
42    m_dp_stream=0;
43}
44
45int CGenNodeStateless::get_stream_id(){
46    if (m_state ==CGenNodeStateless::ss_FREE_RESUSE) {
47        return (-1); // not valid
48    }
49    assert(m_ref_stream_info);
50    return ((int)m_ref_stream_info->m_stream_id);
51}
52
53
54void CGenNodeStateless::DumpHeader(FILE *fd){
55    fprintf(fd," pkt_id, time, port , action , state, stream_id , stype , m-burst# , burst# \n");
56
57}
58void CGenNodeStateless::Dump(FILE *fd){
59    fprintf(fd," %2.4f, %3lu, %s,%s, %3d, %s, %3lu, %3lu  \n",
60            m_time,
61            (ulong)m_port_id,
62            "s-pkt", //action
63            get_stream_state_str(m_state ).c_str(),
64            get_stream_id(),   //stream_id
65            TrexStream::get_stream_type_str(m_stream_type).c_str(), //stype
66            (ulong)m_multi_bursts,
67            (ulong)m_single_burst
68            );
69}
70
71
72void CGenNodeStateless::refresh(){
73
74    /* refill the stream info */
75    m_single_burst    = m_single_burst_refill;
76    m_multi_bursts    = m_ref_stream_info->m_num_bursts;
77    m_state           = CGenNodeStateless::ss_ACTIVE;
78}
79
80
81void CGenNodeCommand::free_command(){
82
83    assert(m_cmd);
84    m_cmd->on_node_remove();
85    delete m_cmd;
86}
87
88
89std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state){
90    std::string res;
91
92    switch (stream_state) {
93    case CGenNodeStateless::ss_FREE_RESUSE :
94         res="FREE    ";
95        break;
96    case CGenNodeStateless::ss_INACTIVE :
97        res="INACTIVE ";
98        break;
99    case CGenNodeStateless::ss_ACTIVE :
100        res="ACTIVE   ";
101        break;
102    default:
103        res="Unknow   ";
104    };
105    return(res);
106}
107
108
109void CGenNodeStateless::free_stl_node(){
110    /* if we have cache mbuf free it */
111    rte_mbuf_t * m=get_cache_mbuf();
112    if (m) {
113        rte_pktmbuf_free(m);
114        m_cache_mbuf=0;
115    }
116}
117
118
119bool TrexStatelessDpPerPort::update_number_of_active_streams(uint32_t d){
120    m_active_streams-=d; /* reduce the number of streams */
121    if (m_active_streams == 0) {
122        return (true);
123    }
124    return (false);
125}
126
127
128bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id,
129                                          bool stop_on_id,
130                                          int event_id){
131
132    if (m_state == TrexStatelessDpPerPort::ppSTATE_IDLE) {
133        assert(m_active_streams==0);
134        return false;
135    }
136
137    /* there could be race of stop after stop */
138    if ( stop_on_id ) {
139        if (event_id != m_event_id){
140            /* we can't stop it is an old message */
141            return false;
142        }
143    }
144
145    for (auto dp_stream : m_active_nodes) {
146        CGenNodeStateless * node =dp_stream.m_node;
147        assert(node->get_port_id() == port_id);
148        if ( node->get_state() == CGenNodeStateless::ss_ACTIVE) {
149            node->mark_for_free();
150            m_active_streams--;
151            dp_stream.DeleteOnlyStream();
152
153        }else{
154            dp_stream.Delete(m_core);
155        }
156    }
157
158    /* active stream should be zero */
159    assert(m_active_streams==0);
160    m_active_nodes.clear();
161    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
162    return (true);
163}
164
165
166void TrexStatelessDpPerPort::create(CFlowGenListPerThread   *  core){
167    m_core=core;
168    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
169    m_port_id=0;
170    m_active_streams=0;
171    m_active_nodes.clear();
172}
173
174
175
176void
177TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) {
178    m_thread_id = thread_id;
179    m_core = core;
180    m_local_port_offset = 2*core->getDualPortId();
181
182    CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();
183
184    m_ring_from_cp = cp_dp->getRingCpToDp(thread_id);
185    m_ring_to_cp   = cp_dp->getRingDpToCp(thread_id);
186
187    m_state = STATE_IDLE;
188
189    int i;
190    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
191        m_ports[i].create(core);
192    }
193}
194
195
196/* move to the next stream, old stream move to INACTIVE */
197bool TrexStatelessDpCore::set_stateless_next_node(CGenNodeStateless * cur_node,
198                                                  CGenNodeStateless * next_node){
199
200    assert(cur_node);
201    TrexStatelessDpPerPort * lp_port = get_port_db(cur_node->m_port_id);
202    bool schedule =false;
203
204    bool to_stop_port=false;
205
206    if (next_node == NULL) {
207        /* there is no next stream , reduce the number of active streams*/
208        to_stop_port = lp_port->update_number_of_active_streams(1);
209
210    }else{
211        uint8_t state=next_node->get_state();
212
213        /* can't be FREE_RESUSE */
214        assert(state != CGenNodeStateless::ss_FREE_RESUSE);
215        if (next_node->get_state() == CGenNodeStateless::ss_INACTIVE ) {
216
217            /* refill start info and scedule, no update in active streams  */
218            next_node->refresh();
219            schedule = true;
220
221        }else{
222            to_stop_port = lp_port->update_number_of_active_streams(1);
223        }
224    }
225
226    if ( to_stop_port ) {
227        /* call stop port explictly to move the state */
228        stop_traffic(cur_node->m_port_id,false,0);
229    }
230
231    return ( schedule );
232}
233
234
235
236/**
237 * in idle state loop, the processor most of the time sleeps
238 * and periodically checks for messages
239 *
240 * @author imarom (01-Nov-15)
241 */
242void
243TrexStatelessDpCore::idle_state_loop() {
244
245    while (m_state == STATE_IDLE) {
246        periodic_check_for_cp_messages();
247        delay(200);
248    }
249}
250
251
252
253void TrexStatelessDpCore::quit_main_loop(){
254    m_core->set_terminate_mode(true); /* mark it as terminated */
255    m_state = STATE_TERMINATE;
256    add_global_duration(0.0001);
257}
258
259
260/**
261 * scehduler runs when traffic exists
262 * it will return when no more transmitting is done on this
263 * core
264 *
265 * @author imarom (01-Nov-15)
266 */
267void
268TrexStatelessDpCore::start_scheduler() {
269    /* creates a maintenace job using the scheduler */
270    CGenNode * node_sync = m_core->create_node() ;
271    node_sync->m_type = CGenNode::FLOW_SYNC;
272    node_sync->m_time = m_core->m_cur_time_sec + SYNC_TIME_OUT;
273    m_core->m_node_gen.add_node(node_sync);
274
275    double old_offset = 0.0;
276    m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset);
277    /* bail out in case of terminate */
278    if (m_state != TrexStatelessDpCore::STATE_TERMINATE) {
279        m_core->m_node_gen.close_file(m_core);
280    }
281}
282
283
284void
285TrexStatelessDpCore::run_once(){
286
287    idle_state_loop();
288
289    if ( m_state == STATE_TERMINATE ){
290        return;
291    }
292
293    start_scheduler();
294}
295
296
297void
298TrexStatelessDpCore::start() {
299
300    while (true) {
301        run_once();
302
303        if ( m_core->is_terminated_by_master() ) {
304            break;
305        }
306    }
307}
308
309/* only if both port are idle we can exit */
310void
311TrexStatelessDpCore::schedule_exit(){
312
313    CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
314
315    node->m_type = CGenNode::COMMAND;
316
317    node->m_cmd = new TrexStatelessDpCanQuit();
318
319    /* make sure it will be scheduled after the current node */
320    node->m_time = m_core->m_cur_time_sec ;
321
322    m_core->m_node_gen.add_node((CGenNode *)node);
323}
324
325
326void
327TrexStatelessDpCore::add_global_duration(double duration){
328    if (duration > 0.0) {
329        CGenNode *node = m_core->create_node() ;
330
331        node->m_type = CGenNode::EXIT_SCHED;
332
333        /* make sure it will be scheduled after the current node */
334        node->m_time = m_core->m_cur_time_sec + duration ;
335
336        m_core->m_node_gen.add_node(node);
337    }
338}
339
340/* add per port exit */
341void
342TrexStatelessDpCore::add_port_duration(double duration,
343                                       uint8_t port_id,
344                                       int event_id){
345    if (duration > 0.0) {
346        CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
347
348        node->m_type = CGenNode::COMMAND;
349
350        /* make sure it will be scheduled after the current node */
351        node->m_time = m_core->m_cur_time_sec + duration ;
352
353        TrexStatelessDpStop * cmd=new TrexStatelessDpStop(port_id);
354
355
356        /* test this */
357        m_core->m_non_active_nodes++;
358        cmd->set_core_ptr(m_core);
359        cmd->set_event_id(event_id);
360        cmd->set_wait_for_event_id(true);
361
362        node->m_cmd = cmd;
363
364        m_core->m_node_gen.add_node((CGenNode *)node);
365    }
366}
367
368
369void
370TrexStatelessDpCore::add_cont_stream(TrexStatelessDpPerPort * lp_port,
371                                     TrexStream * stream,
372                                     TrexStreamsCompiledObj *comp) {
373
374    CGenNodeStateless *node = m_core->create_node_sl();
375
376    /* add periodic */
377    node->m_type = CGenNode::STATELESS_PKT;
378
379    node->m_ref_stream_info  =   stream->clone_as_dp();
380
381    node->m_next_stream=0; /* will be fixed later */
382
383
384    if ( stream->m_self_start ){
385        /* if self start it is in active mode */
386        node->m_state =CGenNodeStateless::ss_ACTIVE;
387        lp_port->m_active_streams++;
388    }else{
389        node->m_state =CGenNodeStateless::ss_INACTIVE;
390    }
391
392    node->m_time = m_core->m_cur_time_sec + usec_to_sec(stream->m_isg_usec);
393
394    pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(stream->m_port_id);
395    node->m_flags = 0;
396
397    /* set socket id */
398    node->set_socket_id(m_core->m_node_gen.m_socket_id);
399
400    /* build a mbuf from a packet */
401
402    uint16_t pkt_size = stream->m_pkt.len;
403    const uint8_t *stream_pkt = stream->m_pkt.binary;
404
405    node->m_stream_type = stream->m_type;
406    node->m_next_time_offset =  1.0 / (stream->get_pps() * comp->get_multiplier()) ;
407
408
409    /* stateless specific fields */
410    switch ( stream->m_type ) {
411
412    case TrexStream::stCONTINUOUS :
413        node->m_single_burst=0;
414        node->m_single_burst_refill=0;
415        node->m_multi_bursts=0;
416        node->m_ibg_sec                 = 0.0;
417        break;
418
419    case TrexStream::stSINGLE_BURST :
420        node->m_stream_type             = TrexStream::stMULTI_BURST;
421        node->m_single_burst            = stream->m_burst_total_pkts;
422        node->m_single_burst_refill     = stream->m_burst_total_pkts;
423        node->m_multi_bursts            = 1;  /* single burst in multi burst of 1 */
424        node->m_ibg_sec                 = 0.0;
425        break;
426
427    case TrexStream::stMULTI_BURST :
428        node->m_single_burst        = stream->m_burst_total_pkts;
429        node->m_single_burst_refill = stream->m_burst_total_pkts;
430        node->m_multi_bursts        = stream->m_num_bursts;
431        node->m_ibg_sec             = usec_to_sec( stream->m_ibg_usec );
432        break;
433    default:
434
435        assert(0);
436    };
437
438    node->m_port_id = stream->m_port_id;
439
440    /* allocate const mbuf */
441    rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size);
442    assert(m);
443
444    char *p = rte_pktmbuf_append(m, pkt_size);
445    assert(p);
446    /* copy the packet */
447    memcpy(p,stream_pkt,pkt_size);
448
449    /* set dir 0 or 1 client or server */
450    node->set_mbuf_cache_dir(dir);
451
452    /* TBD repace the mac if req we should add flag  */
453    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, m);
454
455    /* set the packet as a readonly */
456    node->set_cache_mbuf(m);
457
458    CDpOneStream one_stream;
459
460    one_stream.m_dp_stream = node->m_ref_stream_info;
461    one_stream.m_node =node;
462
463    lp_port->m_active_nodes.push_back(one_stream);
464
465    /* schedule only if active */
466    if (node->m_state == CGenNodeStateless::ss_ACTIVE) {
467        m_core->m_node_gen.add_node((CGenNode *)node);
468    }
469}
470
471void
472TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj,
473                                   double duration,
474                                   int event_id) {
475
476
477    TrexStatelessDpPerPort * lp_port=get_port_db(obj->get_port_id());
478    lp_port->m_active_streams = 0;
479    lp_port->set_event_id(event_id);
480
481    /* no nodes in the list */
482    assert(lp_port->m_active_nodes.size()==0);
483
484    for (auto single_stream : obj->get_objects()) {
485        /* all commands should be for the same port */
486        assert(obj->get_port_id() == single_stream.m_stream->m_port_id);
487        add_cont_stream(lp_port,single_stream.m_stream,obj);
488    }
489
490    uint32_t nodes = lp_port->m_active_nodes.size();
491    /* find next stream */
492    assert(nodes == obj->get_objects().size());
493
494    int cnt=0;
495
496    /* set the next_stream pointer  */
497    for (auto single_stream : obj->get_objects()) {
498
499        if (single_stream.m_stream->is_dp_next_stream() ) {
500            int stream_id = single_stream.m_stream->m_next_stream_id;
501            assert(stream_id<nodes);
502            /* point to the next stream , stream_id is fixed */
503            lp_port->m_active_nodes[cnt].m_node->m_next_stream = lp_port->m_active_nodes[stream_id].m_node ;
504        }
505        cnt++;
506    }
507
508    lp_port->m_state =TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
509    m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
510
511
512    if ( duration > 0.0 ){
513        add_port_duration( duration ,obj->get_port_id(),event_id );
514    }
515
516}
517
518
519bool TrexStatelessDpCore::are_all_ports_idle(){
520
521    bool res=true;
522    int i;
523    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
524        if ( m_ports[i].m_state != TrexStatelessDpPerPort::ppSTATE_IDLE ){
525            res=false;
526        }
527    }
528    return (res);
529}
530
531
532void
533TrexStatelessDpCore::stop_traffic(uint8_t port_id,
534                                  bool stop_on_id,
535                                  int event_id) {
536    /* we cannot remove nodes not from the top of the queue so
537       for every active node - make sure next time
538       the scheduler invokes it, it will be free */
539
540    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
541
542    if ( lp_port->stop_traffic(port_id,stop_on_id,event_id) == false){
543        /* nothing to do ! already stopped */
544        //printf(" skip .. %f\n",m_core->m_cur_time_sec);
545        return;
546    }
547
548#if 0
549    if ( are_all_ports_idle() ) {
550        /* just a place holder if we will need to do somthing in that case */
551    }
552#endif
553
554    /* inform the control plane we stopped - this might be a async stop
555       (streams ended)
556     */
557    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
558    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
559                                                                   port_id,
560                                                                   TrexDpPortEvent::EVENT_STOP,
561                                                                   lp_port->get_event_id());
562    ring->Enqueue((CGenNode *)event_msg);
563
564}
565
566/**
567 * handle a message from CP to DP
568 *
569 */
570void
571TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) {
572    msg->handle(this);
573    delete msg;
574}
575
576