trex_stateless_dp_core.cpp revision f6901ca1
1/*
2 Itay Marom
3 Hanoch Haim
4 Cisco Systems, Inc.
5*/
6
7/*
8Copyright (c) 2015-2015 Cisco Systems, Inc.
9
10Licensed under the Apache License, Version 2.0 (the "License");
11you may not use this file except in compliance with the License.
12You may obtain a copy of the License at
13
14    http://www.apache.org/licenses/LICENSE-2.0
15
16Unless required by applicable law or agreed to in writing, software
17distributed under the License is distributed on an "AS IS" BASIS,
18WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19See the License for the specific language governing permissions and
20limitations under the License.
21*/
22#include <trex_stateless_dp_core.h>
23#include <trex_stateless_messaging.h>
24#include <trex_streams_compiler.h>
25#include <trex_stream_node.h>
26#include <trex_stream.h>
27
28#include <bp_sim.h>
29
30
31void CDpOneStream::Delete(CFlowGenListPerThread   * core){
32    assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE);
33    core->free_node((CGenNode *)m_node);
34    delete m_dp_stream;
35    m_node=0;
36    m_dp_stream=0;
37}
38
39void CDpOneStream::DeleteOnlyStream(){
40    assert(m_dp_stream);
41    delete m_dp_stream;
42    m_dp_stream=0;
43}
44
45int CGenNodeStateless::get_stream_id(){
46    if (m_state ==CGenNodeStateless::ss_FREE_RESUSE) {
47        return (-1); // not valid
48    }
49    assert(m_ref_stream_info);
50    return ((int)m_ref_stream_info->m_stream_id);
51}
52
53
54void CGenNodeStateless::DumpHeader(FILE *fd){
55    fprintf(fd," pkt_id, time, port , action , state, stream_id , stype , m-burst# , burst# \n");
56
57}
58void CGenNodeStateless::Dump(FILE *fd){
59    fprintf(fd," %2.4f, %3lu, %s,%s, %3d, %s, %3lu, %3lu  \n",
60            m_time,
61            (ulong)m_port_id,
62            "s-pkt", //action
63            get_stream_state_str(m_state ).c_str(),
64            get_stream_id(),   //stream_id
65            TrexStream::get_stream_type_str(m_stream_type).c_str(), //stype
66            (ulong)m_multi_bursts,
67            (ulong)m_single_burst
68            );
69}
70
71
72
73void CGenNodeStateless::refresh_vm_bss(){
74    if ( m_vm_flow_var ) {
75        StreamVmDp  * vm_s=m_ref_stream_info->m_vm_dp;
76        assert(vm_s);
77        memcpy(m_vm_flow_var,vm_s->get_bss(),vm_s->get_bss_size());
78    }
79}
80
81
82/**
83 * this function called when stream restart after it was inactive
84 */
85void CGenNodeStateless::refresh(){
86
87    /* refill the stream info */
88    m_single_burst    = m_single_burst_refill;
89    m_multi_bursts    = m_ref_stream_info->m_num_bursts;
90    m_state           = CGenNodeStateless::ss_ACTIVE;
91
92    /* refresh init value */
93#if 0
94    /* TBD should add a JSON varible for that */
95    refresh_vm_bss();
96#endif
97}
98
99
100void CGenNodeCommand::free_command(){
101
102    assert(m_cmd);
103    m_cmd->on_node_remove();
104    delete m_cmd;
105}
106
107
108std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state){
109    std::string res;
110
111    switch (stream_state) {
112    case CGenNodeStateless::ss_FREE_RESUSE :
113         res="FREE    ";
114        break;
115    case CGenNodeStateless::ss_INACTIVE :
116        res="INACTIVE ";
117        break;
118    case CGenNodeStateless::ss_ACTIVE :
119        res="ACTIVE   ";
120        break;
121    default:
122        res="Unknow   ";
123    };
124    return(res);
125}
126
127
128rte_mbuf_t   * CGenNodeStateless::alloc_node_with_vm(){
129
130    rte_mbuf_t        * m;
131    /* alloc small packet buffer*/
132    uint16_t prefix_size = prefix_header_size();
133    m = CGlobalInfo::pktmbuf_alloc( get_socket_id(), prefix_size );
134    if (m==0) {
135        return (m);
136    }
137    /* TBD remove this, should handle cases of error */
138    assert(m);
139    char *p=rte_pktmbuf_append(m, prefix_size);
140    memcpy( p ,m_original_packet_data_prefix, prefix_size);
141
142
143    /* run the VM program */
144    StreamDPVmInstructionsRunner runner;
145
146    runner.run( (uint32_t*)m_vm_flow_var,
147                m_vm_program_size,
148                m_vm_program,
149                m_vm_flow_var,
150                (uint8_t*)p);
151
152    uint16_t pkt_new_size=runner.get_new_pkt_size();
153    if ( likely( pkt_new_size == 0) ) {
154        /* no packet size change */
155        rte_mbuf_t * m_const = get_const_mbuf();
156        if (  m_const != NULL) {
157            utl_rte_pktmbuf_add_after(m,m_const);
158        }
159        return (m);
160    }
161
162    /* packet size change there are a few changes */
163    rte_mbuf_t * m_const = get_const_mbuf();
164    if ( (m_const == 0 ) || (pkt_new_size<=prefix_size) ) {
165        /* one mbuf , just trim it */
166        m->data_len = pkt_new_size;
167        m->pkt_len  = pkt_new_size;
168        return (m);
169    }
170
171    rte_mbuf_t * mi= CGlobalInfo::pktmbuf_alloc_small(get_socket_id());
172    assert(mi);
173    rte_pktmbuf_attach(mi,m_const);
174    utl_rte_pktmbuf_add_after2(m,mi);
175
176    if ( pkt_new_size < m->pkt_len) {
177        /* need to trim it */
178        mi->data_len = (pkt_new_size - prefix_size);
179        m->pkt_len   = pkt_new_size;
180    }
181    return (m);
182}
183
184
185void CGenNodeStateless::free_stl_node(){
186    /* if we have cache mbuf free it */
187    rte_mbuf_t * m=get_cache_mbuf();
188    if (m) {
189        rte_pktmbuf_free(m);
190        m_cache_mbuf=0;
191    }else{
192        /* non cache - must have an header */
193         m=get_const_mbuf();
194         if (m) {
195             rte_pktmbuf_free(m); /* reduce the ref counter */
196         }
197         free_prefix_header();
198    }
199    if (m_vm_flow_var) {
200        /* free flow var */
201        free(m_vm_flow_var);
202        m_vm_flow_var=0;
203    }
204}
205
206
207bool TrexStatelessDpPerPort::update_number_of_active_streams(uint32_t d){
208    m_active_streams-=d; /* reduce the number of streams */
209    if (m_active_streams == 0) {
210        return (true);
211    }
212    return (false);
213}
214
215bool TrexStatelessDpPerPort::resume_traffic(uint8_t port_id){
216
217    /* we are working with continues streams so we must be in transmit mode */
218    assert(m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE);
219
220    for (auto dp_stream : m_active_nodes) {
221        CGenNodeStateless * node =dp_stream.m_node;
222        assert(node->get_port_id() == port_id);
223        assert(node->is_pause() == true);
224        node->set_pause(false);
225    }
226    m_state = TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
227    return (true);
228}
229
230bool TrexStatelessDpPerPort::update_traffic(uint8_t port_id, double factor) {
231
232    assert( (m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING ||
233            (m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE)) );
234
235    for (auto dp_stream : m_active_nodes) {
236        CGenNodeStateless * node = dp_stream.m_node;
237        assert(node->get_port_id() == port_id);
238
239        node->update_rate(factor);
240    }
241
242    return (true);
243}
244
245bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){
246
247    /* we are working with continues streams so we must be in transmit mode */
248    assert(m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING);
249
250    for (auto dp_stream : m_active_nodes) {
251        CGenNodeStateless * node =dp_stream.m_node;
252        assert(node->get_port_id() == port_id);
253        assert(node->is_pause() == false);
254        node->set_pause(true);
255    }
256    m_state = TrexStatelessDpPerPort::ppSTATE_PAUSE;
257    return (true);
258}
259
260
261bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id,
262                                          bool stop_on_id,
263                                          int event_id){
264
265
266    if (m_state == TrexStatelessDpPerPort::ppSTATE_IDLE) {
267        assert(m_active_streams==0);
268        return false;
269    }
270
271    /* there could be race of stop after stop */
272    if ( stop_on_id ) {
273        if (event_id != m_event_id){
274            /* we can't stop it is an old message */
275            return false;
276        }
277    }
278
279    for (auto dp_stream : m_active_nodes) {
280        CGenNodeStateless * node =dp_stream.m_node;
281        assert(node->get_port_id() == port_id);
282        if ( node->get_state() == CGenNodeStateless::ss_ACTIVE) {
283            node->mark_for_free();
284            m_active_streams--;
285            dp_stream.DeleteOnlyStream();
286
287        }else{
288            dp_stream.Delete(m_core);
289        }
290    }
291
292    /* active stream should be zero */
293    assert(m_active_streams==0);
294    m_active_nodes.clear();
295    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
296    return (true);
297}
298
299
300void TrexStatelessDpPerPort::create(CFlowGenListPerThread   *  core){
301    m_core=core;
302    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
303    m_port_id=0;
304    m_active_streams=0;
305    m_active_nodes.clear();
306}
307
308
309
310void
311TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) {
312    m_thread_id = thread_id;
313    m_core = core;
314    m_local_port_offset = 2*core->getDualPortId();
315
316    CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();
317
318    m_ring_from_cp = cp_dp->getRingCpToDp(thread_id);
319    m_ring_to_cp   = cp_dp->getRingDpToCp(thread_id);
320
321    m_state = STATE_IDLE;
322
323    int i;
324    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
325        m_ports[i].create(core);
326    }
327}
328
329
330/* move to the next stream, old stream move to INACTIVE */
331bool TrexStatelessDpCore::set_stateless_next_node(CGenNodeStateless * cur_node,
332                                                  CGenNodeStateless * next_node){
333
334    assert(cur_node);
335    TrexStatelessDpPerPort * lp_port = get_port_db(cur_node->m_port_id);
336    bool schedule =false;
337
338    bool to_stop_port=false;
339
340    if (next_node == NULL) {
341        /* there is no next stream , reduce the number of active streams*/
342        to_stop_port = lp_port->update_number_of_active_streams(1);
343
344    }else{
345        uint8_t state=next_node->get_state();
346
347        /* can't be FREE_RESUSE */
348        assert(state != CGenNodeStateless::ss_FREE_RESUSE);
349        if (state == CGenNodeStateless::ss_INACTIVE ) {
350
351            /* refill start info and scedule, no update in active streams  */
352            next_node->refresh();
353            schedule = true;
354
355        }else{
356            to_stop_port = lp_port->update_number_of_active_streams(1);
357        }
358    }
359
360    if ( to_stop_port ) {
361        /* call stop port explictly to move the state */
362        stop_traffic(cur_node->m_port_id,false,0);
363    }
364
365    return ( schedule );
366}
367
368
369
370/**
371 * in idle state loop, the processor most of the time sleeps
372 * and periodically checks for messages
373 *
374 * @author imarom (01-Nov-15)
375 */
376void
377TrexStatelessDpCore::idle_state_loop() {
378
379    while (m_state == STATE_IDLE) {
380        bool had_msg = periodic_check_for_cp_messages();
381        /* if no message - backoff for some time */
382        if (!had_msg) {
383            delay(200);
384        }
385    }
386}
387
388
389
390void TrexStatelessDpCore::quit_main_loop(){
391    m_core->set_terminate_mode(true); /* mark it as terminated */
392    m_state = STATE_TERMINATE;
393    add_global_duration(0.0001);
394}
395
396
397/**
398 * scehduler runs when traffic exists
399 * it will return when no more transmitting is done on this
400 * core
401 *
402 * @author imarom (01-Nov-15)
403 */
404void
405TrexStatelessDpCore::start_scheduler() {
406    /* creates a maintenace job using the scheduler */
407    CGenNode * node_sync = m_core->create_node() ;
408    node_sync->m_type = CGenNode::FLOW_SYNC;
409    node_sync->m_time = m_core->m_cur_time_sec + SYNC_TIME_OUT;
410    m_core->m_node_gen.add_node(node_sync);
411
412    double old_offset = 0.0;
413    m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset);
414    /* bail out in case of terminate */
415    if (m_state != TrexStatelessDpCore::STATE_TERMINATE) {
416        m_core->m_node_gen.close_file(m_core);
417        m_state = STATE_IDLE; /* we exit from all ports and we have nothing to do, we move to IDLE state */
418    }
419}
420
421
422void
423TrexStatelessDpCore::run_once(){
424
425    idle_state_loop();
426
427    if ( m_state == STATE_TERMINATE ){
428        return;
429    }
430
431    start_scheduler();
432}
433
434
435
436
437void
438TrexStatelessDpCore::start() {
439
440    while (true) {
441        run_once();
442
443        if ( m_core->is_terminated_by_master() ) {
444            break;
445        }
446    }
447}
448
449/* only if both port are idle we can exit */
450void
451TrexStatelessDpCore::schedule_exit(){
452
453    CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
454
455    node->m_type = CGenNode::COMMAND;
456
457    node->m_cmd = new TrexStatelessDpCanQuit();
458
459    /* make sure it will be scheduled after the current node */
460    node->m_time = m_core->m_cur_time_sec ;
461
462    m_core->m_node_gen.add_node((CGenNode *)node);
463}
464
465
466void
467TrexStatelessDpCore::add_global_duration(double duration){
468    if (duration > 0.0) {
469        CGenNode *node = m_core->create_node() ;
470
471        node->m_type = CGenNode::EXIT_SCHED;
472
473        /* make sure it will be scheduled after the current node */
474        node->m_time = m_core->m_cur_time_sec + duration ;
475
476        m_core->m_node_gen.add_node(node);
477    }
478}
479
480/* add per port exit */
481void
482TrexStatelessDpCore::add_port_duration(double duration,
483                                       uint8_t port_id,
484                                       int event_id){
485    if (duration > 0.0) {
486        CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
487
488        node->m_type = CGenNode::COMMAND;
489
490        /* make sure it will be scheduled after the current node */
491        node->m_time = m_core->m_cur_time_sec + duration ;
492
493        TrexStatelessDpStop * cmd=new TrexStatelessDpStop(port_id);
494
495
496        /* test this */
497        m_core->m_non_active_nodes++;
498        cmd->set_core_ptr(m_core);
499        cmd->set_event_id(event_id);
500        cmd->set_wait_for_event_id(true);
501
502        node->m_cmd = cmd;
503
504        m_core->m_node_gen.add_node((CGenNode *)node);
505    }
506}
507
508
509void TrexStatelessDpCore::update_mac_addr(TrexStream * stream,
510                                          CGenNodeStateless *node,
511                                          pkt_dir_t dir,
512                                          char *raw_pkt){
513    bool              ov_src = stream->get_override_src_mac_by_pkt_data();
514    TrexStream::stream_dst_mac_t  ov_dst = stream->get_override_dst_mac_mode();
515
516
517    if ( (ov_src == true) && (ov_dst == TrexStream::stPKT) ) {
518        /* nothing to do, take from the packet both */
519        return;
520    }
521
522        /* take from cfg_file */
523    if ( (ov_src == false) &&
524         (ov_dst == TrexStream::stCFG_FILE) ){
525
526          m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*)raw_pkt);
527          return;
528    }
529
530    /* save the pkt*/
531    char tmp_pkt[12];
532    memcpy(tmp_pkt,raw_pkt,12);
533
534    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*)raw_pkt);
535
536    if ((ov_src == true) && (ov_dst == TrexStream::stCFG_FILE)) {
537        memcpy(raw_pkt+6,tmp_pkt+6,6);
538    }
539
540    if ((ov_src == false) && (ov_dst == TrexStream::stPKT)) {
541        memcpy(raw_pkt,tmp_pkt,6);
542    }
543}
544
545
/**
 * Create the scheduler node for one stream and register it on the port.
 *
 * The stream is cloned for the node, the node's timing and burst counters
 * are filled from the stream, and the packet is prepared in one of two ways:
 *  - the clone has no DP VM: the whole packet is copied into one mbuf that
 *    is stored as the node's cache mbuf;
 *  - the clone has a DP VM: the node gets its own copy of the VM variables,
 *    the packet prefix is copied into the node's prefix header and the
 *    remainder of the packet (if any) into a constant mbuf.
 * The node is appended to lp_port->m_active_nodes and handed to the
 * scheduler only when it starts in ss_ACTIVE state.
 *
 * @param lp_port per-port database the node is registered on
 * @param stream  stream to build the node from
 * @param comp    not used in this function
 */
void
TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port,
                                TrexStream * stream,
                                TrexStreamsCompiledObj *comp) {

    CGenNodeStateless *node = m_core->create_node_sl();

    /* add periodic */
    node->m_cache_mbuf=0;
    node->m_type = CGenNode::STATELESS_PKT;

    /* clone the stream from control plane memory to DP memory */
    node->m_ref_stream_info = stream->clone();
    /* no need for this memory anymore on the control plane memory */
    stream->release_dp_object();

    node->m_next_stream=0; /* will be fixed later */


    if ( stream->m_self_start ){
        /* if self start it is in active mode */
        node->m_state =CGenNodeStateless::ss_ACTIVE;
        lp_port->m_active_streams++;
    }else{
        node->m_state =CGenNodeStateless::ss_INACTIVE;
    }

    /* first transmission is delayed by the stream's ISG */
    node->m_time = m_core->m_cur_time_sec + usec_to_sec(stream->m_isg_usec);

    pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(stream->m_port_id);
    node->m_flags = 0;
    node->m_src_port =0;
    node->m_original_packet_data_prefix = 0;



    /* set socket id */
    node->set_socket_id(m_core->m_node_gen.m_socket_id);

    /* build a mbuf from a packet */

    uint16_t pkt_size = stream->m_pkt.len;
    const uint8_t *stream_pkt = stream->m_pkt.binary;

    node->m_pause =0;
    node->m_stream_type = stream->m_type;
    /* time between two packets of this stream */
    node->m_next_time_offset = 1.0 / stream->get_pps();

    /* stateless specific fields */
    switch ( stream->m_type ) {

    case TrexStream::stCONTINUOUS :
        node->m_single_burst=0;
        node->m_single_burst_refill=0;
        node->m_multi_bursts=0;
        break;

    case TrexStream::stSINGLE_BURST :
        /* handled as a multi burst stream with one burst */
        node->m_stream_type             = TrexStream::stMULTI_BURST;
        node->m_single_burst            = stream->m_burst_total_pkts;
        node->m_single_burst_refill     = stream->m_burst_total_pkts;
        node->m_multi_bursts            = 1;  /* single burst in multi burst of 1 */
        break;

    case TrexStream::stMULTI_BURST :
        node->m_single_burst        = stream->m_burst_total_pkts;
        node->m_single_burst_refill = stream->m_burst_total_pkts;
        node->m_multi_bursts        = stream->m_num_bursts;
        break;
    default:

        /* unknown stream type */
        assert(0);
    };

    node->m_port_id = stream->m_port_id;

    /* set dir 0 or 1 client or server */
    node->set_mbuf_cache_dir(dir);


    if (node->m_ref_stream_info->getDpVm() == NULL) {
        /* no VM */

        node->m_vm_flow_var =  NULL;
        node->m_vm_program  =  NULL;
        node->m_vm_program_size =0;

                /* allocate const mbuf */
        rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size);
        assert(m);

        char *p = rte_pktmbuf_append(m, pkt_size);
        assert(p);
        /* copy the packet */
        memcpy(p,stream_pkt,pkt_size);

        update_mac_addr(stream,node,dir,p);

        /* set the packet as a readonly */
        node->set_cache_mbuf(m);

        node->m_original_packet_data_prefix =0;
    }else{

        /* set the program */
        TrexStream * local_mem_stream = node->m_ref_stream_info;

        StreamVmDp  * lpDpVm = local_mem_stream->getDpVm();

        node->m_vm_flow_var      = lpDpVm->clone_bss(); /* clone the flow var */
        node->m_vm_program       = lpDpVm->get_program(); /* same ref to the program */
        node->m_vm_program_size  = lpDpVm->get_program_size();


        /* the part of the packet after the prefix goes into a const mbuf */
        if ( pkt_size > lpDpVm->get_prefix_size() ) {
            /* we need const packet */
            uint16_t const_pkt_size  = pkt_size - lpDpVm->get_prefix_size() ;
            rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), const_pkt_size );
            assert(m);

            char *p = rte_pktmbuf_append(m, const_pkt_size);
            assert(p);

            /* copy packet data */
            memcpy(p,(stream_pkt + lpDpVm->get_prefix_size()),const_pkt_size);

            node->set_const_mbuf(m);
        }


        if ( lpDpVm->is_pkt_size_var() ) {
            // mark the node as variable size
            node->set_var_pkt_size();
        }


        /* the prefix can't be longer than the packet itself */
        if (lpDpVm->get_prefix_size() > pkt_size ) {
            lpDpVm->set_prefix_size(pkt_size);
        }

        /* copy the header */
        uint16_t header_size = lpDpVm->get_prefix_size();
        assert(header_size);
        node->alloc_prefix_header(header_size);
        uint8_t *p=node->m_original_packet_data_prefix;
        assert(p);

        memcpy(p,stream_pkt , header_size);

        update_mac_addr(stream,node,dir,(char *)p);
    }


    /* register the node and its stream object on the port */
    CDpOneStream one_stream;

    one_stream.m_dp_stream = node->m_ref_stream_info;
    one_stream.m_node =node;

    lp_port->m_active_nodes.push_back(one_stream);

    /* schedule only if active */
    if (node->m_state == CGenNodeStateless::ss_ACTIVE) {
        m_core->m_node_gen.add_node((CGenNode *)node);
    }
}
712
713void
714TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj,
715                                   double duration,
716                                   int event_id) {
717
718
719    TrexStatelessDpPerPort * lp_port=get_port_db(obj->get_port_id());
720    lp_port->m_active_streams = 0;
721    lp_port->set_event_id(event_id);
722
723    /* no nodes in the list */
724    assert(lp_port->m_active_nodes.size()==0);
725
726    for (auto single_stream : obj->get_objects()) {
727        /* all commands should be for the same port */
728        assert(obj->get_port_id() == single_stream.m_stream->m_port_id);
729        add_stream(lp_port,single_stream.m_stream,obj);
730    }
731
732    uint32_t nodes = lp_port->m_active_nodes.size();
733    /* find next stream */
734    assert(nodes == obj->get_objects().size());
735
736    int cnt=0;
737
738    /* set the next_stream pointer  */
739    for (auto single_stream : obj->get_objects()) {
740
741        if (single_stream.m_stream->is_dp_next_stream() ) {
742            int stream_id = single_stream.m_stream->m_next_stream_id;
743            assert(stream_id<nodes);
744            /* point to the next stream , stream_id is fixed */
745            lp_port->m_active_nodes[cnt].m_node->m_next_stream = lp_port->m_active_nodes[stream_id].m_node ;
746        }
747        cnt++;
748    }
749
750    lp_port->m_state =TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
751    m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
752
753
754    if ( duration > 0.0 ){
755        add_port_duration( duration ,obj->get_port_id(),event_id );
756    }
757
758}
759
760
761bool TrexStatelessDpCore::are_all_ports_idle(){
762
763    bool res=true;
764    int i;
765    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
766        if ( m_ports[i].m_state != TrexStatelessDpPerPort::ppSTATE_IDLE ){
767            res=false;
768        }
769    }
770    return (res);
771}
772
773
774void
775TrexStatelessDpCore::resume_traffic(uint8_t port_id){
776
777    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
778
779    lp_port->resume_traffic(port_id);
780}
781
782
783void
784TrexStatelessDpCore::pause_traffic(uint8_t port_id){
785
786    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
787
788    lp_port->pause_traffic(port_id);
789}
790
791void
792TrexStatelessDpCore::update_traffic(uint8_t port_id, double factor) {
793
794    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
795
796    lp_port->update_traffic(port_id, factor);
797}
798
799
800void
801TrexStatelessDpCore::stop_traffic(uint8_t port_id,
802                                  bool stop_on_id,
803                                  int event_id) {
804    /* we cannot remove nodes not from the top of the queue so
805       for every active node - make sure next time
806       the scheduler invokes it, it will be free */
807
808    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
809
810    if ( lp_port->stop_traffic(port_id,stop_on_id,event_id) == false){
811        /* nothing to do ! already stopped */
812        //printf(" skip .. %f\n",m_core->m_cur_time_sec);
813        return;
814    }
815
816#if 0
817    if ( are_all_ports_idle() ) {
818        /* just a place holder if we will need to do somthing in that case */
819    }
820#endif
821
822    /* inform the control plane we stopped - this might be a async stop
823       (streams ended)
824     */
825    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
826    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
827                                                                   port_id,
828                                                                   TrexDpPortEvent::EVENT_STOP,
829                                                                   lp_port->get_event_id());
830    ring->Enqueue((CGenNode *)event_msg);
831
832}
833
834/**
835 * handle a message from CP to DP
836 *
837 */
838void
839TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) {
840    msg->handle(this);
841    delete msg;
842}
843
844