trex_stateless_dp_core.cpp revision ecbb10f1
1/*
2 Itay Marom
3 Hanoch Haim
4 Cisco Systems, Inc.
5*/
6
7/*
8Copyright (c) 2015-2015 Cisco Systems, Inc.
9
10Licensed under the Apache License, Version 2.0 (the "License");
11you may not use this file except in compliance with the License.
12You may obtain a copy of the License at
13
14    http://www.apache.org/licenses/LICENSE-2.0
15
16Unless required by applicable law or agreed to in writing, software
17distributed under the License is distributed on an "AS IS" BASIS,
18WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19See the License for the specific language governing permissions and
20limitations under the License.
21*/
22#include <trex_stateless_dp_core.h>
23#include <trex_stateless_messaging.h>
24#include <trex_streams_compiler.h>
25#include <trex_stream_node.h>
26#include <trex_stream.h>
27
28#include <bp_sim.h>
29
30
31void CDpOneStream::Delete(CFlowGenListPerThread   * core){
32    assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE);
33    core->free_node((CGenNode *)m_node);
34    delete m_dp_stream;
35    m_node=0;
36    m_dp_stream=0;
37}
38
39void CDpOneStream::DeleteOnlyStream(){
40    assert(m_dp_stream);
41    delete m_dp_stream;
42    m_dp_stream=0;
43}
44
45int CGenNodeStateless::get_stream_id(){
46    if (m_state ==CGenNodeStateless::ss_FREE_RESUSE) {
47        return (-1); // not valid
48    }
49    assert(m_ref_stream_info);
50    return ((int)m_ref_stream_info->m_stream_id);
51}
52
53
54void CGenNodeStateless::DumpHeader(FILE *fd){
55    fprintf(fd," pkt_id, time, port , action , state, stream_id , stype , m-burst# , burst# \n");
56
57}
58void CGenNodeStateless::Dump(FILE *fd){
59    fprintf(fd," %2.4f, %3lu, %s,%s, %3d, %s, %3lu, %3lu  \n",
60            m_time,
61            (ulong)m_port_id,
62            "s-pkt", //action
63            get_stream_state_str(m_state ).c_str(),
64            get_stream_id(),   //stream_id
65            TrexStream::get_stream_type_str(m_stream_type).c_str(), //stype
66            (ulong)m_multi_bursts,
67            (ulong)m_single_burst
68            );
69}
70
71
72
73void CGenNodeStateless::refresh_vm_bss(){
74    if ( m_vm_flow_var ) {
75        StreamVmDp  * vm_s=m_ref_stream_info->m_vm_dp;
76        assert(vm_s);
77        memcpy(m_vm_flow_var,vm_s->get_bss(),vm_s->get_bss_size());
78    }
79}
80
81
82/**
83 * this function called when stream restart after it was inactive
84 */
85void CGenNodeStateless::refresh(){
86
87    /* refill the stream info */
88    m_single_burst    = m_single_burst_refill;
89    m_multi_bursts    = m_ref_stream_info->m_num_bursts;
90    m_state           = CGenNodeStateless::ss_ACTIVE;
91
92    /* refresh init value */
93#if 0
94    /* TBD should add a JSON varible for that */
95    refresh_vm_bss();
96#endif
97}
98
99
100void CGenNodeCommand::free_command(){
101
102    assert(m_cmd);
103    m_cmd->on_node_remove();
104    delete m_cmd;
105}
106
107
108std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state){
109    std::string res;
110
111    switch (stream_state) {
112    case CGenNodeStateless::ss_FREE_RESUSE :
113         res="FREE    ";
114        break;
115    case CGenNodeStateless::ss_INACTIVE :
116        res="INACTIVE ";
117        break;
118    case CGenNodeStateless::ss_ACTIVE :
119        res="ACTIVE   ";
120        break;
121    default:
122        res="Unknow   ";
123    };
124    return(res);
125}
126
127
128rte_mbuf_t   * CGenNodeStateless::alloc_node_with_vm(){
129
130    rte_mbuf_t        * m;
131    /* alloc small packet buffer*/
132    uint16_t prefix_size = prefix_header_size();
133    m = CGlobalInfo::pktmbuf_alloc( get_socket_id(), prefix_size );
134    if (m==0) {
135        return (m);
136    }
137    /* TBD remove this, should handle cases of error */
138    assert(m);
139    char *p=rte_pktmbuf_append(m, prefix_size);
140    memcpy( p ,m_original_packet_data_prefix, prefix_size);
141
142
143    /* run the VM program */
144    StreamDPVmInstructionsRunner runner;
145
146    runner.run( (uint32_t*)m_vm_flow_var,
147                m_vm_program_size,
148                m_vm_program,
149                m_vm_flow_var,
150                (uint8_t*)p);
151
152    uint16_t pkt_new_size=runner.get_new_pkt_size();
153    if ( likely( pkt_new_size == 0) ) {
154        /* no packet size change */
155        rte_mbuf_t * m_const = get_const_mbuf();
156        if (  m_const != NULL) {
157            utl_rte_pktmbuf_add_after(m,m_const);
158        }
159        return (m);
160    }
161
162    /* packet size change there are a few changes */
163    rte_mbuf_t * m_const = get_const_mbuf();
164    if ( (m_const == 0 ) || (pkt_new_size<=prefix_size) ) {
165        /* one mbuf , just trim it */
166        m->data_len = pkt_new_size;
167        m->pkt_len  = pkt_new_size;
168        return (m);
169    }
170
171    rte_mbuf_t * mi= CGlobalInfo::pktmbuf_alloc_small(get_socket_id());
172    assert(mi);
173    rte_pktmbuf_attach(mi,m_const);
174    utl_rte_pktmbuf_add_after2(m,mi);
175
176    if ( pkt_new_size < m->pkt_len) {
177        /* need to trim it */
178        mi->data_len = (pkt_new_size - prefix_size);
179        m->pkt_len   = pkt_new_size;
180    }
181    return (m);
182}
183
184
185void CGenNodeStateless::free_stl_node(){
186    /* if we have cache mbuf free it */
187    rte_mbuf_t * m=get_cache_mbuf();
188    if (m) {
189        rte_pktmbuf_free(m);
190        m_cache_mbuf=0;
191    }else{
192        /* non cache - must have an header */
193         m=get_const_mbuf();
194         if (m) {
195             rte_pktmbuf_free(m); /* reduce the ref counter */
196         }
197         free_prefix_header();
198    }
199    if (m_vm_flow_var) {
200        /* free flow var */
201        free(m_vm_flow_var);
202        m_vm_flow_var=0;
203    }
204}
205
206
207bool TrexStatelessDpPerPort::update_number_of_active_streams(uint32_t d){
208    m_active_streams-=d; /* reduce the number of streams */
209    if (m_active_streams == 0) {
210        return (true);
211    }
212    return (false);
213}
214
215bool TrexStatelessDpPerPort::resume_traffic(uint8_t port_id){
216
217    /* we are working with continues streams so we must be in transmit mode */
218    assert(m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE);
219
220    for (auto dp_stream : m_active_nodes) {
221        CGenNodeStateless * node =dp_stream.m_node;
222        assert(node->get_port_id() == port_id);
223        assert(node->is_pause() == true);
224        node->set_pause(false);
225    }
226    m_state = TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
227    return (true);
228}
229
230bool TrexStatelessDpPerPort::update_traffic(uint8_t port_id, double factor) {
231
232    assert( (m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING ||
233            (m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE)) );
234
235    for (auto dp_stream : m_active_nodes) {
236        CGenNodeStateless * node = dp_stream.m_node;
237        assert(node->get_port_id() == port_id);
238
239        node->update_rate(factor);
240    }
241
242    return (true);
243}
244
245bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){
246
247    /* we are working with continues streams so we must be in transmit mode */
248    assert(m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING);
249
250    for (auto dp_stream : m_active_nodes) {
251        CGenNodeStateless * node =dp_stream.m_node;
252        assert(node->get_port_id() == port_id);
253        assert(node->is_pause() == false);
254        node->set_pause(true);
255    }
256    m_state = TrexStatelessDpPerPort::ppSTATE_PAUSE;
257    return (true);
258}
259
260
261bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id,
262                                          bool stop_on_id,
263                                          int event_id){
264
265
266    if (m_state == TrexStatelessDpPerPort::ppSTATE_IDLE) {
267        assert(m_active_streams==0);
268        return false;
269    }
270
271    /* there could be race of stop after stop */
272    if ( stop_on_id ) {
273        if (event_id != m_event_id){
274            /* we can't stop it is an old message */
275            return false;
276        }
277    }
278
279    for (auto dp_stream : m_active_nodes) {
280        CGenNodeStateless * node =dp_stream.m_node;
281        assert(node->get_port_id() == port_id);
282        if ( node->get_state() == CGenNodeStateless::ss_ACTIVE) {
283            node->mark_for_free();
284            m_active_streams--;
285            dp_stream.DeleteOnlyStream();
286
287        }else{
288            dp_stream.Delete(m_core);
289        }
290    }
291
292    /* active stream should be zero */
293    assert(m_active_streams==0);
294    m_active_nodes.clear();
295    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
296    return (true);
297}
298
299
300void TrexStatelessDpPerPort::create(CFlowGenListPerThread   *  core){
301    m_core=core;
302    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
303    m_port_id=0;
304    m_active_streams=0;
305    m_active_nodes.clear();
306}
307
308
309
310void
311TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) {
312    m_thread_id = thread_id;
313    m_core = core;
314    m_local_port_offset = 2*core->getDualPortId();
315
316    CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();
317
318    m_ring_from_cp = cp_dp->getRingCpToDp(thread_id);
319    m_ring_to_cp   = cp_dp->getRingDpToCp(thread_id);
320
321    m_state = STATE_IDLE;
322
323    int i;
324    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
325        m_ports[i].create(core);
326    }
327}
328
329
330/* move to the next stream, old stream move to INACTIVE */
331bool TrexStatelessDpCore::set_stateless_next_node(CGenNodeStateless * cur_node,
332                                                  CGenNodeStateless * next_node){
333
334    assert(cur_node);
335    TrexStatelessDpPerPort * lp_port = get_port_db(cur_node->m_port_id);
336    bool schedule =false;
337
338    bool to_stop_port=false;
339
340    if (next_node == NULL) {
341        /* there is no next stream , reduce the number of active streams*/
342        to_stop_port = lp_port->update_number_of_active_streams(1);
343
344    }else{
345        uint8_t state=next_node->get_state();
346
347        /* can't be FREE_RESUSE */
348        assert(state != CGenNodeStateless::ss_FREE_RESUSE);
349        if (state == CGenNodeStateless::ss_INACTIVE ) {
350
351            if (cur_node->m_action_counter > 0) {
352                cur_node->m_action_counter--;
353                if (cur_node->m_action_counter==0) {
354                    to_stop_port = lp_port->update_number_of_active_streams(1);
355                }else{
356                    /* refill start info and scedule, no update in active streams  */
357                    next_node->refresh();
358                    schedule = true;
359                }
360            }else{
361                /* refill start info and scedule, no update in active streams  */
362                next_node->refresh();
363                schedule = true;
364            }
365
366        }else{
367            to_stop_port = lp_port->update_number_of_active_streams(1);
368        }
369    }
370
371    if ( to_stop_port ) {
372        /* call stop port explictly to move the state */
373        stop_traffic(cur_node->m_port_id,false,0);
374    }
375
376    return ( schedule );
377}
378
379
380
381/**
382 * in idle state loop, the processor most of the time sleeps
383 * and periodically checks for messages
384 *
385 * @author imarom (01-Nov-15)
386 */
387void
388TrexStatelessDpCore::idle_state_loop() {
389
390    while (m_state == STATE_IDLE) {
391        bool had_msg = periodic_check_for_cp_messages();
392        /* if no message - backoff for some time */
393        if (!had_msg) {
394            delay(200);
395        }
396    }
397}
398
399
400
401void TrexStatelessDpCore::quit_main_loop(){
402    m_core->set_terminate_mode(true); /* mark it as terminated */
403    m_state = STATE_TERMINATE;
404    add_global_duration(0.0001);
405}
406
407
408/**
409 * scehduler runs when traffic exists
410 * it will return when no more transmitting is done on this
411 * core
412 *
413 * @author imarom (01-Nov-15)
414 */
415void
416TrexStatelessDpCore::start_scheduler() {
417    /* creates a maintenace job using the scheduler */
418    CGenNode * node_sync = m_core->create_node() ;
419    node_sync->m_type = CGenNode::FLOW_SYNC;
420    node_sync->m_time = m_core->m_cur_time_sec + SYNC_TIME_OUT;
421    m_core->m_node_gen.add_node(node_sync);
422
423    double old_offset = 0.0;
424    m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset);
425    /* bail out in case of terminate */
426    if (m_state != TrexStatelessDpCore::STATE_TERMINATE) {
427        m_core->m_node_gen.close_file(m_core);
428        m_state = STATE_IDLE; /* we exit from all ports and we have nothing to do, we move to IDLE state */
429    }
430}
431
432
433void
434TrexStatelessDpCore::run_once(){
435
436    idle_state_loop();
437
438    if ( m_state == STATE_TERMINATE ){
439        return;
440    }
441
442    start_scheduler();
443}
444
445
446
447
448void
449TrexStatelessDpCore::start() {
450
451    while (true) {
452        run_once();
453
454        if ( m_core->is_terminated_by_master() ) {
455            break;
456        }
457    }
458}
459
460/* only if both port are idle we can exit */
461void
462TrexStatelessDpCore::schedule_exit(){
463
464    CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
465
466    node->m_type = CGenNode::COMMAND;
467
468    node->m_cmd = new TrexStatelessDpCanQuit();
469
470    /* make sure it will be scheduled after the current node */
471    node->m_time = m_core->m_cur_time_sec ;
472
473    m_core->m_node_gen.add_node((CGenNode *)node);
474}
475
476
477void
478TrexStatelessDpCore::add_global_duration(double duration){
479    if (duration > 0.0) {
480        CGenNode *node = m_core->create_node() ;
481
482        node->m_type = CGenNode::EXIT_SCHED;
483
484        /* make sure it will be scheduled after the current node */
485        node->m_time = m_core->m_cur_time_sec + duration ;
486
487        m_core->m_node_gen.add_node(node);
488    }
489}
490
491/* add per port exit */
492void
493TrexStatelessDpCore::add_port_duration(double duration,
494                                       uint8_t port_id,
495                                       int event_id){
496    if (duration > 0.0) {
497        CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
498
499        node->m_type = CGenNode::COMMAND;
500
501        /* make sure it will be scheduled after the current node */
502        node->m_time = m_core->m_cur_time_sec + duration ;
503
504        TrexStatelessDpStop * cmd=new TrexStatelessDpStop(port_id);
505
506
507        /* test this */
508        m_core->m_non_active_nodes++;
509        cmd->set_core_ptr(m_core);
510        cmd->set_event_id(event_id);
511        cmd->set_wait_for_event_id(true);
512
513        node->m_cmd = cmd;
514
515        m_core->m_node_gen.add_node((CGenNode *)node);
516    }
517}
518
519
520void TrexStatelessDpCore::update_mac_addr(TrexStream * stream,
521                                          CGenNodeStateless *node,
522                                          pkt_dir_t dir,
523                                          char *raw_pkt){
524    bool              ov_src = stream->get_override_src_mac_by_pkt_data();
525    TrexStream::stream_dst_mac_t  ov_dst = stream->get_override_dst_mac_mode();
526
527
528    if ( (ov_src == true) && (ov_dst == TrexStream::stPKT) ) {
529        /* nothing to do, take from the packet both */
530        return;
531    }
532
533        /* take from cfg_file */
534    if ( (ov_src == false) &&
535         (ov_dst == TrexStream::stCFG_FILE) ){
536
537          m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*)raw_pkt);
538          return;
539    }
540
541    /* save the pkt*/
542    char tmp_pkt[12];
543    memcpy(tmp_pkt,raw_pkt,12);
544
545    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*)raw_pkt);
546
547    if ((ov_src == true) && (ov_dst == TrexStream::stCFG_FILE)) {
548        memcpy(raw_pkt+6,tmp_pkt+6,6);
549    }
550
551    if ((ov_src == false) && (ov_dst == TrexStream::stPKT)) {
552        memcpy(raw_pkt,tmp_pkt,6);
553    }
554}
555
556
557void
558TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port,
559                                TrexStream * stream,
560                                TrexStreamsCompiledObj *comp) {
561
562    CGenNodeStateless *node = m_core->create_node_sl();
563
564    /* add periodic */
565    node->m_cache_mbuf=0;
566    node->m_type = CGenNode::STATELESS_PKT;
567
568    node->m_action_counter = stream->m_action_count;
569
570    /* clone the stream from control plane memory to DP memory */
571    node->m_ref_stream_info = stream->clone();
572    /* no need for this memory anymore on the control plane memory */
573    stream->release_dp_object();
574
575    node->m_next_stream=0; /* will be fixed later */
576
577
578    if ( stream->m_self_start ){
579        /* if self start it is in active mode */
580        node->m_state =CGenNodeStateless::ss_ACTIVE;
581        lp_port->m_active_streams++;
582    }else{
583        node->m_state =CGenNodeStateless::ss_INACTIVE;
584    }
585
586    node->m_time = m_core->m_cur_time_sec + usec_to_sec(stream->m_isg_usec);
587
588    pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(stream->m_port_id);
589    node->m_flags = 0;
590    node->m_src_port =0;
591    node->m_original_packet_data_prefix = 0;
592
593
594
595    /* set socket id */
596    node->set_socket_id(m_core->m_node_gen.m_socket_id);
597
598    /* build a mbuf from a packet */
599
600    uint16_t pkt_size = stream->m_pkt.len;
601    const uint8_t *stream_pkt = stream->m_pkt.binary;
602
603    node->m_pause =0;
604    node->m_stream_type = stream->m_type;
605    node->m_next_time_offset = 1.0 / stream->get_pps();
606
607    /* stateless specific fields */
608    switch ( stream->m_type ) {
609
610    case TrexStream::stCONTINUOUS :
611        node->m_single_burst=0;
612        node->m_single_burst_refill=0;
613        node->m_multi_bursts=0;
614        break;
615
616    case TrexStream::stSINGLE_BURST :
617        node->m_stream_type             = TrexStream::stMULTI_BURST;
618        node->m_single_burst            = stream->m_burst_total_pkts;
619        node->m_single_burst_refill     = stream->m_burst_total_pkts;
620        node->m_multi_bursts            = 1;  /* single burst in multi burst of 1 */
621        break;
622
623    case TrexStream::stMULTI_BURST :
624        node->m_single_burst        = stream->m_burst_total_pkts;
625        node->m_single_burst_refill = stream->m_burst_total_pkts;
626        node->m_multi_bursts        = stream->m_num_bursts;
627        break;
628    default:
629
630        assert(0);
631    };
632
633    node->m_port_id = stream->m_port_id;
634
635    /* set dir 0 or 1 client or server */
636    node->set_mbuf_cache_dir(dir);
637
638
639    if (node->m_ref_stream_info->getDpVm() == NULL) {
640        /* no VM */
641
642        node->m_vm_flow_var =  NULL;
643        node->m_vm_program  =  NULL;
644        node->m_vm_program_size =0;
645
646                /* allocate const mbuf */
647        rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size);
648        assert(m);
649
650        char *p = rte_pktmbuf_append(m, pkt_size);
651        assert(p);
652        /* copy the packet */
653        memcpy(p,stream_pkt,pkt_size);
654
655        update_mac_addr(stream,node,dir,p);
656
657        /* set the packet as a readonly */
658        node->set_cache_mbuf(m);
659
660        node->m_original_packet_data_prefix =0;
661    }else{
662
663        /* set the program */
664        TrexStream * local_mem_stream = node->m_ref_stream_info;
665
666        StreamVmDp  * lpDpVm = local_mem_stream->getDpVm();
667
668        node->m_vm_flow_var      = lpDpVm->clone_bss(); /* clone the flow var */
669        node->m_vm_program       = lpDpVm->get_program(); /* same ref to the program */
670        node->m_vm_program_size  = lpDpVm->get_program_size();
671
672
673        /* we need to copy the object */
674        if ( pkt_size > lpDpVm->get_prefix_size() ) {
675            /* we need const packet */
676            uint16_t const_pkt_size  = pkt_size - lpDpVm->get_prefix_size() ;
677            rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), const_pkt_size );
678            assert(m);
679
680            char *p = rte_pktmbuf_append(m, const_pkt_size);
681            assert(p);
682
683            /* copy packet data */
684            memcpy(p,(stream_pkt + lpDpVm->get_prefix_size()),const_pkt_size);
685
686            node->set_const_mbuf(m);
687        }
688
689
690        if ( lpDpVm->is_pkt_size_var() ) {
691            // mark the node as varible size
692            node->set_var_pkt_size();
693        }
694
695
696        if (lpDpVm->get_prefix_size() > pkt_size ) {
697            lpDpVm->set_prefix_size(pkt_size);
698        }
699
700        /* copy the headr */
701        uint16_t header_size = lpDpVm->get_prefix_size();
702        assert(header_size);
703        node->alloc_prefix_header(header_size);
704        uint8_t *p=node->m_original_packet_data_prefix;
705        assert(p);
706
707        memcpy(p,stream_pkt , header_size);
708
709        update_mac_addr(stream,node,dir,(char *)p);
710    }
711
712
713    CDpOneStream one_stream;
714
715    one_stream.m_dp_stream = node->m_ref_stream_info;
716    one_stream.m_node =node;
717
718    lp_port->m_active_nodes.push_back(one_stream);
719
720    /* schedule only if active */
721    if (node->m_state == CGenNodeStateless::ss_ACTIVE) {
722        m_core->m_node_gen.add_node((CGenNode *)node);
723    }
724}
725
726void
727TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj,
728                                   double duration,
729                                   int event_id) {
730
731
732    TrexStatelessDpPerPort * lp_port=get_port_db(obj->get_port_id());
733    lp_port->m_active_streams = 0;
734    lp_port->set_event_id(event_id);
735
736    /* no nodes in the list */
737    assert(lp_port->m_active_nodes.size()==0);
738
739    for (auto single_stream : obj->get_objects()) {
740        /* all commands should be for the same port */
741        assert(obj->get_port_id() == single_stream.m_stream->m_port_id);
742        add_stream(lp_port,single_stream.m_stream,obj);
743    }
744
745    uint32_t nodes = lp_port->m_active_nodes.size();
746    /* find next stream */
747    assert(nodes == obj->get_objects().size());
748
749    int cnt=0;
750
751    /* set the next_stream pointer  */
752    for (auto single_stream : obj->get_objects()) {
753
754        if (single_stream.m_stream->is_dp_next_stream() ) {
755            int stream_id = single_stream.m_stream->m_next_stream_id;
756            assert(stream_id<nodes);
757            /* point to the next stream , stream_id is fixed */
758            lp_port->m_active_nodes[cnt].m_node->m_next_stream = lp_port->m_active_nodes[stream_id].m_node ;
759        }
760        cnt++;
761    }
762
763    lp_port->m_state =TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
764    m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
765
766
767    if ( duration > 0.0 ){
768        add_port_duration( duration ,obj->get_port_id(),event_id );
769    }
770
771}
772
773
774bool TrexStatelessDpCore::are_all_ports_idle(){
775
776    bool res=true;
777    int i;
778    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
779        if ( m_ports[i].m_state != TrexStatelessDpPerPort::ppSTATE_IDLE ){
780            res=false;
781        }
782    }
783    return (res);
784}
785
786
787void
788TrexStatelessDpCore::resume_traffic(uint8_t port_id){
789
790    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
791
792    lp_port->resume_traffic(port_id);
793}
794
795
796void
797TrexStatelessDpCore::pause_traffic(uint8_t port_id){
798
799    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
800
801    lp_port->pause_traffic(port_id);
802}
803
804void
805TrexStatelessDpCore::update_traffic(uint8_t port_id, double factor) {
806
807    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
808
809    lp_port->update_traffic(port_id, factor);
810}
811
812
813void
814TrexStatelessDpCore::stop_traffic(uint8_t port_id,
815                                  bool stop_on_id,
816                                  int event_id) {
817    /* we cannot remove nodes not from the top of the queue so
818       for every active node - make sure next time
819       the scheduler invokes it, it will be free */
820
821    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
822
823    if ( lp_port->stop_traffic(port_id,stop_on_id,event_id) == false){
824        /* nothing to do ! already stopped */
825        //printf(" skip .. %f\n",m_core->m_cur_time_sec);
826        return;
827    }
828
829#if 0
830    if ( are_all_ports_idle() ) {
831        /* just a place holder if we will need to do somthing in that case */
832    }
833#endif
834
835    /* inform the control plane we stopped - this might be a async stop
836       (streams ended)
837     */
838    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
839    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
840                                                                   port_id,
841                                                                   TrexDpPortEvent::EVENT_STOP,
842                                                                   lp_port->get_event_id());
843    ring->Enqueue((CGenNode *)event_msg);
844
845}
846
847/**
848 * handle a message from CP to DP
849 *
850 */
851void
852TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) {
853    msg->handle(this);
854    delete msg;
855}
856
857