trex_stateless_dp_core.cpp revision 17d58dba
1/*
2 Itay Marom
3 Hanoch Haim
4 Cisco Systems, Inc.
5*/
6
7/*
8Copyright (c) 2015-2016 Cisco Systems, Inc.
9
10Licensed under the Apache License, Version 2.0 (the "License");
11you may not use this file except in compliance with the License.
12You may obtain a copy of the License at
13
14    http://www.apache.org/licenses/LICENSE-2.0
15
16Unless required by applicable law or agreed to in writing, software
17distributed under the License is distributed on an "AS IS" BASIS,
18WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19See the License for the specific language governing permissions and
20limitations under the License.
21*/
22#include "bp_sim.h"
23#include "trex_stateless_dp_core.h"
24#include "trex_stateless_messaging.h"
25#include "trex_stream.h"
26#include "trex_stream_node.h"
27#include "trex_streams_compiler.h"
28#include "mbuf.h"
29
30
31class DPCoreWrapper : public CVirtualIF {
32public:
33
34    DPCoreWrapper() {
35        m_wrapped = nullptr;
36    }
37
38    void set_wrapped_object(CVirtualIF *wrapped) {
39        m_wrapped = wrapped;
40    }
41
42    CVirtualIF *get_wrapped_object() const {
43        return m_wrapped;
44    }
45
46    virtual int close_file(void) {
47        return m_wrapped->close_file();
48    }
49
50    virtual int flush_tx_queue(void) {
51        return m_wrapped->flush_tx_queue();
52    }
53
54    virtual int open_file(std::string file_name) {
55        return m_wrapped->open_file(file_name);
56    }
57
58    /* move to service mode */
59    virtual int send_node(CGenNode *node) {
60        return m_wrapped->send_node_service_mode(node);
61    }
62
63    virtual int update_mac_addr_from_global_cfg(pkt_dir_t dir, uint8_t *p) {
64        return m_wrapped->update_mac_addr_from_global_cfg(dir, p);
65    }
66
67    virtual pkt_dir_t port_id_to_dir(uint8_t port_id) {
68        return m_wrapped->port_id_to_dir(port_id);
69    }
70
71    virtual void send_one_pkt(pkt_dir_t dir, rte_mbuf_t *m) {
72        m_wrapped->send_one_pkt(dir, m);
73    }
74
75private:
76    CVirtualIF *m_wrapped;
77};
78
79
80void CGenNodeStateless::cache_mbuf_array_init(){
81    m_cache_size=0;
82    m_cache_array_cnt=0;
83}
84
85
86
87void CGenNodeStateless::cache_mbuf_array_copy(CGenNodeCacheMbuf *obj,
88                                              uint16_t size){
89
90    int i;
91    cache_mbuf_array_alloc(size);
92    for (i=0; i<size; i++) {
93        cache_mbuf_array_set(i,obj->m_array[i]);
94    }
95    cache_mbuf_array_set_const_mbuf(obj->m_mbuf_const);
96}
97
98
99rte_mbuf_t ** CGenNodeStateless::cache_mbuf_array_alloc(uint16_t size){
100
101    uint32_t buf_size = CGenNodeCacheMbuf::get_object_size(size);
102    /* TBD  replace with align, zero API */
103    m_cache_mbuf = (void *)malloc(buf_size);
104    assert(m_cache_mbuf);
105    memset(m_cache_mbuf,0,buf_size);
106
107    m_flags |= SL_NODE_CONST_MBUF_CACHE_ARRAY;
108    m_cache_size=size;
109    m_cache_array_cnt=0;
110    return ((rte_mbuf_t **)m_cache_mbuf);
111}
112
113void CGenNodeStateless::cache_mbuf_array_free(){
114
115    assert(m_cache_mbuf);
116    int i;
117    for (i=0; i<(int)m_cache_size; i++) {
118        rte_mbuf_t * m=cache_mbuf_array_get((uint16_t)i);
119        assert(m);
120        rte_pktmbuf_free(m);
121    }
122
123    /* free the const */
124    rte_mbuf_t * m=cache_mbuf_array_get_const_mbuf() ;
125    if (m) {
126        rte_pktmbuf_free(m);
127    }
128
129    free(m_cache_mbuf);
130    m_cache_mbuf=0;
131}
132
133
134rte_mbuf_t * CGenNodeStateless::cache_mbuf_array_get(uint16_t index){
135
136    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
137    return (p->m_array[index]);
138}
139
140void CGenNodeStateless::cache_mbuf_array_set_const_mbuf(rte_mbuf_t * m){
141    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
142    p->m_mbuf_const=m;
143}
144
145rte_mbuf_t * CGenNodeStateless::cache_mbuf_array_get_const_mbuf(){
146    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
147    return (p->m_mbuf_const);
148}
149
150
151void CGenNodeStateless::cache_mbuf_array_set(uint16_t index,
152                                             rte_mbuf_t * m){
153    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
154    p->m_array[index]=m;
155}
156
157
158void CDpOneStream::Delete(CFlowGenListPerThread   * core){
159    assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE);
160    core->free_node((CGenNode *)m_node);
161    delete m_dp_stream;
162    m_node=0;
163    m_dp_stream=0;
164}
165
166void CDpOneStream::DeleteOnlyStream(){
167    assert(m_dp_stream);
168    delete m_dp_stream;
169    m_dp_stream=0;
170}
171
172int CGenNodeStateless::get_stream_id(){
173    if (m_state ==CGenNodeStateless::ss_FREE_RESUSE) {
174        return (-1); // not valid
175    }
176    assert(m_ref_stream_info);
177    return ((int)m_ref_stream_info->m_stream_id);
178}
179
180
181void CGenNodeStateless::DumpHeader(FILE *fd){
182    fprintf(fd," pkt_id, time, port , action , state, stream_id , stype , m-burst# , burst# \n");
183
184}
185void CGenNodeStateless::Dump(FILE *fd){
186    fprintf(fd," %2.4f, %3lu, %s,%s, %3d, %s, %3lu, %3lu  \n",
187            m_time,
188            (ulong)m_port_id,
189            "s-pkt", //action
190            get_stream_state_str(m_state ).c_str(),
191            get_stream_id(),   //stream_id
192            TrexStream::get_stream_type_str(m_stream_type).c_str(), //stype
193            (ulong)m_multi_bursts,
194            (ulong)m_single_burst
195            );
196}
197
198
199void CGenNodeStateless::generate_random_seed() {
200    /* seed can be provided by the user */
201    uint32_t unique_seed;
202    if (m_ref_stream_info->m_random_seed) {
203        unique_seed = m_ref_stream_info->m_random_seed;
204    } else {
205        unsigned int tmp = (unsigned int)time(NULL);
206        unique_seed = rand_r(&tmp);
207    }
208
209    /* per thread divergence */
210    unique_seed = (unique_seed * ( (m_thread_id + 1) * 514229 ) ) & 0xFFFFFFFF;
211
212    /* set random */
213    set_random_seed(unique_seed);
214}
215
216
217void CGenNodeStateless::refresh_vm_bss() {
218    if ( m_vm_flow_var ) {
219        StreamVmDp  * vm_s=m_ref_stream_info->m_vm_dp;
220        assert(vm_s);
221        memcpy(m_vm_flow_var,vm_s->get_bss(),vm_s->get_bss_size());
222
223        if ( vm_s->is_random_seed() ) {
224            generate_random_seed();
225        }
226
227    }
228}
229
230
231
232/**
233 * this function called when stream restart after it was inactive
234 */
235void CGenNodeStateless::refresh(){
236
237    /* refill the stream info */
238    m_single_burst    = m_single_burst_refill;
239    m_multi_bursts    = m_ref_stream_info->m_num_bursts;
240    m_state           = CGenNodeStateless::ss_ACTIVE;
241
242    /* refresh init value */
243#if 0
244    /* TBD should add a JSON varible for that */
245    refresh_vm_bss();
246#endif
247}
248
249
250void CGenNodeCommand::free_command(){
251
252    assert(m_cmd);
253    m_cmd->on_node_remove();
254    delete m_cmd;
255}
256
257
258std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state){
259    std::string res;
260
261    switch (stream_state) {
262    case CGenNodeStateless::ss_FREE_RESUSE :
263         res="FREE    ";
264        break;
265    case CGenNodeStateless::ss_INACTIVE :
266        res="INACTIVE ";
267        break;
268    case CGenNodeStateless::ss_ACTIVE :
269        res="ACTIVE   ";
270        break;
271    default:
272        res="Unknow   ";
273    };
274    return(res);
275}
276
277/*
278 * Allocate mbuf for flow stat (and latency) info sending
279 * m - Original mbuf (can be complicated mbuf data structure)
280 * fsp_head - return pointer in which the flow stat info should be filled
281 * is_const - is the given mbuf const
282 * return new mbuf structure in which the fsp_head can be written. If needed, orginal mbuf is freed.
283 */
284rte_mbuf_t * CGenNodeStateless::alloc_flow_stat_mbuf(rte_mbuf_t *m, struct flow_stat_payload_header *&fsp_head
285                                                     , bool is_const) {
286    rte_mbuf_t *m_ret = NULL, *m_lat = NULL;
287    uint16_t fsp_head_size = sizeof(struct flow_stat_payload_header);
288
289    if (is_const) {
290        // const mbuf case
291        if (rte_pktmbuf_data_len(m) > 128) {
292            m_ret = CGlobalInfo::pktmbuf_alloc_small(get_socket_id());
293            assert(m_ret);
294            // alloc mbuf just for the latency header
295            m_lat = CGlobalInfo::pktmbuf_alloc( get_socket_id(), fsp_head_size);
296            assert(m_lat);
297            fsp_head = (struct flow_stat_payload_header *)rte_pktmbuf_append(m_lat, fsp_head_size);
298            rte_pktmbuf_attach(m_ret, m);
299            rte_pktmbuf_trim(m_ret, sizeof(struct flow_stat_payload_header));
300            utl_rte_pktmbuf_add_after2(m_ret, m_lat);
301            // ref count was updated when we took the (const) mbuf, and again in rte_pktmbuf_attach
302            // so need do decrease now, to avoid leak.
303            rte_pktmbuf_refcnt_update(m, -1);
304            return m_ret;
305        } else {
306            // Short packet. Just copy all bytes.
307            m_ret = CGlobalInfo::pktmbuf_alloc( get_socket_id(), rte_pktmbuf_data_len(m) );
308            assert(m_ret);
309            char *p = rte_pktmbuf_mtod(m, char*);
310            char *p_new = rte_pktmbuf_append(m_ret, rte_pktmbuf_data_len(m));
311            memcpy(p_new , p, rte_pktmbuf_data_len(m));
312            fsp_head = (struct flow_stat_payload_header *)(p_new + rte_pktmbuf_data_len(m) - fsp_head_size);
313            rte_pktmbuf_free(m);
314            return m_ret;
315        }
316    } else {
317        // Field engine (vm)
318        if (rte_pktmbuf_is_contiguous(m)) {
319            // one, r/w mbuf
320            char *p = rte_pktmbuf_mtod(m, char*);
321            fsp_head = (struct flow_stat_payload_header *)(p + rte_pktmbuf_data_len(m) - fsp_head_size);
322            return m;
323        } else {
324            // We have: r/w --> read only.
325            // Changing to:
326            // (original) r/w -> (new) indirect (direct is original read_only, after trimming last bytes) -> (new) latency info
327            rte_mbuf_t *m_read_only = m->next, *m_indirect;
328
329            m_indirect = CGlobalInfo::pktmbuf_alloc_small(get_socket_id());
330            assert(m_indirect);
331            // alloc mbuf just for the latency header
332            m_lat = CGlobalInfo::pktmbuf_alloc( get_socket_id(), fsp_head_size);
333            assert(m_lat);
334            fsp_head = (struct flow_stat_payload_header *)rte_pktmbuf_append(m_lat, fsp_head_size);
335            utl_rte_pktmbuf_chain_with_indirect(m, m_indirect, m_read_only, m_lat);
336            m_indirect->data_len = (uint16_t)(m_indirect->data_len - fsp_head_size);
337            return m;
338        }
339    }
340}
341
342// test the const case of alloc_flow_stat_mbuf. The more complicated non const case is tested in the simulation.
343bool CGenNodeStateless::alloc_flow_stat_mbuf_test_const() {
344    rte_mbuf_t *m, *m_test;
345    uint16_t sizes[2] = {64, 500};
346    uint16_t size;
347    struct flow_stat_payload_header *fsp_head;
348    char *p;
349
350    set_socket_id(0);
351    for (int test_num = 0; test_num < sizeof(sizes)/sizeof(sizes[0]); test_num++) {
352        size = sizes[test_num];
353        m = CGlobalInfo::pktmbuf_alloc(get_socket_id(), size);
354        p = rte_pktmbuf_append(m, size);
355        for (int i = 0; i < size; i++) {
356            p[i] = (char)i;
357        }
358        m_test = alloc_flow_stat_mbuf(m, fsp_head, true);
359        p = rte_pktmbuf_mtod(m_test, char*);
360        assert(rte_pktmbuf_pkt_len(m_test) == size);
361        for (int i = 0; i < rte_pktmbuf_pkt_len(m_test) - sizeof(*fsp_head); i++) {
362            assert(p[i] == (char)i);
363        }
364        // verify fsp_head points correctly
365        if (size > 128) { // should match threshould in alloc_flow_stat_mbuf
366            assert(rte_pktmbuf_data_len(m_test) == size - sizeof(*fsp_head));
367            assert(rte_pktmbuf_data_len(m_test->next) == sizeof(*fsp_head));
368            assert((char *)fsp_head == rte_pktmbuf_mtod((m_test->next), char*));
369        } else {
370            assert(rte_pktmbuf_data_len(m_test) == size);
371            assert (((char *)fsp_head) + sizeof (*fsp_head) == p + rte_pktmbuf_data_len(m_test));
372        }
373        rte_pktmbuf_free(m_test);
374    }
375    return true;
376}
377
378rte_mbuf_t   * CGenNodeStateless::alloc_node_with_vm(){
379
380    rte_mbuf_t        * m;
381    /* alloc small packet buffer*/
382    uint16_t prefix_size = prefix_header_size();
383    m = CGlobalInfo::pktmbuf_alloc( get_socket_id(), prefix_size );
384    if (m==0) {
385        return (m);
386    }
387    /* TBD remove this, should handle cases of error */
388    assert(m);
389    char *p=rte_pktmbuf_append(m, prefix_size);
390    memcpy( p ,m_original_packet_data_prefix, prefix_size);
391
392
393    /* run the VM program */
394    StreamDPVmInstructionsRunner runner;
395    runner.set_mbuf(m);
396
397    runner.run( (uint32_t*)m_vm_flow_var,
398                m_vm_program_size,
399                m_vm_program,
400                m_vm_flow_var,
401                (uint8_t*)p);
402
403    uint16_t pkt_new_size=runner.get_new_pkt_size();
404    if ( likely( pkt_new_size == 0) ) {
405        /* no packet size change */
406        rte_mbuf_t * m_const = get_const_mbuf();
407        if (  m_const != NULL) {
408            utl_rte_pktmbuf_add_after(m,m_const);
409        }
410        return (m);
411    }
412
413    /* packet size change there are a few changes */
414    rte_mbuf_t * m_const = get_const_mbuf();
415    if ( (m_const == 0 ) || (pkt_new_size<=prefix_size) ) {
416        /* one mbuf , just trim it */
417        m->data_len = pkt_new_size;
418        m->pkt_len  = pkt_new_size;
419        return (m);
420    }
421
422    rte_mbuf_t * mi= CGlobalInfo::pktmbuf_alloc_small(get_socket_id());
423    assert(mi);
424    rte_pktmbuf_attach(mi,m_const);
425    utl_rte_pktmbuf_add_after2(m,mi);
426
427    if ( pkt_new_size < m->pkt_len) {
428        /* need to trim it */
429        mi->data_len = (pkt_new_size - prefix_size);
430        m->pkt_len   = pkt_new_size;
431    }
432    return (m);
433}
434
435void CGenNodeStateless::free_stl_vm_buf(){
436        rte_mbuf_t * m ;
437         m=get_const_mbuf();
438         if (m) {
439             rte_pktmbuf_free(m); /* reduce the ref counter */
440             /* clear the const marker */
441             clear_const_mbuf();
442         }
443
444         free_prefix_header();
445
446         if (m_vm_flow_var) {
447             /* free flow var */
448             free(m_vm_flow_var);
449             m_vm_flow_var=0;
450         }
451}
452
453
454
455void CGenNodeStateless::free_stl_node(){
456
457    if ( is_cache_mbuf_array() ){
458        /* do we have cache of mbuf pre allocated */
459        cache_mbuf_array_free();
460    }else{
461        /* if we have cache mbuf free it */
462        rte_mbuf_t * m=get_cache_mbuf();
463        if (m) {
464                rte_pktmbuf_free(m);
465                m_cache_mbuf=0;
466        }
467    }
468    free_stl_vm_buf();
469}
470
471
472bool TrexStatelessDpPerPort::update_number_of_active_streams(uint32_t d){
473    m_active_streams-=d; /* reduce the number of streams */
474    if (m_active_streams == 0) {
475        return (true);
476    }
477    return (false);
478}
479
480bool TrexStatelessDpPerPort::resume_traffic(uint8_t port_id){
481
482    /* we are working with continues streams so we must be in transmit mode */
483    assert(m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE);
484
485    for (auto dp_stream : m_active_nodes) {
486        CGenNodeStateless * node =dp_stream.m_node;
487        assert(node->get_port_id() == port_id);
488        assert(node->is_pause() == true);
489        node->set_pause(false);
490    }
491    m_state = TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
492    return (true);
493}
494
495bool TrexStatelessDpPerPort::update_traffic(uint8_t port_id, double factor) {
496
497    assert( (m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING ||
498            (m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE)) );
499
500    for (auto dp_stream : m_active_nodes) {
501        CGenNodeStateless * node = dp_stream.m_node;
502        assert(node->get_port_id() == port_id);
503
504        node->update_rate(factor);
505    }
506
507    return (true);
508}
509
510bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){
511
512    /* we are working with continues streams so we must be in transmit mode */
513    assert(m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING);
514
515    for (auto dp_stream : m_active_nodes) {
516        CGenNodeStateless * node =dp_stream.m_node;
517        assert(node->get_port_id() == port_id);
518        assert(node->is_pause() == false);
519        node->set_pause(true);
520    }
521    m_state = TrexStatelessDpPerPort::ppSTATE_PAUSE;
522    return (true);
523}
524
525bool TrexStatelessDpPerPort::push_pcap(uint8_t port_id,
526                                       const std::string &pcap_filename,
527                                       double ipg_usec,
528                                       double min_ipg_sec,
529                                       double speedup,
530                                       uint32_t count,
531                                       bool is_dual) {
532
533    /* push pcap can only happen on an idle port from the core prespective */
534    assert(m_state == TrexStatelessDpPerPort::ppSTATE_IDLE);
535
536    CGenNodePCAP *pcap_node = m_core->allocate_pcap_node();
537    if (!pcap_node) {
538        return (false);
539    }
540
541    pkt_dir_t dir          = m_core->m_node_gen.m_v_if->port_id_to_dir(port_id);
542    socket_id_t socket_id  = m_core->m_node_gen.m_socket_id;
543
544    /* main port */
545    uint8_t mac_addr[12];
546    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, mac_addr);
547
548    /* for dual */
549    uint8_t slave_mac_addr[12];
550    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir ^ 0x1, slave_mac_addr);
551
552    bool rc = pcap_node->create(port_id,
553                                dir,
554                                socket_id,
555                                mac_addr,
556                                slave_mac_addr,
557                                pcap_filename,
558                                ipg_usec,
559                                min_ipg_sec,
560                                speedup,
561                                count,
562                                is_dual);
563    if (!rc) {
564        m_core->free_node((CGenNode *)pcap_node);
565        return (false);
566    }
567
568    /* schedule the node for now */
569    pcap_node->m_time = m_core->m_cur_time_sec;
570    m_core->m_node_gen.add_node((CGenNode *)pcap_node);
571
572    /* hold a pointer to the node */
573    assert(m_active_pcap_node == NULL);
574    m_active_pcap_node = pcap_node;
575
576    m_state = TrexStatelessDpPerPort::ppSTATE_PCAP_TX;
577    return (true);
578}
579
580
581bool TrexStatelessDpPerPort::stop_traffic(uint8_t  port_id,
582                                          bool     stop_on_id,
583                                          int      event_id){
584
585
586    if (m_state == TrexStatelessDpPerPort::ppSTATE_IDLE) {
587        assert(m_active_streams==0);
588        return false;
589    }
590
591    /* there could be race of stop after stop */
592    if ( stop_on_id ) {
593        if (event_id != m_event_id){
594            /* we can't stop it is an old message */
595            return false;
596        }
597    }
598
599    for (auto dp_stream : m_active_nodes) {
600        CGenNodeStateless * node =dp_stream.m_node;
601        assert(node->get_port_id() == port_id);
602        if ( node->get_state() == CGenNodeStateless::ss_ACTIVE) {
603            node->mark_for_free();
604            m_active_streams--;
605            dp_stream.DeleteOnlyStream();
606
607        }else{
608            dp_stream.Delete(m_core);
609        }
610    }
611
612    /* check for active PCAP node */
613    if (m_active_pcap_node) {
614        /* when got async stop from outside or duration */
615        if (m_active_pcap_node->is_active()) {
616            m_active_pcap_node->mark_for_free();
617        } else {
618            /* graceful stop - node was put out by the scheduler */
619            m_core->free_node( (CGenNode *)m_active_pcap_node);
620        }
621
622        m_active_pcap_node = NULL;
623    }
624
625    /* active stream should be zero */
626    assert(m_active_streams==0);
627    m_active_nodes.clear();
628    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
629    return (true);
630}
631
632
633void TrexStatelessDpPerPort::create(CFlowGenListPerThread   *  core){
634    m_core=core;
635    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
636    m_active_streams=0;
637    m_active_nodes.clear();
638    m_active_pcap_node = NULL;
639}
640
641
642TrexStatelessDpCore::TrexStatelessDpCore() {
643    m_thread_id       = 0;
644    m_core            = NULL;
645    m_duration        = -1;
646    m_is_service_mode = NULL;
647    m_wrapper         = new DPCoreWrapper();
648}
649
650TrexStatelessDpCore::~TrexStatelessDpCore() {
651    delete m_wrapper;
652}
653
654
655void
656TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) {
657    m_thread_id = thread_id;
658    m_core = core;
659    m_local_port_offset = 2*core->getDualPortId();
660
661    CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();
662
663    m_ring_from_cp = cp_dp->getRingCpToDp(thread_id);
664    m_ring_to_cp   = cp_dp->getRingDpToCp(thread_id);
665
666    m_state = STATE_IDLE;
667
668    int i;
669    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
670        m_ports[i].create(core);
671    }
672}
673
674
675/* move to the next stream, old stream move to INACTIVE */
676bool TrexStatelessDpCore::set_stateless_next_node(CGenNodeStateless * cur_node,
677                                                  CGenNodeStateless * next_node){
678
679    assert(cur_node);
680    TrexStatelessDpPerPort * lp_port = get_port_db(cur_node->m_port_id);
681    bool schedule =false;
682
683    bool to_stop_port=false;
684
685    if (next_node == NULL) {
686        /* there is no next stream , reduce the number of active streams*/
687        to_stop_port = lp_port->update_number_of_active_streams(1);
688
689    }else{
690        uint8_t state=next_node->get_state();
691
692        /* can't be FREE_RESUSE */
693        assert(state != CGenNodeStateless::ss_FREE_RESUSE);
694        if (state == CGenNodeStateless::ss_INACTIVE ) {
695
696            if (cur_node->m_action_counter > 0) {
697                cur_node->m_action_counter--;
698                if (cur_node->m_action_counter==0) {
699                    to_stop_port = lp_port->update_number_of_active_streams(1);
700                }else{
701                    /* refill start info and scedule, no update in active streams  */
702                    next_node->refresh();
703                    schedule = true;
704                }
705            }else{
706                /* refill start info and scedule, no update in active streams  */
707                next_node->refresh();
708                schedule = true;
709            }
710
711        }else{
712            to_stop_port = lp_port->update_number_of_active_streams(1);
713        }
714    }
715
716    if ( to_stop_port ) {
717        /* call stop port explictly to move the state */
718        stop_traffic(cur_node->m_port_id,false,0);
719    }
720
721    return ( schedule );
722}
723
724
725
726/**
727 * in idle state loop, the processor most of the time sleeps
728 * and periodically checks for messages
729 *
730 * @author imarom (01-Nov-15)
731 */
732void
733TrexStatelessDpCore::idle_state_loop() {
734
735    const int SHORT_DELAY_MS    = 2;
736    const int LONG_DELAY_MS     = 50;
737    const int DEEP_SLEEP_LIMIT  = 2000;
738
739    int counter = 0;
740
741    while (m_state == STATE_IDLE) {
742        m_core->tickle();
743
744        bool had_msg = m_core->check_msgs();
745        if (had_msg) {
746            counter = 0;
747            continue;
748        }
749
750        /* enter deep sleep only if enough time had passed */
751        if (counter < DEEP_SLEEP_LIMIT) {
752            delay(SHORT_DELAY_MS);
753            counter++;
754        } else {
755            delay(LONG_DELAY_MS);
756        }
757
758    }
759}
760
761
762
763void TrexStatelessDpCore::quit_main_loop(){
764    m_core->set_terminate_mode(true); /* mark it as terminated */
765    m_state = STATE_TERMINATE;
766    add_global_duration(0.0001);
767}
768
769
770/**
771 * scehduler runs when traffic exists
772 * it will return when no more transmitting is done on this
773 * core
774 *
775 * @author imarom (01-Nov-15)
776 */
777void
778TrexStatelessDpCore::start_scheduler() {
779
780    /* creates a maintenace job using the scheduler */
781    CGenNode * node_sync = m_core->create_node() ;
782    node_sync->m_type = CGenNode::FLOW_SYNC;
783    node_sync->m_time = m_core->m_cur_time_sec + SYNC_TIME_OUT;
784
785    m_core->m_node_gen.add_node(node_sync);
786
787    double old_offset = 0.0;
788    m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset);
789    /* bail out in case of terminate */
790    if (m_state != TrexStatelessDpCore::STATE_TERMINATE) {
791        m_core->m_node_gen.close_file(m_core);
792        m_state = STATE_IDLE; /* we exit from all ports and we have nothing to do, we move to IDLE state */
793    }
794}
795
796
797void
798TrexStatelessDpCore::run_once(){
799
800    idle_state_loop();
801
802    if ( m_state == STATE_TERMINATE ){
803        return;
804    }
805
806    start_scheduler();
807}
808
809
810
811
812void
813TrexStatelessDpCore::start() {
814
815    while (true) {
816        run_once();
817
818        if ( m_core->is_terminated_by_master() ) {
819            break;
820        }
821    }
822}
823
824/* only if both port are idle we can exit */
825void
826TrexStatelessDpCore::schedule_exit(){
827
828    CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
829
830    node->m_type = CGenNode::COMMAND;
831
832    node->m_cmd = new TrexStatelessDpCanQuit();
833
834    /* make sure it will be scheduled after the current node */
835    node->m_time = m_core->m_cur_time_sec ;
836
837    m_core->m_node_gen.add_node((CGenNode *)node);
838}
839
840
841void
842TrexStatelessDpCore::add_global_duration(double duration){
843    if (duration > 0.0) {
844        CGenNode *node = m_core->create_node() ;
845
846        node->m_type = CGenNode::EXIT_SCHED;
847
848        /* make sure it will be scheduled after the current node */
849        node->m_time = m_core->m_cur_time_sec + duration ;
850
851        m_core->m_node_gen.add_node(node);
852    }
853}
854
855/* add per port exit */
856void
857TrexStatelessDpCore::add_port_duration(double duration,
858                                       uint8_t port_id,
859                                       int event_id){
860    if (duration > 0.0) {
861        CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
862
863        node->m_type = CGenNode::COMMAND;
864
865        /* make sure it will be scheduled after the current node */
866        node->m_time = m_core->m_cur_time_sec + duration ;
867
868        TrexStatelessDpStop * cmd=new TrexStatelessDpStop(port_id);
869
870
871        /* test this */
872        m_core->m_non_active_nodes++;
873        cmd->set_core_ptr(m_core);
874        cmd->set_event_id(event_id);
875        cmd->set_wait_for_event_id(true);
876
877        node->m_cmd = cmd;
878
879        m_core->m_node_gen.add_node((CGenNode *)node);
880    }
881}
882
883
884void TrexStatelessDpCore::update_mac_addr(TrexStream * stream,
885                                          CGenNodeStateless *node,
886                                          pkt_dir_t dir,
887                                          char *raw_pkt){
888    bool              ov_src = stream->get_override_src_mac_by_pkt_data();
889    TrexStream::stream_dst_mac_t  ov_dst = stream->get_override_dst_mac_mode();
890
891
892    if ( (ov_src == true) && (ov_dst == TrexStream::stPKT) ) {
893        /* nothing to do, take from the packet both */
894        return;
895    }
896
897        /* take from cfg_file */
898    if ( (ov_src == false) &&
899         (ov_dst == TrexStream::stCFG_FILE) ){
900
901          m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*)raw_pkt);
902          return;
903    }
904
905    /* save the pkt*/
906    char tmp_pkt[12];
907    memcpy(tmp_pkt,raw_pkt,12);
908
909    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*)raw_pkt);
910
911    if ((ov_src == true) && (ov_dst == TrexStream::stCFG_FILE)) {
912        memcpy(raw_pkt+6,tmp_pkt+6,6);
913    }
914
915    if ((ov_src == false) && (ov_dst == TrexStream::stPKT)) {
916        memcpy(raw_pkt,tmp_pkt,6);
917    }
918}
919
920
921void TrexStatelessDpCore::replay_vm_into_cache(TrexStream * stream,
922                                               CGenNodeStateless *node){
923
924    uint16_t      cache_size = stream->m_cache_size;
925    assert(cache_size>0);
926    rte_mbuf_t * m=0;
927
928    uint32_t buf_size = CGenNodeCacheMbuf::get_object_size(cache_size);
929    CGenNodeCacheMbuf * p = (CGenNodeCacheMbuf *)malloc(buf_size);
930    assert(p);
931    memset(p,0,buf_size);
932
933    int i;
934    for (i=0; i<cache_size; i++) {
935        p->m_array[i] =  node->alloc_node_with_vm();
936    }
937    /* save const */
938    m=node->get_const_mbuf();
939    if (m) {
940        p->m_mbuf_const=m;
941        rte_pktmbuf_refcnt_update(m,1);
942    }
943
944    /* free all VM and const mbuf */
945    node->free_stl_vm_buf();
946
947    /* copy to local node meory */
948    node->cache_mbuf_array_copy(p,cache_size);
949
950    /* free the memory */
951    free(p);
952}
953
954
955void
956TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port,
957                                TrexStream * stream,
958                                TrexStreamsCompiledObj *comp) {
959
960    CGenNodeStateless *node = m_core->create_node_sl();
961
962    node->m_thread_id = m_thread_id;
963    node->cache_mbuf_array_init();
964    node->m_batch_size=0;
965
966    /* add periodic */
967    node->m_cache_mbuf=0;
968    node->m_type = CGenNode::STATELESS_PKT;
969
970    node->m_action_counter = stream->m_action_count;
971
972    /* clone the stream from control plane memory to DP memory */
973    node->m_ref_stream_info = stream->clone();
974    /* no need for this memory anymore on the control plane memory */
975    stream->release_dp_object();
976
977    node->m_next_stream=0; /* will be fixed later */
978
979    if ( stream->m_self_start ){
980        /* if self start it is in active mode */
981        node->m_state =CGenNodeStateless::ss_ACTIVE;
982        lp_port->m_active_streams++;
983    }else{
984        node->m_state =CGenNodeStateless::ss_INACTIVE;
985    }
986
987    node->m_time = m_core->m_cur_time_sec + stream->get_start_delay_sec();
988
989    pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(stream->m_port_id);
990    node->m_flags = 0;
991    node->m_src_port =0;
992    node->m_original_packet_data_prefix = 0;
993
994    if (stream->m_rx_check.m_enabled) {
995        node->set_stat_needed();
996        uint8_t hw_id = stream->m_rx_check.m_hw_id;
997        assert (hw_id < MAX_FLOW_STATS + MAX_FLOW_STATS_PAYLOAD);
998        node->set_stat_hw_id(hw_id);
999        // no support for cache with flow stat payload rules
1000        if ((TrexPlatformApi::driver_stat_cap_e)stream->m_rx_check.m_rule_type == TrexPlatformApi::IF_STAT_PAYLOAD) {
1001            stream->m_cache_size = 0;
1002        }
1003    }
1004
1005    /* set socket id */
1006    node->set_socket_id(m_core->m_node_gen.m_socket_id);
1007
1008    /* build a mbuf from a packet */
1009
1010    uint16_t pkt_size = stream->m_pkt.len;
1011    const uint8_t *stream_pkt = stream->m_pkt.binary;
1012
1013    node->m_pause =0;
1014    node->m_stream_type = stream->m_type;
1015    node->m_next_time_offset = 1.0 / stream->get_pps();
1016    node->m_null_stream = (stream->m_null_stream ? 1 : 0);
1017
1018    /* stateless specific fields */
1019    switch ( stream->m_type ) {
1020
1021    case TrexStream::stCONTINUOUS :
1022        node->m_single_burst=0;
1023        node->m_single_burst_refill=0;
1024        node->m_multi_bursts=0;
1025        break;
1026
1027    case TrexStream::stSINGLE_BURST :
1028        node->m_stream_type             = TrexStream::stMULTI_BURST;
1029        node->m_single_burst            = stream->m_burst_total_pkts;
1030        node->m_single_burst_refill     = stream->m_burst_total_pkts;
1031        node->m_multi_bursts            = 1;  /* single burst in multi burst of 1 */
1032        break;
1033
1034    case TrexStream::stMULTI_BURST :
1035        node->m_single_burst        = stream->m_burst_total_pkts;
1036        node->m_single_burst_refill = stream->m_burst_total_pkts;
1037        node->m_multi_bursts        = stream->m_num_bursts;
1038        break;
1039    default:
1040
1041        assert(0);
1042    };
1043
1044    node->m_port_id = stream->m_port_id;
1045
1046    /* set dir 0 or 1 client or server */
1047    node->set_mbuf_cache_dir(dir);
1048
1049
1050    if (node->m_ref_stream_info->getDpVm() == NULL) {
1051        /* no VM */
1052
1053        node->m_vm_flow_var =  NULL;
1054        node->m_vm_program  =  NULL;
1055        node->m_vm_program_size =0;
1056
1057                /* allocate const mbuf */
1058        rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size);
1059        assert(m);
1060
1061        char *p = rte_pktmbuf_append(m, pkt_size);
1062        assert(p);
1063        /* copy the packet */
1064        memcpy(p,stream_pkt,pkt_size);
1065
1066        update_mac_addr(stream,node,dir,p);
1067
1068        /* set the packet as a readonly */
1069        node->set_cache_mbuf(m);
1070
1071        node->m_original_packet_data_prefix =0;
1072    }else{
1073
1074        /* set the program */
1075        TrexStream * local_mem_stream = node->m_ref_stream_info;
1076
1077        StreamVmDp  * lpDpVm = local_mem_stream->getDpVm();
1078
1079        node->m_vm_flow_var      = lpDpVm->clone_bss(); /* clone the flow var */
1080        node->m_vm_program       = lpDpVm->get_program(); /* same ref to the program */
1081        node->m_vm_program_size  = lpDpVm->get_program_size();
1082
1083        /* generate random seed if needed*/
1084        if (lpDpVm->is_random_seed()) {
1085            node->generate_random_seed();
1086        }
1087
1088        /* we need to copy the object */
1089        if ( pkt_size > lpDpVm->get_prefix_size() ) {
1090            /* we need const packet */
1091            uint16_t const_pkt_size  = pkt_size - lpDpVm->get_prefix_size() ;
1092            rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), const_pkt_size );
1093            assert(m);
1094
1095            char *p = rte_pktmbuf_append(m, const_pkt_size);
1096            assert(p);
1097
1098            /* copy packet data */
1099            memcpy(p,(stream_pkt + lpDpVm->get_prefix_size()),const_pkt_size);
1100
1101            node->set_const_mbuf(m);
1102        }
1103
1104
1105        if ( lpDpVm->is_pkt_size_var() ) {
1106            // mark the node as varible size
1107            node->set_var_pkt_size();
1108        }
1109
1110
1111        if (lpDpVm->get_prefix_size() > pkt_size ) {
1112            lpDpVm->set_prefix_size(pkt_size);
1113        }
1114
1115        /* copy the headr */
1116        uint16_t header_size = lpDpVm->get_prefix_size();
1117        assert(header_size);
1118        node->alloc_prefix_header(header_size);
1119        uint8_t *p=node->m_original_packet_data_prefix;
1120        assert(p);
1121
1122        memcpy(p,stream_pkt , header_size);
1123
1124        update_mac_addr(stream,node,dir,(char *)p);
1125
1126        if (stream->m_cache_size > 0 ) {
1127            /* we need to create cache of objects */
1128            replay_vm_into_cache(stream, node);
1129        }
1130    }
1131
1132
1133    CDpOneStream one_stream;
1134
1135    one_stream.m_dp_stream = node->m_ref_stream_info;
1136    one_stream.m_node =node;
1137
1138    lp_port->m_active_nodes.push_back(one_stream);
1139
1140    /* schedule only if active */
1141    if (node->m_state == CGenNodeStateless::ss_ACTIVE) {
1142        m_core->m_node_gen.add_node((CGenNode *)node);
1143    }
1144}
1145
1146void
1147TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj,
1148                                   double duration,
1149                                   int event_id) {
1150
1151
1152    TrexStatelessDpPerPort * lp_port=get_port_db(obj->get_port_id());
1153    lp_port->m_active_streams = 0;
1154    lp_port->set_event_id(event_id);
1155
1156    /* update cur time */
1157    if ( CGlobalInfo::is_realtime()  ){
1158        m_core->m_cur_time_sec = now_sec() + SCHD_OFFSET_DTIME ;
1159    }
1160
1161    /* no nodes in the list */
1162    assert(lp_port->m_active_nodes.size()==0);
1163
1164    for (auto single_stream : obj->get_objects()) {
1165        /* all commands should be for the same port */
1166        assert(obj->get_port_id() == single_stream.m_stream->m_port_id);
1167        add_stream(lp_port,single_stream.m_stream,obj);
1168    }
1169
1170    uint32_t nodes = lp_port->m_active_nodes.size();
1171    /* find next stream */
1172    assert(nodes == obj->get_objects().size());
1173
1174    int cnt=0;
1175
1176    /* set the next_stream pointer  */
1177    for (auto single_stream : obj->get_objects()) {
1178
1179        if (single_stream.m_stream->is_dp_next_stream() ) {
1180            int stream_id = single_stream.m_stream->m_next_stream_id;
1181            assert(stream_id<nodes);
1182            /* point to the next stream , stream_id is fixed */
1183            lp_port->m_active_nodes[cnt].m_node->m_next_stream = lp_port->m_active_nodes[stream_id].m_node ;
1184        }
1185        cnt++;
1186    }
1187
1188    lp_port->m_state =TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
1189    m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
1190
1191
1192    if ( duration > 0.0 ){
1193        add_port_duration( duration ,obj->get_port_id(),event_id );
1194    }
1195
1196}
1197
1198
1199bool TrexStatelessDpCore::are_all_ports_idle(){
1200
1201    bool res=true;
1202    int i;
1203    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
1204        if ( m_ports[i].m_state != TrexStatelessDpPerPort::ppSTATE_IDLE ){
1205            res=false;
1206        }
1207    }
1208    return (res);
1209}
1210
1211
1212void
1213TrexStatelessDpCore::resume_traffic(uint8_t port_id){
1214
1215    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1216
1217    lp_port->resume_traffic(port_id);
1218}
1219
1220
1221void
1222TrexStatelessDpCore::pause_traffic(uint8_t port_id){
1223
1224    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1225
1226    lp_port->pause_traffic(port_id);
1227}
1228
1229void
1230TrexStatelessDpCore::push_pcap(uint8_t port_id,
1231                               int event_id,
1232                               const std::string &pcap_filename,
1233                               double ipg_usec,
1234                               double m_min_ipg_sec,
1235                               double speedup,
1236                               uint32_t count,
1237                               double duration,
1238                               bool is_dual) {
1239
1240    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1241
1242    lp_port->set_event_id(event_id);
1243
1244    /* delegate the command to the port */
1245    bool rc = lp_port->push_pcap(port_id, pcap_filename, ipg_usec, m_min_ipg_sec, speedup, count, is_dual);
1246    if (!rc) {
1247        /* report back that we stopped */
1248        CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
1249        TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
1250                                                                       port_id,
1251                                                                       event_id,
1252                                                                       false);
1253        ring->Enqueue((CGenNode *)event_msg);
1254        return;
1255    }
1256
1257
1258    if (duration > 0.0) {
1259        add_port_duration(duration, port_id, event_id);
1260    }
1261
1262     m_state = TrexStatelessDpCore::STATE_PCAP_TX;
1263}
1264
1265void
1266TrexStatelessDpCore::update_traffic(uint8_t port_id, double factor) {
1267
1268    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1269
1270    lp_port->update_traffic(port_id, factor);
1271}
1272
1273
1274void
1275TrexStatelessDpCore::stop_traffic(uint8_t  port_id,
1276                                  bool     stop_on_id,
1277                                  int      event_id) {
1278    /* we cannot remove nodes not from the top of the queue so
1279       for every active node - make sure next time
1280       the scheduler invokes it, it will be free */
1281
1282    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
1283    if ( lp_port->stop_traffic(port_id,stop_on_id,event_id) == false){
1284        return;
1285    }
1286
1287    /* flush the TX queue before sending done message to the CP */
1288    m_core->flush_tx_queue();
1289
1290    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
1291    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
1292                                                                   port_id,
1293                                                                   lp_port->get_event_id());
1294    ring->Enqueue((CGenNode *)event_msg);
1295
1296}
1297
1298/**
1299 * handle a message from CP to DP
1300 *
1301 */
1302void
1303TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) {
1304    msg->handle(this);
1305    delete msg;
1306}
1307
1308void
1309TrexStatelessDpCore::barrier(uint8_t port_id, int event_id) {
1310
1311    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
1312    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
1313                                                                   port_id,
1314                                                                   event_id);
1315    ring->Enqueue((CGenNode *)event_msg);
1316}
1317
1318void
1319TrexStatelessDpCore::set_service_mode(uint8_t port_id, bool enabled) {
1320    /* ignore the same message */
1321    if (enabled == m_is_service_mode) {
1322        return;
1323    }
1324
1325    if (enabled) {
1326        /* sanity */
1327        assert(m_core->m_node_gen.m_v_if != m_wrapper);
1328
1329        /* set the wrapper object and make the VIF point to it */
1330        m_wrapper->set_wrapped_object(m_core->m_node_gen.m_v_if);
1331        m_core->m_node_gen.m_v_if = m_wrapper;
1332        m_is_service_mode = true;
1333
1334    } else {
1335        /* sanity */
1336        assert(m_core->m_node_gen.m_v_if == m_wrapper);
1337
1338        /* restore the wrapped object and make the VIF point to it */
1339        m_core->m_node_gen.m_v_if = m_wrapper->get_wrapped_object();
1340        m_is_service_mode = false;
1341    }
1342}
1343
1344
1345/**
1346 * PCAP node
1347 */
1348bool CGenNodePCAP::create(uint8_t port_id,
1349                          pkt_dir_t dir,
1350                          socket_id_t socket_id,
1351                          const uint8_t *mac_addr,
1352                          const uint8_t *slave_mac_addr,
1353                          const std::string &pcap_filename,
1354                          double ipg_usec,
1355                          double min_ipg_sec,
1356                          double speedup,
1357                          uint32_t count,
1358                          bool is_dual) {
1359    std::stringstream ss;
1360
1361    m_type       = CGenNode::PCAP_PKT;
1362    m_flags      = 0;
1363    m_src_port   = 0;
1364    m_port_id    = port_id;
1365    m_count      = count;
1366    m_is_dual    = is_dual;
1367    m_dir        = dir;
1368    m_min_ipg_sec    = min_ipg_sec;
1369
1370    /* increase timeout of WD due to io */
1371    TrexWatchDog::IOFunction::io_begin();
1372
1373    /* mark this node as slow path */
1374    set_slow_path(true);
1375
1376    if (ipg_usec != -1) {
1377        /* fixed IPG */
1378        m_ipg_sec = std::max(min_ipg_sec, usec_to_sec(ipg_usec / speedup));
1379        m_speedup = 0;
1380    } else {
1381        /* packet IPG */
1382        m_ipg_sec = -1;
1383        m_speedup  = speedup;
1384    }
1385
1386    /* copy MAC addr info */
1387    memcpy(m_mac_addr, mac_addr, 12);
1388    memcpy(m_slave_mac_addr, slave_mac_addr, 12);
1389
1390
1391    set_socket_id(socket_id);
1392
1393    /* create the PCAP reader */
1394    m_reader = CCapReaderFactory::CreateReader((char *)pcap_filename.c_str(), 0, ss);
1395    if (!m_reader) {
1396        return false;
1397    }
1398
1399    m_raw_packet = new CCapPktRaw();
1400    if ( m_reader->ReadPacket(m_raw_packet) == false ){
1401        return false;
1402    }
1403
1404    /* set the dir */
1405    set_mbuf_dir(dir);
1406
1407    /* update the direction (for dual mode) */
1408    update_pkt_dir();
1409
1410    /* this is the reference time */
1411    m_last_pkt_time = m_raw_packet->get_time();
1412
1413    /* ready */
1414    m_state = PCAP_ACTIVE;
1415
1416    return true;
1417}
1418
1419/**
1420 * cleanup for PCAP node
1421 *
1422 * @author imarom (08-May-16)
1423 */
1424void CGenNodePCAP::destroy() {
1425
1426    if (m_raw_packet) {
1427        delete m_raw_packet;
1428        m_raw_packet = NULL;
1429    }
1430
1431    if (m_reader) {
1432        delete m_reader;
1433        m_reader = NULL;
1434    }
1435
1436    /* end of io, return normal timeout of WD */
1437    TrexWatchDog::IOFunction::io_end();
1438
1439    m_state = PCAP_INVALID;
1440}
1441
1442