trex_stateless_dp_core.cpp revision b094110e
1/*
2 Itay Marom
3 Hanoch Haim
4 Cisco Systems, Inc.
5*/
6
7/*
8Copyright (c) 2015-2015 Cisco Systems, Inc.
9
10Licensed under the Apache License, Version 2.0 (the "License");
11you may not use this file except in compliance with the License.
12You may obtain a copy of the License at
13
14    http://www.apache.org/licenses/LICENSE-2.0
15
16Unless required by applicable law or agreed to in writing, software
17distributed under the License is distributed on an "AS IS" BASIS,
18WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19See the License for the specific language governing permissions and
20limitations under the License.
21*/
22#include <trex_stateless_dp_core.h>
23#include <trex_stateless_messaging.h>
24#include <trex_streams_compiler.h>
25#include <trex_stream_node.h>
26#include <trex_stream.h>
27
28#include <bp_sim.h>
29
30
31void CDpOneStream::Delete(CFlowGenListPerThread   * core){
32    assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE);
33    core->free_node((CGenNode *)m_node);
34    delete m_dp_stream;
35    m_node=0;
36    m_dp_stream=0;
37}
38
39void CDpOneStream::DeleteOnlyStream(){
40    assert(m_dp_stream);
41    delete m_dp_stream;
42    m_dp_stream=0;
43}
44
45int CGenNodeStateless::get_stream_id(){
46    if (m_state ==CGenNodeStateless::ss_FREE_RESUSE) {
47        return (-1); // not valid
48    }
49    assert(m_ref_stream_info);
50    return ((int)m_ref_stream_info->m_stream_id);
51}
52
53
54void CGenNodeStateless::DumpHeader(FILE *fd){
55    fprintf(fd," pkt_id, time, port , action , state, stream_id , stype , m-burst# , burst# \n");
56
57}
58void CGenNodeStateless::Dump(FILE *fd){
59    fprintf(fd," %2.4f, %3lu, %s,%s, %3d, %s, %3lu, %3lu  \n",
60            m_time,
61            (ulong)m_port_id,
62            "s-pkt", //action
63            get_stream_state_str(m_state ).c_str(),
64            get_stream_id(),   //stream_id
65            TrexStream::get_stream_type_str(m_stream_type).c_str(), //stype
66            (ulong)m_multi_bursts,
67            (ulong)m_single_burst
68            );
69}
70
71
72void CGenNodeStateless::refresh(){
73
74    /* refill the stream info */
75    m_single_burst    = m_single_burst_refill;
76    m_multi_bursts    = m_ref_stream_info->m_num_bursts;
77    m_state           = CGenNodeStateless::ss_ACTIVE;
78}
79
80
81
82void CGenNodeCommand::free_command(){
83    assert(m_cmd);
84    delete m_cmd;
85}
86
87
88std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state){
89    std::string res;
90
91    switch (stream_state) {
92    case CGenNodeStateless::ss_FREE_RESUSE :
93         res="FREE    ";
94        break;
95    case CGenNodeStateless::ss_INACTIVE :
96        res="INACTIVE ";
97        break;
98    case CGenNodeStateless::ss_ACTIVE :
99        res="ACTIVE   ";
100        break;
101    default:
102        res="Unknow   ";
103    };
104    return(res);
105}
106
107
108void CGenNodeStateless::free_stl_node(){
109    /* if we have cache mbuf free it */
110    rte_mbuf_t * m=get_cache_mbuf();
111    if (m) {
112        rte_pktmbuf_free(m);
113        m_cache_mbuf=0;
114    }
115}
116
117
118bool TrexStatelessDpPerPort::update_number_of_active_streams(uint32_t d){
119    m_active_streams-=d; /* reduce the number of streams */
120    if (m_active_streams == 0) {
121        return (true);
122    }
123    return (false);
124}
125
126
127void TrexStatelessDpPerPort::stop_traffic(uint8_t port_id){
128
129    assert(m_state==TrexStatelessDpPerPort::ppSTATE_TRANSMITTING);
130
131    for (auto dp_stream : m_active_nodes) {
132        CGenNodeStateless * node =dp_stream.m_node;
133        assert(node->get_port_id() == port_id);
134        if ( node->get_state() == CGenNodeStateless::ss_ACTIVE) {
135            node->mark_for_free();
136            m_active_streams--;
137            dp_stream.DeleteOnlyStream();
138
139        }else{
140            dp_stream.Delete(m_core);
141        }
142    }
143
144    /* active stream should be zero */
145    assert(m_active_streams==0);
146    m_active_nodes.clear();
147    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
148}
149
150
151void TrexStatelessDpPerPort::create(CFlowGenListPerThread   *  core){
152    m_core=core;
153    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
154    m_port_id=0;
155    m_active_streams=0;
156    m_active_nodes.clear();
157}
158
159
160
161void
162TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) {
163    m_thread_id = thread_id;
164    m_core = core;
165    m_local_port_offset = 2*core->getDualPortId();
166
167    CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();
168
169    m_ring_from_cp = cp_dp->getRingCpToDp(thread_id);
170    m_ring_to_cp   = cp_dp->getRingDpToCp(thread_id);
171
172    m_state = STATE_IDLE;
173
174    int i;
175    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
176        m_ports[i].create(core);
177    }
178}
179
180
181/* move to the next stream, old stream move to INACTIVE */
182bool TrexStatelessDpCore::set_stateless_next_node(CGenNodeStateless * cur_node,
183                                                  CGenNodeStateless * next_node){
184
185    assert(cur_node);
186    TrexStatelessDpPerPort * lp_port = get_port_db(cur_node->m_port_id);
187    bool schedule =false;
188
189    bool to_stop_port=false;
190
191    if (next_node == NULL) {
192        /* there is no next stream , reduce the number of active streams*/
193        to_stop_port = lp_port->update_number_of_active_streams(1);
194
195    }else{
196        uint8_t state=next_node->get_state();
197
198        /* can't be FREE_RESUSE */
199        assert(state != CGenNodeStateless::ss_FREE_RESUSE);
200        if (next_node->get_state() == CGenNodeStateless::ss_INACTIVE ) {
201
202            /* refill start info and scedule, no update in active streams  */
203            next_node->refresh();
204            schedule = true;
205
206        }else{
207            to_stop_port = lp_port->update_number_of_active_streams(1);
208        }
209    }
210
211    if ( to_stop_port ) {
212        /* call stop port explictly to move the state */
213        stop_traffic(cur_node->m_port_id);
214    }
215
216    return ( schedule );
217}
218
219
220
221/**
222 * in idle state loop, the processor most of the time sleeps
223 * and periodically checks for messages
224 *
225 * @author imarom (01-Nov-15)
226 */
227void
228TrexStatelessDpCore::idle_state_loop() {
229
230    while (m_state == STATE_IDLE) {
231        periodic_check_for_cp_messages();
232        delay(200);
233    }
234}
235
236
237
238void TrexStatelessDpCore::quit_main_loop(){
239    m_core->set_terminate_mode(true); /* mark it as terminated */
240    m_state = STATE_TERMINATE;
241    add_global_duration(0.0001);
242}
243
244
245/**
246 * scehduler runs when traffic exists
247 * it will return when no more transmitting is done on this
248 * core
249 *
250 * @author imarom (01-Nov-15)
251 */
252void
253TrexStatelessDpCore::start_scheduler() {
254    /* creates a maintenace job using the scheduler */
255    CGenNode * node_sync = m_core->create_node() ;
256    node_sync->m_type = CGenNode::FLOW_SYNC;
257    node_sync->m_time = m_core->m_cur_time_sec + SYNC_TIME_OUT;
258    m_core->m_node_gen.add_node(node_sync);
259
260    double old_offset = 0.0;
261    m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset);
262    /* TBD do we need that ? */
263    m_core->m_node_gen.close_file(m_core);
264}
265
266
267void
268TrexStatelessDpCore::run_once(){
269
270    idle_state_loop();
271
272    if ( m_state == STATE_TERMINATE ){
273        return;
274    }
275
276    start_scheduler();
277}
278
279
280void
281TrexStatelessDpCore::start() {
282
283    while (true) {
284        run_once();
285
286        if ( m_core->is_terminated_by_master() ) {
287            break;
288        }
289    }
290}
291
292/* only if both port are idle we can exit */
293void
294TrexStatelessDpCore::schedule_exit(){
295
296    CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
297
298    node->m_type = CGenNode::COMMAND;
299
300    node->m_cmd = new TrexStatelessDpCanQuit();
301
302    /* make sure it will be scheduled after the current node */
303    node->m_time = m_core->m_cur_time_sec ;
304
305    m_core->m_node_gen.add_node((CGenNode *)node);
306}
307
308
309void
310TrexStatelessDpCore::add_global_duration(double duration){
311    if (duration > 0.0) {
312        CGenNode *node = m_core->create_node() ;
313
314        node->m_type = CGenNode::EXIT_SCHED;
315
316        /* make sure it will be scheduled after the current node */
317        node->m_time = m_core->m_cur_time_sec + duration ;
318
319        m_core->m_node_gen.add_node(node);
320    }
321}
322
323/* add per port exit */
324void
325TrexStatelessDpCore::add_port_duration(double duration,
326                                  uint8_t port_id){
327    if (duration > 0.0) {
328        CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
329
330        node->m_type = CGenNode::COMMAND;
331
332        /* make sure it will be scheduled after the current node */
333        node->m_time = m_core->m_cur_time_sec + duration ;
334
335        node->m_cmd = new TrexStatelessDpStop(port_id);
336
337        m_core->m_node_gen.add_node((CGenNode *)node);
338    }
339}
340
341
342void
343TrexStatelessDpCore::add_cont_stream(TrexStatelessDpPerPort * lp_port,
344                                     TrexStream * stream,
345                                     TrexStreamsCompiledObj *comp) {
346
347    CGenNodeStateless *node = m_core->create_node_sl();
348
349    /* add periodic */
350    node->m_type = CGenNode::STATELESS_PKT;
351
352    node->m_ref_stream_info  =   stream->clone_as_dp();
353
354    node->m_next_stream=0; /* will be fixed later */
355
356
357    if ( stream->m_self_start ){
358        /* if self start it is in active mode */
359        node->m_state =CGenNodeStateless::ss_ACTIVE;
360        lp_port->m_active_streams++;
361    }else{
362        node->m_state =CGenNodeStateless::ss_INACTIVE;
363    }
364
365    node->m_time = m_core->m_cur_time_sec + usec_to_sec(stream->m_isg_usec);
366
367    pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(stream->m_port_id);
368    node->m_flags = 0;
369
370    /* set socket id */
371    node->set_socket_id(m_core->m_node_gen.m_socket_id);
372
373    /* build a mbuf from a packet */
374
375    uint16_t pkt_size = stream->m_pkt.len;
376    const uint8_t *stream_pkt = stream->m_pkt.binary;
377
378    node->m_stream_type = stream->m_type;
379    node->m_next_time_offset =  1.0 / (stream->get_pps() * comp->get_multiplier()) ;
380
381
382    /* stateless specific fields */
383    switch ( stream->m_type ) {
384
385    case TrexStream::stCONTINUOUS :
386        node->m_single_burst=0;
387        node->m_single_burst_refill=0;
388        node->m_multi_bursts=0;
389        node->m_ibg_sec                 = 0.0;
390        break;
391
392    case TrexStream::stSINGLE_BURST :
393        node->m_stream_type             = TrexStream::stMULTI_BURST;
394        node->m_single_burst            = stream->m_burst_total_pkts;
395        node->m_single_burst_refill     = stream->m_burst_total_pkts;
396        node->m_multi_bursts            = 1;  /* single burst in multi burst of 1 */
397        node->m_ibg_sec                 = 0.0;
398        break;
399
400    case TrexStream::stMULTI_BURST :
401        node->m_single_burst        = stream->m_burst_total_pkts;
402        node->m_single_burst_refill = stream->m_burst_total_pkts;
403        node->m_multi_bursts        = stream->m_num_bursts;
404        node->m_ibg_sec             = usec_to_sec( stream->m_ibg_usec );
405        break;
406    default:
407
408        assert(0);
409    };
410
411    node->m_port_id = stream->m_port_id;
412
413    /* allocate const mbuf */
414    rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size);
415    assert(m);
416
417    char *p = rte_pktmbuf_append(m, pkt_size);
418    assert(p);
419    /* copy the packet */
420    memcpy(p,stream_pkt,pkt_size);
421
422    /* set dir 0 or 1 client or server */
423    node->set_mbuf_cache_dir(dir);
424
425    /* TBD repace the mac if req we should add flag  */
426    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, m);
427
428    /* set the packet as a readonly */
429    node->set_cache_mbuf(m);
430
431    CDpOneStream one_stream;
432
433    one_stream.m_dp_stream = node->m_ref_stream_info;
434    one_stream.m_node =node;
435
436    lp_port->m_active_nodes.push_back(one_stream);
437
438    /* schedule only if active */
439    if (node->m_state == CGenNodeStateless::ss_ACTIVE) {
440        m_core->m_node_gen.add_node((CGenNode *)node);
441    }
442}
443
444void
445TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj,
446                                   double duration) {
447
448#if 0
449    /* TBD to remove ! */
450    obj->Dump(stdout);
451#endif
452
453    TrexStatelessDpPerPort * lp_port=get_port_db(obj->get_port_id());
454    lp_port->m_active_streams = 0;
455    /* no nodes in the list */
456    assert(lp_port->m_active_nodes.size()==0);
457
458    for (auto single_stream : obj->get_objects()) {
459        /* all commands should be for the same port */
460        assert(obj->get_port_id() == single_stream.m_stream->m_port_id);
461        add_cont_stream(lp_port,single_stream.m_stream,obj);
462    }
463
464    uint32_t nodes = lp_port->m_active_nodes.size();
465    /* find next stream */
466    assert(nodes == obj->get_objects().size());
467
468    int cnt=0;
469
470    /* set the next_stream pointer  */
471    for (auto single_stream : obj->get_objects()) {
472
473        if (single_stream.m_stream->is_dp_next_stream() ) {
474            int stream_id = single_stream.m_stream->m_next_stream_id;
475            assert(stream_id<nodes);
476            /* point to the next stream , stream_id is fixed */
477            lp_port->m_active_nodes[cnt].m_node->m_next_stream = lp_port->m_active_nodes[stream_id].m_node ;
478        }
479        cnt++;
480    }
481
482    lp_port->m_state =TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
483    m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
484
485
486    if ( duration > 0.0 ){
487        add_port_duration( duration ,obj->get_port_id() );
488    }
489
490}
491
492
493bool TrexStatelessDpCore::are_all_ports_idle(){
494
495    bool res=true;
496    int i;
497    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
498        if ( m_ports[i].m_state != TrexStatelessDpPerPort::ppSTATE_IDLE ){
499            res=false;
500        }
501    }
502    return (res);
503}
504
505
506void
507TrexStatelessDpCore::stop_traffic(uint8_t port_id) {
508    /* we cannot remove nodes not from the top of the queue so
509       for every active node - make sure next time
510       the scheduler invokes it, it will be free */
511
512    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
513
514    lp_port->stop_traffic(port_id);
515
516    if ( are_all_ports_idle() ) {
517
518        schedule_exit();
519    }
520
521    /* inform the control plane we stopped - this might be a async stop
522       (streams ended)
523     */
524    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
525    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
526                                                                   port_id,
527                                                                   TrexDpPortEvent::EVENT_STOP,
528                                                                   lp_port->get_event_id());
529    ring->Enqueue((CGenNode *)event_msg);
530
531}
532
533/**
534 * handle a message from CP to DP
535 *
536 */
537void
538TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) {
539    msg->handle(this);
540    delete msg;
541}
542
543