trex_stateless_dp_core.cpp revision 857bdcf0
1/*
2 Itay Marom
3 Hanoch Haim
4 Cisco Systems, Inc.
5*/
6
7/*
8Copyright (c) 2015-2015 Cisco Systems, Inc.
9
10Licensed under the Apache License, Version 2.0 (the "License");
11you may not use this file except in compliance with the License.
12You may obtain a copy of the License at
13
14    http://www.apache.org/licenses/LICENSE-2.0
15
16Unless required by applicable law or agreed to in writing, software
17distributed under the License is distributed on an "AS IS" BASIS,
18WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19See the License for the specific language governing permissions and
20limitations under the License.
21*/
22#include <trex_stateless_dp_core.h>
23#include <trex_stateless_messaging.h>
24#include <trex_streams_compiler.h>
25#include <trex_stream_node.h>
26#include <trex_stream.h>
27
28#include <bp_sim.h>
29
30
31void CDpOneStream::Delete(CFlowGenListPerThread   * core){
32    assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE);
33    core->free_node((CGenNode *)m_node);
34    delete m_dp_stream;
35    m_node=0;
36    m_dp_stream=0;
37}
38
39void CDpOneStream::DeleteOnlyStream(){
40    assert(m_dp_stream);
41    delete m_dp_stream;
42    m_dp_stream=0;
43}
44
45int CGenNodeStateless::get_stream_id(){
46    if (m_state ==CGenNodeStateless::ss_FREE_RESUSE) {
47        return (-1); // not valid
48    }
49    assert(m_ref_stream_info);
50    return ((int)m_ref_stream_info->m_stream_id);
51}
52
53
54void CGenNodeStateless::DumpHeader(FILE *fd){
55    fprintf(fd," pkt_id, time, port , action , state, stream_id , stype , m-burst# , burst# \n");
56
57}
58void CGenNodeStateless::Dump(FILE *fd){
59    fprintf(fd," %2.4f, %3lu, %s,%s, %3d, %s, %3lu, %3lu  \n",
60            m_time,
61            (ulong)m_port_id,
62            "s-pkt", //action
63            get_stream_state_str(m_state ).c_str(),
64            get_stream_id(),   //stream_id
65            TrexStream::get_stream_type_str(m_stream_type).c_str(), //stype
66            (ulong)m_multi_bursts,
67            (ulong)m_single_burst
68            );
69}
70
71
72
73void CGenNodeStateless::refresh_vm_bss(){
74    if ( m_vm_flow_var ) {
75        StreamVmDp  * vm_s=m_ref_stream_info->m_vm_dp;
76        assert(vm_s);
77        memcpy(m_vm_flow_var,vm_s->get_bss(),vm_s->get_bss_size());
78    }
79}
80
81
82/**
83 * this function called when stream restart after it was inactive
84 */
85void CGenNodeStateless::refresh(){
86
87    /* refill the stream info */
88    m_single_burst    = m_single_burst_refill;
89    m_multi_bursts    = m_ref_stream_info->m_num_bursts;
90    m_state           = CGenNodeStateless::ss_ACTIVE;
91
92    /* refresh init value */
93#if 0
94    /* TBD should add a JSON varible for that */
95    refresh_vm_bss();
96#endif
97}
98
99
100void CGenNodeCommand::free_command(){
101
102    assert(m_cmd);
103    m_cmd->on_node_remove();
104    delete m_cmd;
105}
106
107
108std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state){
109    std::string res;
110
111    switch (stream_state) {
112    case CGenNodeStateless::ss_FREE_RESUSE :
113         res="FREE    ";
114        break;
115    case CGenNodeStateless::ss_INACTIVE :
116        res="INACTIVE ";
117        break;
118    case CGenNodeStateless::ss_ACTIVE :
119        res="ACTIVE   ";
120        break;
121    default:
122        res="Unknow   ";
123    };
124    return(res);
125}
126
127
128rte_mbuf_t   * CGenNodeStateless::alloc_node_with_vm(){
129
130    rte_mbuf_t        * m;
131    /* alloc small packet buffer*/
132    uint16_t prefix_size = prefix_header_size();
133    m = CGlobalInfo::pktmbuf_alloc( get_socket_id(), prefix_size );
134    if (m==0) {
135        return (m);
136    }
137    /* TBD remove this, should handle cases of error */
138    assert(m);
139    char *p=rte_pktmbuf_append(m, prefix_size);
140    memcpy( p ,m_original_packet_data_prefix, prefix_size);
141
142
143    /* run the VM program */
144    StreamDPVmInstructionsRunner runner;
145
146    runner.run( (uint32_t*)m_vm_flow_var,
147                m_vm_program_size,
148                m_vm_program,
149                m_vm_flow_var,
150                (uint8_t*)p);
151
152
153    rte_mbuf_t * m_const = get_const_mbuf();
154    if (  m_const != NULL) {
155        utl_rte_pktmbuf_add_after(m,m_const);
156    }
157    return (m);
158}
159
160
161void CGenNodeStateless::free_stl_node(){
162    /* if we have cache mbuf free it */
163    rte_mbuf_t * m=get_cache_mbuf();
164    if (m) {
165        rte_pktmbuf_free(m);
166        m_cache_mbuf=0;
167    }else{
168        /* non cache - must have an header */
169         m=get_const_mbuf();
170         if (m) {
171             rte_pktmbuf_free(m); /* reduce the ref counter */
172         }
173         free_prefix_header();
174    }
175    if (m_vm_flow_var) {
176        /* free flow var */
177        free(m_vm_flow_var);
178        m_vm_flow_var=0;
179    }
180}
181
182
183bool TrexStatelessDpPerPort::update_number_of_active_streams(uint32_t d){
184    m_active_streams-=d; /* reduce the number of streams */
185    if (m_active_streams == 0) {
186        return (true);
187    }
188    return (false);
189}
190
191bool TrexStatelessDpPerPort::resume_traffic(uint8_t port_id){
192
193    /* we are working with continues streams so we must be in transmit mode */
194    assert(m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE);
195
196    for (auto dp_stream : m_active_nodes) {
197        CGenNodeStateless * node =dp_stream.m_node;
198        assert(node->get_port_id() == port_id);
199        assert(node->is_pause() == true);
200        node->set_pause(false);
201    }
202    m_state = TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
203    return (true);
204}
205
206bool TrexStatelessDpPerPort::update_traffic(uint8_t port_id, double factor) {
207
208    assert( (m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING ||
209            (m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE)) );
210
211    for (auto dp_stream : m_active_nodes) {
212        CGenNodeStateless * node = dp_stream.m_node;
213        assert(node->get_port_id() == port_id);
214
215        node->update_rate(factor);
216    }
217
218    return (true);
219}
220
221bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){
222
223    /* we are working with continues streams so we must be in transmit mode */
224    assert(m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING);
225
226    for (auto dp_stream : m_active_nodes) {
227        CGenNodeStateless * node =dp_stream.m_node;
228        assert(node->get_port_id() == port_id);
229        assert(node->is_pause() == false);
230        node->set_pause(true);
231    }
232    m_state = TrexStatelessDpPerPort::ppSTATE_PAUSE;
233    return (true);
234}
235
236
237bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id,
238                                          bool stop_on_id,
239                                          int event_id){
240
241
242    if (m_state == TrexStatelessDpPerPort::ppSTATE_IDLE) {
243        assert(m_active_streams==0);
244        return false;
245    }
246
247    /* there could be race of stop after stop */
248    if ( stop_on_id ) {
249        if (event_id != m_event_id){
250            /* we can't stop it is an old message */
251            return false;
252        }
253    }
254
255    for (auto dp_stream : m_active_nodes) {
256        CGenNodeStateless * node =dp_stream.m_node;
257        assert(node->get_port_id() == port_id);
258        if ( node->get_state() == CGenNodeStateless::ss_ACTIVE) {
259            node->mark_for_free();
260            m_active_streams--;
261            dp_stream.DeleteOnlyStream();
262
263        }else{
264            dp_stream.Delete(m_core);
265        }
266    }
267
268    /* active stream should be zero */
269    assert(m_active_streams==0);
270    m_active_nodes.clear();
271    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
272    return (true);
273}
274
275
276void TrexStatelessDpPerPort::create(CFlowGenListPerThread   *  core){
277    m_core=core;
278    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
279    m_port_id=0;
280    m_active_streams=0;
281    m_active_nodes.clear();
282}
283
284
285
286void
287TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) {
288    m_thread_id = thread_id;
289    m_core = core;
290    m_local_port_offset = 2*core->getDualPortId();
291
292    CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();
293
294    m_ring_from_cp = cp_dp->getRingCpToDp(thread_id);
295    m_ring_to_cp   = cp_dp->getRingDpToCp(thread_id);
296
297    m_state = STATE_IDLE;
298
299    int i;
300    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
301        m_ports[i].create(core);
302    }
303}
304
305
306/* move to the next stream, old stream move to INACTIVE */
307bool TrexStatelessDpCore::set_stateless_next_node(CGenNodeStateless * cur_node,
308                                                  CGenNodeStateless * next_node){
309
310    assert(cur_node);
311    TrexStatelessDpPerPort * lp_port = get_port_db(cur_node->m_port_id);
312    bool schedule =false;
313
314    bool to_stop_port=false;
315
316    if (next_node == NULL) {
317        /* there is no next stream , reduce the number of active streams*/
318        to_stop_port = lp_port->update_number_of_active_streams(1);
319
320    }else{
321        uint8_t state=next_node->get_state();
322
323        /* can't be FREE_RESUSE */
324        assert(state != CGenNodeStateless::ss_FREE_RESUSE);
325        if (state == CGenNodeStateless::ss_INACTIVE ) {
326
327            /* refill start info and scedule, no update in active streams  */
328            next_node->refresh();
329            schedule = true;
330
331        }else{
332            to_stop_port = lp_port->update_number_of_active_streams(1);
333        }
334    }
335
336    if ( to_stop_port ) {
337        /* call stop port explictly to move the state */
338        stop_traffic(cur_node->m_port_id,false,0);
339    }
340
341    return ( schedule );
342}
343
344
345
346/**
347 * in idle state loop, the processor most of the time sleeps
348 * and periodically checks for messages
349 *
350 * @author imarom (01-Nov-15)
351 */
352void
353TrexStatelessDpCore::idle_state_loop() {
354
355    while (m_state == STATE_IDLE) {
356        bool had_msg = periodic_check_for_cp_messages();
357        /* if no message - backoff for some time */
358        if (!had_msg) {
359            delay(200);
360        }
361    }
362}
363
364
365
366void TrexStatelessDpCore::quit_main_loop(){
367    m_core->set_terminate_mode(true); /* mark it as terminated */
368    m_state = STATE_TERMINATE;
369    add_global_duration(0.0001);
370}
371
372
373/**
374 * scehduler runs when traffic exists
375 * it will return when no more transmitting is done on this
376 * core
377 *
378 * @author imarom (01-Nov-15)
379 */
380void
381TrexStatelessDpCore::start_scheduler() {
382    /* creates a maintenace job using the scheduler */
383    CGenNode * node_sync = m_core->create_node() ;
384    node_sync->m_type = CGenNode::FLOW_SYNC;
385    node_sync->m_time = m_core->m_cur_time_sec + SYNC_TIME_OUT;
386    m_core->m_node_gen.add_node(node_sync);
387
388    double old_offset = 0.0;
389    m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset);
390    /* bail out in case of terminate */
391    if (m_state != TrexStatelessDpCore::STATE_TERMINATE) {
392        m_core->m_node_gen.close_file(m_core);
393        m_state = STATE_IDLE; /* we exit from all ports and we have nothing to do, we move to IDLE state */
394    }
395}
396
397
398void
399TrexStatelessDpCore::run_once(){
400
401    idle_state_loop();
402
403    if ( m_state == STATE_TERMINATE ){
404        return;
405    }
406
407    start_scheduler();
408}
409
410
411
412
413void
414TrexStatelessDpCore::start() {
415
416    while (true) {
417        run_once();
418
419        if ( m_core->is_terminated_by_master() ) {
420            break;
421        }
422    }
423}
424
425/* only if both port are idle we can exit */
426void
427TrexStatelessDpCore::schedule_exit(){
428
429    CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
430
431    node->m_type = CGenNode::COMMAND;
432
433    node->m_cmd = new TrexStatelessDpCanQuit();
434
435    /* make sure it will be scheduled after the current node */
436    node->m_time = m_core->m_cur_time_sec ;
437
438    m_core->m_node_gen.add_node((CGenNode *)node);
439}
440
441
442void
443TrexStatelessDpCore::add_global_duration(double duration){
444    if (duration > 0.0) {
445        CGenNode *node = m_core->create_node() ;
446
447        node->m_type = CGenNode::EXIT_SCHED;
448
449        /* make sure it will be scheduled after the current node */
450        node->m_time = m_core->m_cur_time_sec + duration ;
451
452        m_core->m_node_gen.add_node(node);
453    }
454}
455
456/* add per port exit */
457void
458TrexStatelessDpCore::add_port_duration(double duration,
459                                       uint8_t port_id,
460                                       int event_id){
461    if (duration > 0.0) {
462        CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
463
464        node->m_type = CGenNode::COMMAND;
465
466        /* make sure it will be scheduled after the current node */
467        node->m_time = m_core->m_cur_time_sec + duration ;
468
469        TrexStatelessDpStop * cmd=new TrexStatelessDpStop(port_id);
470
471
472        /* test this */
473        m_core->m_non_active_nodes++;
474        cmd->set_core_ptr(m_core);
475        cmd->set_event_id(event_id);
476        cmd->set_wait_for_event_id(true);
477
478        node->m_cmd = cmd;
479
480        m_core->m_node_gen.add_node((CGenNode *)node);
481    }
482}
483
484
/**
 * Creates the data-path node for one stream and registers it on the port.
 *
 * Steps, in order:
 *  1. a stateless node is created and given a clone of the stream;
 *     release_dp_object() is then called on the original stream;
 *  2. the node starts ACTIVE (and is counted in lp_port->m_active_streams)
 *     when the stream has m_self_start set, otherwise INACTIVE;
 *  3. timing and burst fields are filled according to the stream type;
 *  4. the packet is prepared:
 *       - no VM: the whole packet is copied into one mbuf that is stored
 *         with set_cache_mbuf();
 *       - with VM: the flow variables are cloned, the bytes beyond the VM
 *         prefix (if any) are copied into a mbuf stored with
 *         set_const_mbuf(), and the prefix bytes are copied into the node's
 *         prefix header;
 *  5. the node/stream pair is appended to lp_port->m_active_nodes and the
 *     node is handed to the scheduler only when it is ACTIVE.
 *
 * @param lp_port  port database entry the stream belongs to
 * @param stream   stream to add (source of the clone)
 * @param comp     compiled object; not referenced inside this function
 */
void
TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port,
                                TrexStream * stream,
                                TrexStreamsCompiledObj *comp) {

    CGenNodeStateless *node = m_core->create_node_sl();

    /* add periodic */
    node->m_cache_mbuf=0;
    node->m_type = CGenNode::STATELESS_PKT;

    /* clone the stream from control plane memory to DP memory */
    node->m_ref_stream_info = stream->clone();
    /* no need for this memory anymore on the control plane memory */
    stream->release_dp_object();

    node->m_next_stream=0; /* will be fixed later */


    if ( stream->m_self_start ){
        /* if self start it is in active mode */
        node->m_state =CGenNodeStateless::ss_ACTIVE;
        lp_port->m_active_streams++;
    }else{
        node->m_state =CGenNodeStateless::ss_INACTIVE;
    }

    /* first scheduling time: now plus the stream's ISG (given in usec) */
    node->m_time = m_core->m_cur_time_sec + usec_to_sec(stream->m_isg_usec);

    pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(stream->m_port_id);
    node->m_flags = 0;
    node->m_src_port =0;
    node->m_original_packet_data_prefix = 0;



    /* set socket id */
    node->set_socket_id(m_core->m_node_gen.m_socket_id);

    /* build a mbuf from a packet */

    uint16_t pkt_size = stream->m_pkt.len;
    const uint8_t *stream_pkt = stream->m_pkt.binary;

    node->m_pause =0;
    node->m_stream_type = stream->m_type;
    /* time between packets is the inverse of the stream's pps */
    node->m_next_time_offset = 1.0 / stream->get_pps();

    /* stateless specific fields */
    switch ( stream->m_type ) {

    case TrexStream::stCONTINUOUS :
        node->m_single_burst=0;
        node->m_single_burst_refill=0;
        node->m_multi_bursts=0;
        node->m_ibg_sec                 = 0.0;
        break;

    case TrexStream::stSINGLE_BURST :
        /* a single burst is stored as a multi burst with one burst */
        node->m_stream_type             = TrexStream::stMULTI_BURST;
        node->m_single_burst            = stream->m_burst_total_pkts;
        node->m_single_burst_refill     = stream->m_burst_total_pkts;
        node->m_multi_bursts            = 1;  /* single burst in multi burst of 1 */
        node->m_ibg_sec                 = 0.0;
        break;

    case TrexStream::stMULTI_BURST :
        node->m_single_burst        = stream->m_burst_total_pkts;
        node->m_single_burst_refill = stream->m_burst_total_pkts;
        node->m_multi_bursts        = stream->m_num_bursts;
        node->m_ibg_sec             = usec_to_sec( stream->m_ibg_usec );
        break;
    default:
        /* any other stream type is not supported here */
        assert(0);
    };

    node->m_port_id = stream->m_port_id;

    /* set dir 0 or 1 client or server */
    node->set_mbuf_cache_dir(dir);


    if (node->m_ref_stream_info->getDpVm() == NULL) {
        /* no VM */

        node->m_vm_flow_var =  NULL;
        node->m_vm_program  =  NULL;
        node->m_vm_program_size =0;

                /* allocate const mbuf */
        rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size);
        assert(m);

        char *p = rte_pktmbuf_append(m, pkt_size);
        assert(p);
        /* copy the packet */
        memcpy(p,stream_pkt,pkt_size);

        /* TBD replace the mac if required, we should add a flag */
        m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*) p);

        /* set the packet as a readonly */
        node->set_cache_mbuf(m);

        node->m_original_packet_data_prefix =0;
    }else{

        /* set the program */
        TrexStream * local_mem_stream = node->m_ref_stream_info;

        StreamVmDp  * lpDpVm = local_mem_stream->getDpVm();

        node->m_vm_flow_var      = lpDpVm->clone_bss(); /* clone the flow var */
        node->m_vm_program       = lpDpVm->get_program(); /* same ref to the program */
        node->m_vm_program_size  = lpDpVm->get_program_size();


        /* we need to copy the object */
        if ( pkt_size > lpDpVm->get_prefix_size() ) {
            /* we need const packet: the bytes that follow the VM prefix */
            uint16_t const_pkt_size  = pkt_size - lpDpVm->get_prefix_size() ;
            rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), const_pkt_size );
            assert(m);

            char *p = rte_pktmbuf_append(m, const_pkt_size);
            assert(p);

            /* copy packet data */
            memcpy(p,(stream_pkt + lpDpVm->get_prefix_size()),const_pkt_size);

            node->set_const_mbuf(m);
        }


        /* the prefix can never be longer than the packet itself - clamp it */
        if (lpDpVm->get_prefix_size() > pkt_size ) {
            lpDpVm->set_prefix_size(pkt_size);
        }

        /* copy the header */
        uint16_t header_size = lpDpVm->get_prefix_size();
        assert(header_size);
        node->alloc_prefix_header(header_size);
        uint8_t *p=node->m_original_packet_data_prefix;
        assert(p);

        memcpy(p,stream_pkt , header_size);
        /* TBD replace the mac if required, we should add a flag */
        m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, p);
    }


    /* remember the node together with its DP stream on the port */
    CDpOneStream one_stream;

    one_stream.m_dp_stream = node->m_ref_stream_info;
    one_stream.m_node =node;

    lp_port->m_active_nodes.push_back(one_stream);

    /* schedule only if active */
    if (node->m_state == CGenNodeStateless::ss_ACTIVE) {
        m_core->m_node_gen.add_node((CGenNode *)node);
    }
}
649
650void
651TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj,
652                                   double duration,
653                                   int event_id) {
654
655
656    TrexStatelessDpPerPort * lp_port=get_port_db(obj->get_port_id());
657    lp_port->m_active_streams = 0;
658    lp_port->set_event_id(event_id);
659
660    /* no nodes in the list */
661    assert(lp_port->m_active_nodes.size()==0);
662
663    for (auto single_stream : obj->get_objects()) {
664        /* all commands should be for the same port */
665        assert(obj->get_port_id() == single_stream.m_stream->m_port_id);
666        add_stream(lp_port,single_stream.m_stream,obj);
667    }
668
669    uint32_t nodes = lp_port->m_active_nodes.size();
670    /* find next stream */
671    assert(nodes == obj->get_objects().size());
672
673    int cnt=0;
674
675    /* set the next_stream pointer  */
676    for (auto single_stream : obj->get_objects()) {
677
678        if (single_stream.m_stream->is_dp_next_stream() ) {
679            int stream_id = single_stream.m_stream->m_next_stream_id;
680            assert(stream_id<nodes);
681            /* point to the next stream , stream_id is fixed */
682            lp_port->m_active_nodes[cnt].m_node->m_next_stream = lp_port->m_active_nodes[stream_id].m_node ;
683        }
684        cnt++;
685    }
686
687    lp_port->m_state =TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
688    m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
689
690
691    if ( duration > 0.0 ){
692        add_port_duration( duration ,obj->get_port_id(),event_id );
693    }
694
695}
696
697
698bool TrexStatelessDpCore::are_all_ports_idle(){
699
700    bool res=true;
701    int i;
702    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
703        if ( m_ports[i].m_state != TrexStatelessDpPerPort::ppSTATE_IDLE ){
704            res=false;
705        }
706    }
707    return (res);
708}
709
710
711void
712TrexStatelessDpCore::resume_traffic(uint8_t port_id){
713
714    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
715
716    lp_port->resume_traffic(port_id);
717}
718
719
720void
721TrexStatelessDpCore::pause_traffic(uint8_t port_id){
722
723    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
724
725    lp_port->pause_traffic(port_id);
726}
727
728void
729TrexStatelessDpCore::update_traffic(uint8_t port_id, double factor) {
730
731    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
732
733    lp_port->update_traffic(port_id, factor);
734}
735
736
737void
738TrexStatelessDpCore::stop_traffic(uint8_t port_id,
739                                  bool stop_on_id,
740                                  int event_id) {
741    /* we cannot remove nodes not from the top of the queue so
742       for every active node - make sure next time
743       the scheduler invokes it, it will be free */
744
745    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
746
747    if ( lp_port->stop_traffic(port_id,stop_on_id,event_id) == false){
748        /* nothing to do ! already stopped */
749        //printf(" skip .. %f\n",m_core->m_cur_time_sec);
750        return;
751    }
752
753#if 0
754    if ( are_all_ports_idle() ) {
755        /* just a place holder if we will need to do somthing in that case */
756    }
757#endif
758
759    /* inform the control plane we stopped - this might be a async stop
760       (streams ended)
761     */
762    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
763    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
764                                                                   port_id,
765                                                                   TrexDpPortEvent::EVENT_STOP,
766                                                                   lp_port->get_event_id());
767    ring->Enqueue((CGenNode *)event_msg);
768
769}
770
771/**
772 * handle a message from CP to DP
773 *
774 */
775void
776TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) {
777    msg->handle(this);
778    delete msg;
779}
780
781