trex_stateless_dp_core.cpp revision a1364603
1/*
2 Itay Marom
3 Hanoch Haim
4 Cisco Systems, Inc.
5*/
6
7/*
8Copyright (c) 2015-2015 Cisco Systems, Inc.
9
10Licensed under the Apache License, Version 2.0 (the "License");
11you may not use this file except in compliance with the License.
12You may obtain a copy of the License at
13
14    http://www.apache.org/licenses/LICENSE-2.0
15
16Unless required by applicable law or agreed to in writing, software
17distributed under the License is distributed on an "AS IS" BASIS,
18WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19See the License for the specific language governing permissions and
20limitations under the License.
21*/
22#include <trex_stateless_dp_core.h>
23#include <trex_stateless_messaging.h>
24#include <trex_streams_compiler.h>
25#include <trex_stream_node.h>
26#include <trex_stream.h>
27
28#include <bp_sim.h>
29
30
31void CDpOneStream::Delete(CFlowGenListPerThread   * core){
32    assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE);
33    core->free_node((CGenNode *)m_node);
34    delete m_dp_stream;
35    m_node=0;
36    m_dp_stream=0;
37}
38
39void CDpOneStream::DeleteOnlyStream(){
40    assert(m_dp_stream);
41    delete m_dp_stream;
42    m_dp_stream=0;
43}
44
45int CGenNodeStateless::get_stream_id(){
46    if (m_state ==CGenNodeStateless::ss_FREE_RESUSE) {
47        return (-1); // not valid
48    }
49    assert(m_ref_stream_info);
50    return ((int)m_ref_stream_info->m_stream_id);
51}
52
53
54void CGenNodeStateless::DumpHeader(FILE *fd){
55    fprintf(fd," pkt_id, time, port , action , state, stream_id , stype , m-burst# , burst# \n");
56
57}
58void CGenNodeStateless::Dump(FILE *fd){
59    fprintf(fd," %2.4f, %3lu, %s,%s, %3d, %s, %3lu, %3lu  \n",
60            m_time,
61            (ulong)m_port_id,
62            "s-pkt", //action
63            get_stream_state_str(m_state ).c_str(),
64            get_stream_id(),   //stream_id
65            TrexStream::get_stream_type_str(m_stream_type).c_str(), //stype
66            (ulong)m_multi_bursts,
67            (ulong)m_single_burst
68            );
69}
70
71
72
73void CGenNodeStateless::refresh_vm_bss(){
74    if ( m_vm_flow_var ) {
75        StreamVmDp  * vm_s=m_ref_stream_info->m_vm_dp;
76        assert(vm_s);
77        memcpy(m_vm_flow_var,vm_s->get_bss(),vm_s->get_bss_size());
78    }
79}
80
81
82/**
83 * this function called when stream restart after it was inactive
84 */
85void CGenNodeStateless::refresh(){
86
87    /* refill the stream info */
88    m_single_burst    = m_single_burst_refill;
89    m_multi_bursts    = m_ref_stream_info->m_num_bursts;
90    m_state           = CGenNodeStateless::ss_ACTIVE;
91
92    /* refresh init value */
93#if 0
94    /* TBD should add a JSON varible for that */
95    refresh_vm_bss();
96#endif
97}
98
99
100void CGenNodeCommand::free_command(){
101
102    assert(m_cmd);
103    m_cmd->on_node_remove();
104    delete m_cmd;
105}
106
107
108std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state){
109    std::string res;
110
111    switch (stream_state) {
112    case CGenNodeStateless::ss_FREE_RESUSE :
113         res="FREE    ";
114        break;
115    case CGenNodeStateless::ss_INACTIVE :
116        res="INACTIVE ";
117        break;
118    case CGenNodeStateless::ss_ACTIVE :
119        res="ACTIVE   ";
120        break;
121    default:
122        res="Unknow   ";
123    };
124    return(res);
125}
126
127
128rte_mbuf_t   * CGenNodeStateless::alloc_node_with_vm(){
129
130    rte_mbuf_t        * m;
131    /* alloc small packet buffer*/
132    uint16_t prefix_size = prefix_header_size();
133    m = CGlobalInfo::pktmbuf_alloc( get_socket_id(), prefix_size );
134    if (m==0) {
135        return (m);
136    }
137    /* TBD remove this, should handle cases of error */
138    assert(m);
139    char *p=rte_pktmbuf_append(m, prefix_size);
140    memcpy( p ,m_original_packet_data_prefix, prefix_size);
141
142
143    /* run the VM program */
144    StreamDPVmInstructionsRunner runner;
145
146    runner.run( (uint32_t*)m_vm_flow_var,
147                m_vm_program_size,
148                m_vm_program,
149                m_vm_flow_var,
150                (uint8_t*)p);
151
152    uint16_t pkt_new_size=runner.get_new_pkt_size();
153    if ( likely( pkt_new_size == 0) ) {
154        /* no packet size change */
155        rte_mbuf_t * m_const = get_const_mbuf();
156        if (  m_const != NULL) {
157            utl_rte_pktmbuf_add_after(m,m_const);
158        }
159        return (m);
160    }
161
162    /* packet size change there are a few changes */
163    rte_mbuf_t * m_const = get_const_mbuf();
164    if ( (m_const == 0 ) || (pkt_new_size<=prefix_size) ) {
165        /* one mbuf , just trim it */
166        m->data_len = pkt_new_size;
167        m->pkt_len  = pkt_new_size;
168        return (m);
169    }
170
171    rte_mbuf_t * mi= CGlobalInfo::pktmbuf_alloc_small(get_socket_id());
172    assert(mi);
173    rte_pktmbuf_attach(mi,m_const);
174    utl_rte_pktmbuf_add_after(m,mi);
175
176    if ( pkt_new_size < m->pkt_len) {
177        /* need to trim it */
178        mi->data_len = (pkt_new_size - prefix_size);
179        m->pkt_len   = pkt_new_size;
180    }
181    return (m);
182}
183
184
185void CGenNodeStateless::free_stl_node(){
186    /* if we have cache mbuf free it */
187    rte_mbuf_t * m=get_cache_mbuf();
188    if (m) {
189        rte_pktmbuf_free(m);
190        m_cache_mbuf=0;
191    }else{
192        /* non cache - must have an header */
193         m=get_const_mbuf();
194         if (m) {
195             rte_pktmbuf_free(m); /* reduce the ref counter */
196         }
197         free_prefix_header();
198    }
199    if (m_vm_flow_var) {
200        /* free flow var */
201        free(m_vm_flow_var);
202        m_vm_flow_var=0;
203    }
204}
205
206
207bool TrexStatelessDpPerPort::update_number_of_active_streams(uint32_t d){
208    m_active_streams-=d; /* reduce the number of streams */
209    if (m_active_streams == 0) {
210        return (true);
211    }
212    return (false);
213}
214
215bool TrexStatelessDpPerPort::resume_traffic(uint8_t port_id){
216
217    /* we are working with continues streams so we must be in transmit mode */
218    assert(m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE);
219
220    for (auto dp_stream : m_active_nodes) {
221        CGenNodeStateless * node =dp_stream.m_node;
222        assert(node->get_port_id() == port_id);
223        assert(node->is_pause() == true);
224        node->set_pause(false);
225    }
226    m_state = TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
227    return (true);
228}
229
230bool TrexStatelessDpPerPort::update_traffic(uint8_t port_id, double factor) {
231
232    assert( (m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING ||
233            (m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE)) );
234
235    for (auto dp_stream : m_active_nodes) {
236        CGenNodeStateless * node = dp_stream.m_node;
237        assert(node->get_port_id() == port_id);
238
239        node->update_rate(factor);
240    }
241
242    return (true);
243}
244
245bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){
246
247    /* we are working with continues streams so we must be in transmit mode */
248    assert(m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING);
249
250    for (auto dp_stream : m_active_nodes) {
251        CGenNodeStateless * node =dp_stream.m_node;
252        assert(node->get_port_id() == port_id);
253        assert(node->is_pause() == false);
254        node->set_pause(true);
255    }
256    m_state = TrexStatelessDpPerPort::ppSTATE_PAUSE;
257    return (true);
258}
259
260
261bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id,
262                                          bool stop_on_id,
263                                          int event_id){
264
265
266    if (m_state == TrexStatelessDpPerPort::ppSTATE_IDLE) {
267        assert(m_active_streams==0);
268        return false;
269    }
270
271    /* there could be race of stop after stop */
272    if ( stop_on_id ) {
273        if (event_id != m_event_id){
274            /* we can't stop it is an old message */
275            return false;
276        }
277    }
278
279    for (auto dp_stream : m_active_nodes) {
280        CGenNodeStateless * node =dp_stream.m_node;
281        assert(node->get_port_id() == port_id);
282        if ( node->get_state() == CGenNodeStateless::ss_ACTIVE) {
283            node->mark_for_free();
284            m_active_streams--;
285            dp_stream.DeleteOnlyStream();
286
287        }else{
288            dp_stream.Delete(m_core);
289        }
290    }
291
292    /* active stream should be zero */
293    assert(m_active_streams==0);
294    m_active_nodes.clear();
295    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
296    return (true);
297}
298
299
300void TrexStatelessDpPerPort::create(CFlowGenListPerThread   *  core){
301    m_core=core;
302    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
303    m_port_id=0;
304    m_active_streams=0;
305    m_active_nodes.clear();
306}
307
308
309
310void
311TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) {
312    m_thread_id = thread_id;
313    m_core = core;
314    m_local_port_offset = 2*core->getDualPortId();
315
316    CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();
317
318    m_ring_from_cp = cp_dp->getRingCpToDp(thread_id);
319    m_ring_to_cp   = cp_dp->getRingDpToCp(thread_id);
320
321    m_state = STATE_IDLE;
322
323    int i;
324    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
325        m_ports[i].create(core);
326    }
327}
328
329
330/* move to the next stream, old stream move to INACTIVE */
331bool TrexStatelessDpCore::set_stateless_next_node(CGenNodeStateless * cur_node,
332                                                  CGenNodeStateless * next_node){
333
334    assert(cur_node);
335    TrexStatelessDpPerPort * lp_port = get_port_db(cur_node->m_port_id);
336    bool schedule =false;
337
338    bool to_stop_port=false;
339
340    if (next_node == NULL) {
341        /* there is no next stream , reduce the number of active streams*/
342        to_stop_port = lp_port->update_number_of_active_streams(1);
343
344    }else{
345        uint8_t state=next_node->get_state();
346
347        /* can't be FREE_RESUSE */
348        assert(state != CGenNodeStateless::ss_FREE_RESUSE);
349        if (state == CGenNodeStateless::ss_INACTIVE ) {
350
351            /* refill start info and scedule, no update in active streams  */
352            next_node->refresh();
353            schedule = true;
354
355        }else{
356            to_stop_port = lp_port->update_number_of_active_streams(1);
357        }
358    }
359
360    if ( to_stop_port ) {
361        /* call stop port explictly to move the state */
362        stop_traffic(cur_node->m_port_id,false,0);
363    }
364
365    return ( schedule );
366}
367
368
369
370/**
371 * in idle state loop, the processor most of the time sleeps
372 * and periodically checks for messages
373 *
374 * @author imarom (01-Nov-15)
375 */
376void
377TrexStatelessDpCore::idle_state_loop() {
378
379    while (m_state == STATE_IDLE) {
380        periodic_check_for_cp_messages();
381        delay(200);
382    }
383}
384
385
386
387void TrexStatelessDpCore::quit_main_loop(){
388    m_core->set_terminate_mode(true); /* mark it as terminated */
389    m_state = STATE_TERMINATE;
390    add_global_duration(0.0001);
391}
392
393
394/**
395 * scehduler runs when traffic exists
396 * it will return when no more transmitting is done on this
397 * core
398 *
399 * @author imarom (01-Nov-15)
400 */
401void
402TrexStatelessDpCore::start_scheduler() {
403    /* creates a maintenace job using the scheduler */
404    CGenNode * node_sync = m_core->create_node() ;
405    node_sync->m_type = CGenNode::FLOW_SYNC;
406    node_sync->m_time = m_core->m_cur_time_sec + SYNC_TIME_OUT;
407    m_core->m_node_gen.add_node(node_sync);
408
409    double old_offset = 0.0;
410    m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset);
411    /* bail out in case of terminate */
412    if (m_state != TrexStatelessDpCore::STATE_TERMINATE) {
413        m_core->m_node_gen.close_file(m_core);
414        m_state = STATE_IDLE; /* we exit from all ports and we have nothing to do, we move to IDLE state */
415    }
416}
417
418
419void
420TrexStatelessDpCore::run_once(){
421
422    idle_state_loop();
423
424    if ( m_state == STATE_TERMINATE ){
425        return;
426    }
427
428    start_scheduler();
429}
430
431
432
433
434void
435TrexStatelessDpCore::start() {
436
437    while (true) {
438        run_once();
439
440        if ( m_core->is_terminated_by_master() ) {
441            break;
442        }
443    }
444}
445
446/* only if both port are idle we can exit */
447void
448TrexStatelessDpCore::schedule_exit(){
449
450    CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
451
452    node->m_type = CGenNode::COMMAND;
453
454    node->m_cmd = new TrexStatelessDpCanQuit();
455
456    /* make sure it will be scheduled after the current node */
457    node->m_time = m_core->m_cur_time_sec ;
458
459    m_core->m_node_gen.add_node((CGenNode *)node);
460}
461
462
463void
464TrexStatelessDpCore::add_global_duration(double duration){
465    if (duration > 0.0) {
466        CGenNode *node = m_core->create_node() ;
467
468        node->m_type = CGenNode::EXIT_SCHED;
469
470        /* make sure it will be scheduled after the current node */
471        node->m_time = m_core->m_cur_time_sec + duration ;
472
473        m_core->m_node_gen.add_node(node);
474    }
475}
476
477/* add per port exit */
478void
479TrexStatelessDpCore::add_port_duration(double duration,
480                                       uint8_t port_id,
481                                       int event_id){
482    if (duration > 0.0) {
483        CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
484
485        node->m_type = CGenNode::COMMAND;
486
487        /* make sure it will be scheduled after the current node */
488        node->m_time = m_core->m_cur_time_sec + duration ;
489
490        TrexStatelessDpStop * cmd=new TrexStatelessDpStop(port_id);
491
492
493        /* test this */
494        m_core->m_non_active_nodes++;
495        cmd->set_core_ptr(m_core);
496        cmd->set_event_id(event_id);
497        cmd->set_wait_for_event_id(true);
498
499        node->m_cmd = cmd;
500
501        m_core->m_node_gen.add_node((CGenNode *)node);
502    }
503}
504
505
506void
507TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port,
508                                TrexStream * stream,
509                                TrexStreamsCompiledObj *comp) {
510
511    CGenNodeStateless *node = m_core->create_node_sl();
512
513    /* add periodic */
514    node->m_cache_mbuf=0;
515    node->m_type = CGenNode::STATELESS_PKT;
516
517    /* clone the stream from control plane memory to DP memory */
518    node->m_ref_stream_info = stream->clone();
519    /* no need for this memory anymore on the control plane memory */
520    stream->release_dp_object();
521
522    node->m_next_stream=0; /* will be fixed later */
523
524
525    if ( stream->m_self_start ){
526        /* if self start it is in active mode */
527        node->m_state =CGenNodeStateless::ss_ACTIVE;
528        lp_port->m_active_streams++;
529    }else{
530        node->m_state =CGenNodeStateless::ss_INACTIVE;
531    }
532
533    node->m_time = m_core->m_cur_time_sec + usec_to_sec(stream->m_isg_usec);
534
535    pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(stream->m_port_id);
536    node->m_flags = 0;
537    node->m_src_port =0;
538    node->m_original_packet_data_prefix = 0;
539
540
541
542    /* set socket id */
543    node->set_socket_id(m_core->m_node_gen.m_socket_id);
544
545    /* build a mbuf from a packet */
546
547    uint16_t pkt_size = stream->m_pkt.len;
548    const uint8_t *stream_pkt = stream->m_pkt.binary;
549
550    node->m_pause =0;
551    node->m_stream_type = stream->m_type;
552    node->m_next_time_offset = 1.0 / stream->get_pps();
553
554    /* stateless specific fields */
555    switch ( stream->m_type ) {
556
557    case TrexStream::stCONTINUOUS :
558        node->m_single_burst=0;
559        node->m_single_burst_refill=0;
560        node->m_multi_bursts=0;
561        node->m_ibg_sec                 = 0.0;
562        break;
563
564    case TrexStream::stSINGLE_BURST :
565        node->m_stream_type             = TrexStream::stMULTI_BURST;
566        node->m_single_burst            = stream->m_burst_total_pkts;
567        node->m_single_burst_refill     = stream->m_burst_total_pkts;
568        node->m_multi_bursts            = 1;  /* single burst in multi burst of 1 */
569        node->m_ibg_sec                 = 0.0;
570        break;
571
572    case TrexStream::stMULTI_BURST :
573        node->m_single_burst        = stream->m_burst_total_pkts;
574        node->m_single_burst_refill = stream->m_burst_total_pkts;
575        node->m_multi_bursts        = stream->m_num_bursts;
576        node->m_ibg_sec             = usec_to_sec( stream->m_ibg_usec );
577        break;
578    default:
579
580        assert(0);
581    };
582
583    node->m_port_id = stream->m_port_id;
584
585    /* set dir 0 or 1 client or server */
586    node->set_mbuf_cache_dir(dir);
587
588
589    if (node->m_ref_stream_info->getDpVm() == NULL) {
590        /* no VM */
591
592        node->m_vm_flow_var =  NULL;
593        node->m_vm_program  =  NULL;
594        node->m_vm_program_size =0;
595
596                /* allocate const mbuf */
597        rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size);
598        assert(m);
599
600        char *p = rte_pktmbuf_append(m, pkt_size);
601        assert(p);
602        /* copy the packet */
603        memcpy(p,stream_pkt,pkt_size);
604
605        /* TBD repace the mac if req we should add flag  */
606        m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*) p);
607
608        /* set the packet as a readonly */
609        node->set_cache_mbuf(m);
610
611        node->m_original_packet_data_prefix =0;
612    }else{
613
614        /* set the program */
615        TrexStream * local_mem_stream = node->m_ref_stream_info;
616
617        StreamVmDp  * lpDpVm = local_mem_stream->getDpVm();
618
619        node->m_vm_flow_var      = lpDpVm->clone_bss(); /* clone the flow var */
620        node->m_vm_program       = lpDpVm->get_program(); /* same ref to the program */
621        node->m_vm_program_size  = lpDpVm->get_program_size();
622
623
624        /* we need to copy the object */
625        if ( pkt_size > lpDpVm->get_prefix_size() ) {
626            /* we need const packet */
627            uint16_t const_pkt_size  = pkt_size - lpDpVm->get_prefix_size() ;
628            rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), const_pkt_size );
629            assert(m);
630
631            char *p = rte_pktmbuf_append(m, const_pkt_size);
632            assert(p);
633
634            /* copy packet data */
635            memcpy(p,(stream_pkt + lpDpVm->get_prefix_size()),const_pkt_size);
636
637            node->set_const_mbuf(m);
638        }
639
640
641        if ( lpDpVm->is_pkt_size_var() ) {
642            // mark the node as varible size
643            node->set_var_pkt_size();
644        }
645
646
647        if (lpDpVm->get_prefix_size() > pkt_size ) {
648            lpDpVm->set_prefix_size(pkt_size);
649        }
650
651        /* copy the headr */
652        uint16_t header_size = lpDpVm->get_prefix_size();
653        assert(header_size);
654        node->alloc_prefix_header(header_size);
655        uint8_t *p=node->m_original_packet_data_prefix;
656        assert(p);
657
658        memcpy(p,stream_pkt , header_size);
659        /* TBD repace the mac if req we should add flag  */
660        m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, p);
661    }
662
663
664    CDpOneStream one_stream;
665
666    one_stream.m_dp_stream = node->m_ref_stream_info;
667    one_stream.m_node =node;
668
669    lp_port->m_active_nodes.push_back(one_stream);
670
671    /* schedule only if active */
672    if (node->m_state == CGenNodeStateless::ss_ACTIVE) {
673        m_core->m_node_gen.add_node((CGenNode *)node);
674    }
675}
676
677void
678TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj,
679                                   double duration,
680                                   int event_id) {
681
682
683    TrexStatelessDpPerPort * lp_port=get_port_db(obj->get_port_id());
684    lp_port->m_active_streams = 0;
685    lp_port->set_event_id(event_id);
686
687    /* no nodes in the list */
688    assert(lp_port->m_active_nodes.size()==0);
689
690    for (auto single_stream : obj->get_objects()) {
691        /* all commands should be for the same port */
692        assert(obj->get_port_id() == single_stream.m_stream->m_port_id);
693        add_stream(lp_port,single_stream.m_stream,obj);
694    }
695
696    uint32_t nodes = lp_port->m_active_nodes.size();
697    /* find next stream */
698    assert(nodes == obj->get_objects().size());
699
700    int cnt=0;
701
702    /* set the next_stream pointer  */
703    for (auto single_stream : obj->get_objects()) {
704
705        if (single_stream.m_stream->is_dp_next_stream() ) {
706            int stream_id = single_stream.m_stream->m_next_stream_id;
707            assert(stream_id<nodes);
708            /* point to the next stream , stream_id is fixed */
709            lp_port->m_active_nodes[cnt].m_node->m_next_stream = lp_port->m_active_nodes[stream_id].m_node ;
710        }
711        cnt++;
712    }
713
714    lp_port->m_state =TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
715    m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
716
717
718    if ( duration > 0.0 ){
719        add_port_duration( duration ,obj->get_port_id(),event_id );
720    }
721
722}
723
724
725bool TrexStatelessDpCore::are_all_ports_idle(){
726
727    bool res=true;
728    int i;
729    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
730        if ( m_ports[i].m_state != TrexStatelessDpPerPort::ppSTATE_IDLE ){
731            res=false;
732        }
733    }
734    return (res);
735}
736
737
738void
739TrexStatelessDpCore::resume_traffic(uint8_t port_id){
740
741    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
742
743    lp_port->resume_traffic(port_id);
744}
745
746
747void
748TrexStatelessDpCore::pause_traffic(uint8_t port_id){
749
750    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
751
752    lp_port->pause_traffic(port_id);
753}
754
755void
756TrexStatelessDpCore::update_traffic(uint8_t port_id, double factor) {
757
758    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
759
760    lp_port->update_traffic(port_id, factor);
761}
762
763
764void
765TrexStatelessDpCore::stop_traffic(uint8_t port_id,
766                                  bool stop_on_id,
767                                  int event_id) {
768    /* we cannot remove nodes not from the top of the queue so
769       for every active node - make sure next time
770       the scheduler invokes it, it will be free */
771
772    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
773
774    if ( lp_port->stop_traffic(port_id,stop_on_id,event_id) == false){
775        /* nothing to do ! already stopped */
776        //printf(" skip .. %f\n",m_core->m_cur_time_sec);
777        return;
778    }
779
780#if 0
781    if ( are_all_ports_idle() ) {
782        /* just a place holder if we will need to do somthing in that case */
783    }
784#endif
785
786    /* inform the control plane we stopped - this might be a async stop
787       (streams ended)
788     */
789    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
790    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
791                                                                   port_id,
792                                                                   TrexDpPortEvent::EVENT_STOP,
793                                                                   lp_port->get_event_id());
794    ring->Enqueue((CGenNode *)event_msg);
795
796}
797
798/**
799 * handle a message from CP to DP
800 *
801 */
802void
803TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) {
804    msg->handle(this);
805    delete msg;
806}
807
808