/*
 Itay Marom
 Hanoch Haim
 Cisco Systems, Inc.
*/

/*
Copyright (c) 2015-2016 Cisco Systems, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "bp_sim.h"
#include "trex_stateless_dp_core.h"
#include "trex_stateless_messaging.h"
#include "trex_stream.h"
#include "trex_stream_node.h"
#include "trex_streams_compiler.h"
#include "mbuf.h"

/**
 * a wrapper for service mode
 * it redirects the fast send_node() virtual call
 * to send_node_service_mode(), which also does the capturing
 *
 */
class ServiceModeWrapper : public CVirtualIF {
public:

    ServiceModeWrapper() {
        m_wrapped = nullptr;
    }

    void set_wrapped_object(CVirtualIF *wrapped) {
        m_wrapped = wrapped;
    }

    CVirtualIF *get_wrapped_object() const {
        return m_wrapped;
    }

    virtual int close_file(void) {
        return m_wrapped->close_file();
    }

    virtual int flush_tx_queue(void) {
        return m_wrapped->flush_tx_queue();
    }

    virtual int open_file(std::string file_name) {
        return m_wrapped->open_file(file_name);
    }

    /* move to service mode */
    virtual int send_node(CGenNode *node) {
        return m_wrapped->send_node_service_mode(node);
    }

    virtual int update_mac_addr_from_global_cfg(pkt_dir_t dir, uint8_t *p) {
        return m_wrapped->update_mac_addr_from_global_cfg(dir, p);
    }

    virtual pkt_dir_t port_id_to_dir(uint8_t port_id) {
        return m_wrapped->port_id_to_dir(port_id);
    }

    virtual void send_one_pkt(pkt_dir_t dir, rte_mbuf_t *m) {
        m_wrapped->send_one_pkt(dir, m);
    }

private:
    CVirtualIF *m_wrapped;
};
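
/*
 * illustrative sketch only - the real flow is TrexStatelessDpCore::set_service_mode()
 * further down in this file; 'v_if' here is just a placeholder name for the core's VIF:
 *
 *   ServiceModeWrapper wrapper;
 *   wrapper.set_wrapped_object(v_if);        // remember the real VIF
 *   v_if = &wrapper;                         // send_node() now goes through the capturing path
 *   ...
 *   v_if = wrapper.get_wrapped_object();     // leaving service mode restores the fast path
 */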


void CGenNodeStateless::cache_mbuf_array_init(){
    m_cache_size=0;
    m_cache_array_cnt=0;
}



void CGenNodeStateless::cache_mbuf_array_copy(CGenNodeCacheMbuf *obj,
                                              uint16_t size){

    int i;
    cache_mbuf_array_alloc(size);
    for (i=0; i<size; i++) {
        cache_mbuf_array_set(i,obj->m_array[i]);
    }
    cache_mbuf_array_set_const_mbuf(obj->m_mbuf_const);
}


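/* allocate a zeroed CGenNodeCacheMbuf big enough to hold 'size' mbuf pointers
   and mark this node as owning a cache array */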
rte_mbuf_t ** CGenNodeStateless::cache_mbuf_array_alloc(uint16_t size){

    uint32_t buf_size = CGenNodeCacheMbuf::get_object_size(size);
    /* TBD: replace with an aligned, zero-init allocation API */
    m_cache_mbuf = (void *)malloc(buf_size);
    assert(m_cache_mbuf);
    memset(m_cache_mbuf,0,buf_size);

    m_flags |= SL_NODE_CONST_MBUF_CACHE_ARRAY;
    m_cache_size=size;
    m_cache_array_cnt=0;
    return ((rte_mbuf_t **)m_cache_mbuf);
}

void CGenNodeStateless::cache_mbuf_array_free(){

    assert(m_cache_mbuf);
    int i;
    for (i=0; i<(int)m_cache_size; i++) {
        rte_mbuf_t * m=cache_mbuf_array_get((uint16_t)i);
        assert(m);
        rte_pktmbuf_free(m);
    }

    /* free the const mbuf */
    rte_mbuf_t * m=cache_mbuf_array_get_const_mbuf() ;
    if (m) {
        rte_pktmbuf_free(m);
    }

    free(m_cache_mbuf);
    m_cache_mbuf=0;
}


rte_mbuf_t * CGenNodeStateless::cache_mbuf_array_get(uint16_t index){

    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
    return (p->m_array[index]);
}

void CGenNodeStateless::cache_mbuf_array_set_const_mbuf(rte_mbuf_t * m){
    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
    p->m_mbuf_const=m;
}

rte_mbuf_t * CGenNodeStateless::cache_mbuf_array_get_const_mbuf(){
    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
    return (p->m_mbuf_const);
}


void CGenNodeStateless::cache_mbuf_array_set(uint16_t index,
                                             rte_mbuf_t * m){
    CGenNodeCacheMbuf *p =(CGenNodeCacheMbuf *) m_cache_mbuf;
    p->m_array[index]=m;
}


void CDpOneStream::Delete(CFlowGenListPerThread   * core){
    assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE);
    core->free_node((CGenNode *)m_node);
    delete m_dp_stream;
    m_node=0;
    m_dp_stream=0;
}

void CDpOneStream::DeleteOnlyStream(){
    assert(m_dp_stream);
    delete m_dp_stream;
    m_dp_stream=0;
}

int CGenNodeStateless::get_stream_id(){
    if (m_state ==CGenNodeStateless::ss_FREE_RESUSE) {
        return (-1); // not valid
    }
    assert(m_ref_stream_info);
    return ((int)m_ref_stream_info->m_stream_id);
}


void CGenNodeStateless::DumpHeader(FILE *fd){
    fprintf(fd," pkt_id, time, port , action , state, stream_id , stype , m-burst# , burst# \n");

}
void CGenNodeStateless::Dump(FILE *fd){
    fprintf(fd," %2.4f, %3lu, %s,%s, %3d, %s, %3lu, %3lu  \n",
            m_time,
            (ulong)m_port_id,
            "s-pkt", //action
            get_stream_state_str(m_state ).c_str(),
            get_stream_id(),   //stream_id
            TrexStream::get_stream_type_str(m_stream_type).c_str(), //stype
            (ulong)m_multi_bursts,
            (ulong)m_single_burst
            );
}


void CGenNodeStateless::generate_random_seed() {
    /* seed can be provided by the user */
    uint32_t unique_seed;
    if (m_ref_stream_info->m_random_seed) {
        unique_seed = m_ref_stream_info->m_random_seed;
    } else {
        unsigned int tmp = (unsigned int)time(NULL);
        unique_seed = rand_r(&tmp);
    }

    /* per thread divergence */
    unique_seed = (unique_seed * ( (m_thread_id + 1) * 514229 ) ) & 0xFFFFFFFF;

    /* set random */
    set_random_seed(unique_seed);
}


void CGenNodeStateless::refresh_vm_bss() {
    if ( m_vm_flow_var ) {
        StreamVmDp  * vm_s=m_ref_stream_info->m_vm_dp;
        assert(vm_s);
        memcpy(m_vm_flow_var,vm_s->get_bss(),vm_s->get_bss_size());

        if ( vm_s->is_random_seed() ) {
            generate_random_seed();
        }

    }
}



/**
 * this function is called when a stream restarts after being inactive
 */
void CGenNodeStateless::refresh(){

    /* refill the stream info */
    m_single_burst    = m_single_burst_refill;
    m_multi_bursts    = m_ref_stream_info->m_num_bursts;
    m_state           = CGenNodeStateless::ss_ACTIVE;

    /* refresh the init values */
#if 0
    /* TBD: should add a JSON variable for that */
    refresh_vm_bss();
#endif
}


void CGenNodeCommand::free_command(){

    assert(m_cmd);
    m_cmd->on_node_remove();
    delete m_cmd;
}


std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state){
    std::string res;

    switch (stream_state) {
    case CGenNodeStateless::ss_FREE_RESUSE :
        res="FREE    ";
        break;
    case CGenNodeStateless::ss_INACTIVE :
        res="INACTIVE ";
        break;
    case CGenNodeStateless::ss_ACTIVE :
        res="ACTIVE   ";
        break;
    default:
        res="Unknown  ";
    };
    return(res);
}

/*
 * Allocate mbuf for flow stat (and latency) info sending
 * m - Original mbuf (can be a complicated mbuf data structure)
 * fsp_head - returned pointer into which the flow stat info should be written
 * is_const - is the given mbuf const
 * return a new mbuf structure in which fsp_head can be written. If needed, the original mbuf is freed.
 */
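/*
 * rough sketch of the chains this function builds (derived from the code below):
 *   const, len > 128:      m_ret (indirect view of m, trimmed by the header size) -> m_lat (latency header)
 *   const, short packet:   a single new mbuf holding a copy of the data; fsp_head points at its tail
 *   vm, contiguous:        m is returned as-is; fsp_head points at its tail
 *   vm, chained:           m (r/w prefix) -> indirect view of the read-only part (trimmed) -> m_lat
 */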
rte_mbuf_t * CGenNodeStateless::alloc_flow_stat_mbuf(rte_mbuf_t *m, struct flow_stat_payload_header *&fsp_head
                                                     , bool is_const) {
    rte_mbuf_t *m_ret = NULL, *m_lat = NULL;
    uint16_t fsp_head_size = sizeof(struct flow_stat_payload_header);

    if (is_const) {
        // const mbuf case
        if (rte_pktmbuf_data_len(m) > 128) {
            m_ret = CGlobalInfo::pktmbuf_alloc_small(get_socket_id());
            assert(m_ret);
            // alloc mbuf just for the latency header
            m_lat = CGlobalInfo::pktmbuf_alloc( get_socket_id(), fsp_head_size);
            assert(m_lat);
            fsp_head = (struct flow_stat_payload_header *)rte_pktmbuf_append(m_lat, fsp_head_size);
            rte_pktmbuf_attach(m_ret, m);
            rte_pktmbuf_trim(m_ret, sizeof(struct flow_stat_payload_header));
            utl_rte_pktmbuf_add_after2(m_ret, m_lat);
            // ref count was updated when we took the (const) mbuf, and again in rte_pktmbuf_attach
            // so we need to decrease it now, to avoid a leak.
            rte_pktmbuf_refcnt_update(m, -1);
            return m_ret;
        } else {
            // Short packet. Just copy all bytes.
            m_ret = CGlobalInfo::pktmbuf_alloc( get_socket_id(), rte_pktmbuf_data_len(m) );
            assert(m_ret);
            char *p = rte_pktmbuf_mtod(m, char*);
            char *p_new = rte_pktmbuf_append(m_ret, rte_pktmbuf_data_len(m));
            memcpy(p_new , p, rte_pktmbuf_data_len(m));
            fsp_head = (struct flow_stat_payload_header *)(p_new + rte_pktmbuf_data_len(m) - fsp_head_size);
            rte_pktmbuf_free(m);
            return m_ret;
        }
    } else {
        // Field engine (vm)
        if (rte_pktmbuf_is_contiguous(m)) {
            // single r/w mbuf
            char *p = rte_pktmbuf_mtod(m, char*);
            fsp_head = (struct flow_stat_payload_header *)(p + rte_pktmbuf_data_len(m) - fsp_head_size);
            return m;
        } else {
            // We have: r/w --> read only.
            // Changing to:
            // (original) r/w -> (new) indirect (direct is original read_only, after trimming last bytes) -> (new) latency info
            rte_mbuf_t *m_read_only = m->next, *m_indirect;

            m_indirect = CGlobalInfo::pktmbuf_alloc_small(get_socket_id());
            assert(m_indirect);
            // alloc mbuf just for the latency header
            m_lat = CGlobalInfo::pktmbuf_alloc( get_socket_id(), fsp_head_size);
            assert(m_lat);
            fsp_head = (struct flow_stat_payload_header *)rte_pktmbuf_append(m_lat, fsp_head_size);
            utl_rte_pktmbuf_chain_with_indirect(m, m_indirect, m_read_only, m_lat);
            m_indirect->data_len = (uint16_t)(m_indirect->data_len - fsp_head_size);
            return m;
        }
    }
}

// test the const case of alloc_flow_stat_mbuf. The more complicated non-const case is tested in the simulation.
bool CGenNodeStateless::alloc_flow_stat_mbuf_test_const() {
    rte_mbuf_t *m, *m_test;
    uint16_t sizes[2] = {64, 500};
    uint16_t size;
    struct flow_stat_payload_header *fsp_head;
    char *p;

    set_socket_id(0);
    for (int test_num = 0; test_num < sizeof(sizes)/sizeof(sizes[0]); test_num++) {
        size = sizes[test_num];
        m = CGlobalInfo::pktmbuf_alloc(get_socket_id(), size);
        p = rte_pktmbuf_append(m, size);
        for (int i = 0; i < size; i++) {
            p[i] = (char)i;
        }
        m_test = alloc_flow_stat_mbuf(m, fsp_head, true);
        p = rte_pktmbuf_mtod(m_test, char*);
        assert(rte_pktmbuf_pkt_len(m_test) == size);
        for (int i = 0; i < rte_pktmbuf_pkt_len(m_test) - sizeof(*fsp_head); i++) {
            assert(p[i] == (char)i);
        }
        // verify fsp_head points correctly
        if (size > 128) { // should match the threshold in alloc_flow_stat_mbuf
            assert(rte_pktmbuf_data_len(m_test) == size - sizeof(*fsp_head));
            assert(rte_pktmbuf_data_len(m_test->next) == sizeof(*fsp_head));
            assert((char *)fsp_head == rte_pktmbuf_mtod((m_test->next), char*));
        } else {
            assert(rte_pktmbuf_data_len(m_test) == size);
            assert (((char *)fsp_head) + sizeof (*fsp_head) == p + rte_pktmbuf_data_len(m_test));
        }
        rte_pktmbuf_free(m_test);
    }
    return true;
}

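/* build a single packet for a VM (field engine) stream: allocate an mbuf for the
   writable prefix, run the DP VM instruction program over it and, if the stream
   has a constant tail, chain the shared const mbuf after the prefix. A packet
   size change reported by the VM is handled by trimming the prefix/tail. */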
rte_mbuf_t   * CGenNodeStateless::alloc_node_with_vm(){

    rte_mbuf_t        * m;
    /* alloc a small packet buffer */
    uint16_t prefix_size = prefix_header_size();
    m = CGlobalInfo::pktmbuf_alloc( get_socket_id(), prefix_size );
    if (m==0) {
        return (m);
    }
    /* TBD: remove this and handle the error cases */
    assert(m);
    char *p=rte_pktmbuf_append(m, prefix_size);
    memcpy( p ,m_original_packet_data_prefix, prefix_size);


    /* run the VM program */
    StreamDPVmInstructionsRunner runner;
    runner.set_mbuf(m);

    runner.run( (uint32_t*)m_vm_flow_var,
                m_vm_program_size,
                m_vm_program,
                m_vm_flow_var,
                (uint8_t*)p);

    uint16_t pkt_new_size=runner.get_new_pkt_size();
    if ( likely( pkt_new_size == 0) ) {
        /* no packet size change */
        rte_mbuf_t * m_const = get_const_mbuf();
        if (  m_const != NULL) {
            utl_rte_pktmbuf_add_after(m,m_const);
        }
        return (m);
    }

    /* the packet size has changed; there are a few cases to handle */
    rte_mbuf_t * m_const = get_const_mbuf();
    if ( (m_const == 0 ) || (pkt_new_size<=prefix_size) ) {
        /* a single mbuf, just trim it */
        m->data_len = pkt_new_size;
        m->pkt_len  = pkt_new_size;
        return (m);
    }

    rte_mbuf_t * mi= CGlobalInfo::pktmbuf_alloc_small(get_socket_id());
    assert(mi);
    rte_pktmbuf_attach(mi,m_const);
    utl_rte_pktmbuf_add_after2(m,mi);

    if ( pkt_new_size < m->pkt_len) {
        /* need to trim it */
        mi->data_len = (pkt_new_size - prefix_size);
        m->pkt_len   = pkt_new_size;
    }
    return (m);
}

void CGenNodeStateless::free_stl_vm_buf(){
    rte_mbuf_t * m;
    m=get_const_mbuf();
    if (m) {
        rte_pktmbuf_free(m); /* reduce the ref counter */
        /* clear the const marker */
        clear_const_mbuf();
    }

    free_prefix_header();

    if (m_vm_flow_var) {
        /* free the flow var */
        free(m_vm_flow_var);
        m_vm_flow_var=0;
    }
}



void CGenNodeStateless::free_stl_node(){

    if ( is_cache_mbuf_array() ){
        /* free the pre-allocated cache array of mbufs */
        cache_mbuf_array_free();
    }else{
        /* free the single cached mbuf, if there is one */
        rte_mbuf_t * m=get_cache_mbuf();
        if (m) {
            rte_pktmbuf_free(m);
            m_cache_mbuf=0;
        }
    }
    free_stl_vm_buf();
}


bool TrexStatelessDpPerPort::update_number_of_active_streams(uint32_t d){
    m_active_streams-=d; /* reduce the number of streams */
    if (m_active_streams == 0) {
        return (true);
    }
    return (false);
}

bool TrexStatelessDpPerPort::resume_traffic(uint8_t port_id){

    /* we are working with continuous streams, so we must currently be paused */
    assert(m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE);

    for (auto dp_stream : m_active_nodes) {
        CGenNodeStateless * node =dp_stream.m_node;
        assert(node->get_port_id() == port_id);
        assert(node->is_pause() == true);
        node->set_pause(false);
    }
    m_state = TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
    return (true);
}

bool TrexStatelessDpPerPort::update_traffic(uint8_t port_id, double factor) {

    assert( (m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING ||
            (m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE)) );

    for (auto dp_stream : m_active_nodes) {
        CGenNodeStateless * node = dp_stream.m_node;
        assert(node->get_port_id() == port_id);

        node->update_rate(factor);
    }

    return (true);
}

bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){

    /* we are working with continuous streams, so we must be in transmit mode */
    assert(m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING);

    for (auto dp_stream : m_active_nodes) {
        CGenNodeStateless * node =dp_stream.m_node;
        assert(node->get_port_id() == port_id);
        assert(node->is_pause() == false);
        node->set_pause(true);
    }
    m_state = TrexStatelessDpPerPort::ppSTATE_PAUSE;
    return (true);
}

bool TrexStatelessDpPerPort::push_pcap(uint8_t port_id,
                                       const std::string &pcap_filename,
                                       double ipg_usec,
                                       double min_ipg_sec,
                                       double speedup,
                                       uint32_t count,
                                       bool is_dual) {

    /* push pcap can only happen on an idle port from the core's perspective */
    assert(m_state == TrexStatelessDpPerPort::ppSTATE_IDLE);

    CGenNodePCAP *pcap_node = m_core->allocate_pcap_node();
    if (!pcap_node) {
        return (false);
    }

    pkt_dir_t dir          = m_core->m_node_gen.m_v_if->port_id_to_dir(port_id);
    socket_id_t socket_id  = m_core->m_node_gen.m_socket_id;

    /* main port */
    uint8_t mac_addr[12];
    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, mac_addr);

    /* for dual mode */
    uint8_t slave_mac_addr[12];
    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir ^ 0x1, slave_mac_addr);

    bool rc = pcap_node->create(port_id,
                                dir,
                                socket_id,
                                mac_addr,
                                slave_mac_addr,
                                pcap_filename,
                                ipg_usec,
                                min_ipg_sec,
                                speedup,
                                count,
                                is_dual);
    if (!rc) {
        m_core->free_node((CGenNode *)pcap_node);
        return (false);
    }

    /* schedule the node to run now */
    pcap_node->m_time = m_core->m_cur_time_sec;
    m_core->m_node_gen.add_node((CGenNode *)pcap_node);

    /* hold a pointer to the node */
    assert(m_active_pcap_node == NULL);
    m_active_pcap_node = pcap_node;

    m_state = TrexStatelessDpPerPort::ppSTATE_PCAP_TX;
    return (true);
}


bool TrexStatelessDpPerPort::stop_traffic(uint8_t  port_id,
                                          bool     stop_on_id,
                                          int      event_id){


    if (m_state == TrexStatelessDpPerPort::ppSTATE_IDLE) {
        assert(m_active_streams==0);
        return false;
    }

    /* there could be a race of stop after stop */
    if ( stop_on_id ) {
        if (event_id != m_event_id){
            /* we can't stop - this is an old message */
            return false;
        }
    }

    for (auto dp_stream : m_active_nodes) {
        CGenNodeStateless * node =dp_stream.m_node;
        assert(node->get_port_id() == port_id);
        if ( node->get_state() == CGenNodeStateless::ss_ACTIVE) {
            node->mark_for_free();
            m_active_streams--;
            dp_stream.DeleteOnlyStream();

        }else{
            dp_stream.Delete(m_core);
        }
    }

    /* check for active PCAP node */
    if (m_active_pcap_node) {
        /* an async stop from the outside, or a duration timeout, while the node is still active */
        if (m_active_pcap_node->is_active()) {
            m_active_pcap_node->mark_for_free();
        } else {
            /* graceful stop - the node was already taken out by the scheduler */
            m_core->free_node( (CGenNode *)m_active_pcap_node);
        }

        m_active_pcap_node = NULL;
    }

    /* the active stream count should be zero */
    assert(m_active_streams==0);
    m_active_nodes.clear();
    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
    return (true);
}


void TrexStatelessDpPerPort::create(CFlowGenListPerThread   *  core){
    m_core=core;
    m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
    m_active_streams=0;
    m_active_nodes.clear();
    m_active_pcap_node = NULL;
}


TrexStatelessDpCore::TrexStatelessDpCore() {
    m_thread_id       = 0;
    m_core            = NULL;
    m_duration        = -1;
    m_is_service_mode = false;
    m_wrapper         = new ServiceModeWrapper();
}

TrexStatelessDpCore::~TrexStatelessDpCore() {
    delete m_wrapper;
}


void
TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) {
    m_thread_id = thread_id;
    m_core = core;
    m_local_port_offset = 2*core->getDualPortId();

    CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();

    m_ring_from_cp = cp_dp->getRingCpToDp(thread_id);
    m_ring_to_cp   = cp_dp->getRingDpToCp(thread_id);

    m_state = STATE_IDLE;

    int i;
    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
        m_ports[i].create(core);
    }
}


/* move to the next stream; the old stream moves to INACTIVE. returns true if the next node should be rescheduled */
bool TrexStatelessDpCore::set_stateless_next_node(CGenNodeStateless * cur_node,
                                                  CGenNodeStateless * next_node){

    assert(cur_node);
    TrexStatelessDpPerPort * lp_port = get_port_db(cur_node->m_port_id);
    bool schedule =false;

    bool to_stop_port=false;

    if (next_node == NULL) {
        /* there is no next stream, reduce the number of active streams */
        to_stop_port = lp_port->update_number_of_active_streams(1);

    }else{
        uint8_t state=next_node->get_state();

        /* can't be FREE_RESUSE */
        assert(state != CGenNodeStateless::ss_FREE_RESUSE);
        if (state == CGenNodeStateless::ss_INACTIVE ) {

            if (cur_node->m_action_counter > 0) {
                cur_node->m_action_counter--;
                if (cur_node->m_action_counter==0) {
                    to_stop_port = lp_port->update_number_of_active_streams(1);
                }else{
                    /* refill the start info and schedule; no change to the active stream count */
                    next_node->refresh();
                    schedule = true;
                }
            }else{
                /* refill the start info and schedule; no change to the active stream count */
                next_node->refresh();
                schedule = true;
            }

        }else{
            to_stop_port = lp_port->update_number_of_active_streams(1);
        }
    }

    if ( to_stop_port ) {
        /* call stop port explicitly to move the state */
        stop_traffic(cur_node->m_port_id,false,0);
    }

    return ( schedule );
}



/**
 * in the idle state loop, the core mostly sleeps
 * and periodically checks for messages
 *
 * @author imarom (01-Nov-15)
 */
void
TrexStatelessDpCore::idle_state_loop() {

    const int SHORT_DELAY_MS    = 2;
    const int LONG_DELAY_MS     = 50;
    const int DEEP_SLEEP_LIMIT  = 2000;

    int counter = 0;

    while (m_state == STATE_IDLE) {
        m_core->tickle();

        bool had_msg = m_core->check_msgs();
        if (had_msg) {
            counter = 0;
            continue;
        }

        /* enter deep sleep only if enough time has passed */
        if (counter < DEEP_SLEEP_LIMIT) {
            delay(SHORT_DELAY_MS);
            counter++;
        } else {
            delay(LONG_DELAY_MS);
        }

    }
}



void TrexStatelessDpCore::quit_main_loop(){
    m_core->set_terminate_mode(true); /* mark it as terminated */
    m_state = STATE_TERMINATE;
    add_global_duration(0.0001);
}


/**
 * the scheduler runs when traffic exists;
 * it returns when there is nothing more to transmit
 * on this core
 *
 * @author imarom (01-Nov-15)
 */
void
TrexStatelessDpCore::start_scheduler() {

    /* create a maintenance job using the scheduler */
    CGenNode * node_sync = m_core->create_node() ;
    node_sync->m_type = CGenNode::FLOW_SYNC;
    node_sync->m_time = m_core->m_cur_time_sec + SYNC_TIME_OUT;

    m_core->m_node_gen.add_node(node_sync);

    double old_offset = 0.0;
    m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset);
    /* bail out in case of termination */
    if (m_state != TrexStatelessDpCore::STATE_TERMINATE) {
        m_core->m_node_gen.close_file(m_core);
        m_state = STATE_IDLE; /* all ports are done and there is nothing left to do - move to the IDLE state */
    }
}


void
TrexStatelessDpCore::run_once(){

    idle_state_loop();

    if ( m_state == STATE_TERMINATE ){
        return;
    }

    start_scheduler();
}




void
TrexStatelessDpCore::start() {

    while (true) {
        run_once();

        if ( m_core->is_terminated_by_master() ) {
            break;
        }
    }
}

/* we can exit only if both ports are idle */
void
TrexStatelessDpCore::schedule_exit(){

    CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;

    node->m_type = CGenNode::COMMAND;

    node->m_cmd = new TrexStatelessDpCanQuit();

    /* make sure it will be scheduled after the current node */
    node->m_time = m_core->m_cur_time_sec ;

    m_core->m_node_gen.add_node((CGenNode *)node);
}


void
TrexStatelessDpCore::add_global_duration(double duration){
    if (duration > 0.0) {
        CGenNode *node = m_core->create_node() ;

        node->m_type = CGenNode::EXIT_SCHED;

        /* make sure it will be scheduled after the current node */
        node->m_time = m_core->m_cur_time_sec + duration ;

        m_core->m_node_gen.add_node(node);
    }
}

/* add per port exit */
void
TrexStatelessDpCore::add_port_duration(double duration,
                                       uint8_t port_id,
                                       int event_id){
    if (duration > 0.0) {
        CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;

        node->m_type = CGenNode::COMMAND;

        /* make sure it will be scheduled after the current node */
        node->m_time = m_core->m_cur_time_sec + duration ;

        TrexStatelessDpStop * cmd=new TrexStatelessDpStop(port_id);


        /* test this */
        m_core->m_non_active_nodes++;
        cmd->set_core_ptr(m_core);
        cmd->set_event_id(event_id);
        cmd->set_wait_for_event_id(true);

        node->m_cmd = cmd;

        m_core->m_node_gen.add_node((CGenNode *)node);
    }
}


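/* decide, per stream, where the MAC addresses come from: the packet itself or the
   platform config. update_mac_addr_from_global_cfg() overwrites both addresses, so
   the bytes that should stay packet-driven are saved before the call and restored
   after it. */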
void TrexStatelessDpCore::update_mac_addr(TrexStream * stream,
                                          CGenNodeStateless *node,
                                          pkt_dir_t dir,
                                          char *raw_pkt){
    bool              ov_src = stream->get_override_src_mac_by_pkt_data();
    TrexStream::stream_dst_mac_t  ov_dst = stream->get_override_dst_mac_mode();


    if ( (ov_src == true) && (ov_dst == TrexStream::stPKT) ) {
        /* nothing to do - both addresses are taken from the packet */
        return;
    }

    /* take both from the cfg file */
    if ( (ov_src == false) &&
         (ov_dst == TrexStream::stCFG_FILE) ){

        m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*)raw_pkt);
        return;
    }

    /* save the original packet MACs */
    char tmp_pkt[12];
    memcpy(tmp_pkt,raw_pkt,12);

    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*)raw_pkt);

    if ((ov_src == true) && (ov_dst == TrexStream::stCFG_FILE)) {
        memcpy(raw_pkt+6,tmp_pkt+6,6);
    }

    if ((ov_src == false) && (ov_dst == TrexStream::stPKT)) {
        memcpy(raw_pkt,tmp_pkt,6);
    }
}


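/* pre-generate 'cache_size' packets for this stream by running the VM once per
   packet, and store the resulting mbufs (plus a reference to the const mbuf) in
   the node's cache array; the per-node VM buffers are then released */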
void TrexStatelessDpCore::replay_vm_into_cache(TrexStream * stream,
                                               CGenNodeStateless *node){

    uint16_t      cache_size = stream->m_cache_size;
    assert(cache_size>0);
    rte_mbuf_t * m=0;

    uint32_t buf_size = CGenNodeCacheMbuf::get_object_size(cache_size);
    CGenNodeCacheMbuf * p = (CGenNodeCacheMbuf *)malloc(buf_size);
    assert(p);
    memset(p,0,buf_size);

    int i;
    for (i=0; i<cache_size; i++) {
        p->m_array[i] =  node->alloc_node_with_vm();
    }
    /* save the const mbuf */
    m=node->get_const_mbuf();
    if (m) {
        p->m_mbuf_const=m;
        rte_pktmbuf_refcnt_update(m,1);
    }

    /* free all the VM buffers and the const mbuf reference */
    node->free_stl_vm_buf();

    /* copy into the local node memory */
    node->cache_mbuf_array_copy(p,cache_size);

    /* free the temporary memory */
    free(p);
}


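/* build a DP node for a single compiled stream: clone the stream into DP memory,
   set the burst/rate/flow-stat fields and prepare the packet - either a single
   cached const mbuf (no VM) or a VM prefix plus an optional const tail (with VM).
   the node is added to the scheduler only if the stream is self-started. */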
void
TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port,
                                TrexStream * stream,
                                TrexStreamsCompiledObj *comp) {

    CGenNodeStateless *node = m_core->create_node_sl();

    node->m_thread_id = m_thread_id;
    node->cache_mbuf_array_init();
    node->m_batch_size=0;

    /* add periodic */
    node->m_cache_mbuf=0;
    node->m_type = CGenNode::STATELESS_PKT;

    node->m_action_counter = stream->m_action_count;

    /* clone the stream from control plane memory to DP memory */
    node->m_ref_stream_info = stream->clone();
    /* no need for this memory anymore on the control plane side */
    stream->release_dp_object();

    node->m_next_stream=0; /* will be fixed later */

    if ( stream->m_self_start ){
        /* a self-started stream begins in active mode */
        node->m_state =CGenNodeStateless::ss_ACTIVE;
        lp_port->m_active_streams++;
    }else{
        node->m_state =CGenNodeStateless::ss_INACTIVE;
    }

    node->m_time = m_core->m_cur_time_sec + stream->get_start_delay_sec();

    pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(stream->m_port_id);
    node->m_flags = 0;
    node->m_src_port =0;
    node->m_original_packet_data_prefix = 0;

    if (stream->m_rx_check.m_enabled) {
        node->set_stat_needed();
        uint16_t hw_id = stream->m_rx_check.m_hw_id;
        assert (hw_id < MAX_FLOW_STATS + MAX_FLOW_STATS_PAYLOAD);
        node->set_stat_hw_id(hw_id);
        // no support for cache with flow stat payload rules
        if ((TrexPlatformApi::driver_stat_cap_e)stream->m_rx_check.m_rule_type == TrexPlatformApi::IF_STAT_PAYLOAD) {
            stream->m_cache_size = 0;
        }
    }

    /* set socket id */
    node->set_socket_id(m_core->m_node_gen.m_socket_id);

    /* build an mbuf from the packet */

    uint16_t pkt_size = stream->m_pkt.len;
    const uint8_t *stream_pkt = stream->m_pkt.binary;

    node->m_pause =0;
    node->m_stream_type = stream->m_type;
    node->m_next_time_offset = 1.0 / stream->get_pps();
    node->m_null_stream = (stream->m_null_stream ? 1 : 0);

    /* stateless specific fields */
    switch ( stream->m_type ) {

    case TrexStream::stCONTINUOUS :
        node->m_single_burst=0;
        node->m_single_burst_refill=0;
        node->m_multi_bursts=0;
        break;

    case TrexStream::stSINGLE_BURST :
        node->m_stream_type             = TrexStream::stMULTI_BURST;
        node->m_single_burst            = stream->m_burst_total_pkts;
        node->m_single_burst_refill     = stream->m_burst_total_pkts;
        node->m_multi_bursts            = 1;  /* a single burst is a multi burst of 1 */
        break;

    case TrexStream::stMULTI_BURST :
        node->m_single_burst        = stream->m_burst_total_pkts;
        node->m_single_burst_refill = stream->m_burst_total_pkts;
        node->m_multi_bursts        = stream->m_num_bursts;
        break;
    default:

        assert(0);
    };

    node->m_port_id = stream->m_port_id;

    /* set dir 0 or 1 (client or server) */
    node->set_mbuf_cache_dir(dir);


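    /* two cases from here on: a stream without a VM gets one const mbuf that is
       cached on the node and reused as-is; a stream with a VM keeps a cloned flow
       var area, a reference to the shared VM program and (optionally) a const
       tail, and builds each packet at send time (or from a pre-built cache) */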
    if (node->m_ref_stream_info->getDpVm() == NULL) {
        /* no VM */

        node->m_vm_flow_var =  NULL;
        node->m_vm_program  =  NULL;
        node->m_vm_program_size =0;

        /* allocate the const mbuf */
        rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size);
        assert(m);

        char *p = rte_pktmbuf_append(m, pkt_size);
        assert(p);
        /* copy the packet */
        memcpy(p,stream_pkt,pkt_size);

        update_mac_addr(stream,node,dir,p);

        /* set the packet as read-only */
        node->set_cache_mbuf(m);

        node->m_original_packet_data_prefix =0;
    }else{

        /* set the program */
        TrexStream * local_mem_stream = node->m_ref_stream_info;

        StreamVmDp  * lpDpVm = local_mem_stream->getDpVm();

        node->m_vm_flow_var      = lpDpVm->clone_bss(); /* clone the flow var */
        node->m_vm_program       = lpDpVm->get_program(); /* same ref to the program */
        node->m_vm_program_size  = lpDpVm->get_program_size();

        /* generate a random seed if needed */
        if (lpDpVm->is_random_seed()) {
            node->generate_random_seed();
        }

        /* the part beyond the prefix is stored in a const mbuf */
        if ( pkt_size > lpDpVm->get_prefix_size() ) {
            /* we need a const packet */
            uint16_t const_pkt_size  = pkt_size - lpDpVm->get_prefix_size() ;
            rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), const_pkt_size );
            assert(m);

            char *p = rte_pktmbuf_append(m, const_pkt_size);
            assert(p);

            /* copy packet data */
            memcpy(p,(stream_pkt + lpDpVm->get_prefix_size()),const_pkt_size);

            node->set_const_mbuf(m);
        }


        if ( lpDpVm->is_pkt_size_var() ) {
            // mark the node as variable size
            node->set_var_pkt_size();
        }


        if (lpDpVm->get_prefix_size() > pkt_size ) {
            lpDpVm->set_prefix_size(pkt_size);
        }

        /* copy the header */
        uint16_t header_size = lpDpVm->get_prefix_size();
        assert(header_size);
        node->alloc_prefix_header(header_size);
        uint8_t *p=node->m_original_packet_data_prefix;
        assert(p);

        memcpy(p,stream_pkt , header_size);

        update_mac_addr(stream,node,dir,(char *)p);

        if (stream->m_cache_size > 0 ) {
            /* we need to create a cache of objects */
            replay_vm_into_cache(stream, node);
        }
    }


    CDpOneStream one_stream;

    one_stream.m_dp_stream = node->m_ref_stream_info;
    one_stream.m_node =node;

    lp_port->m_active_nodes.push_back(one_stream);

    /* schedule only if active */
    if (node->m_state == CGenNodeStateless::ss_ACTIVE) {
        m_core->m_node_gen.add_node((CGenNode *)node);
    }
}

void
TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj,
                                   double duration,
                                   int event_id) {


    TrexStatelessDpPerPort * lp_port=get_port_db(obj->get_port_id());
    lp_port->m_active_streams = 0;
    lp_port->set_event_id(event_id);

    /* update cur time */
    if ( CGlobalInfo::is_realtime()  ){
        m_core->m_cur_time_sec = now_sec() + SCHD_OFFSET_DTIME ;
    }

    /* no nodes in the list */
    assert(lp_port->m_active_nodes.size()==0);

    for (auto single_stream : obj->get_objects()) {
        /* all commands should be for the same port */
        assert(obj->get_port_id() == single_stream.m_stream->m_port_id);
        add_stream(lp_port,single_stream.m_stream,obj);
    }

    uint32_t nodes = lp_port->m_active_nodes.size();
    /* find next stream */
    assert(nodes == obj->get_objects().size());

    int cnt=0;

    /* set the next_stream pointer  */
    for (auto single_stream : obj->get_objects()) {

        if (single_stream.m_stream->is_dp_next_stream() ) {
            int stream_id = single_stream.m_stream->m_next_stream_id;
            assert(stream_id<nodes);
            /* point to the next stream , stream_id is fixed */
            lp_port->m_active_nodes[cnt].m_node->m_next_stream = lp_port->m_active_nodes[stream_id].m_node ;
        }
        cnt++;
    }

    lp_port->m_state =TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
    m_state = TrexStatelessDpCore::STATE_TRANSMITTING;


    if ( duration > 0.0 ){
        add_port_duration( duration ,obj->get_port_id(),event_id );
    }

}


bool TrexStatelessDpCore::are_all_ports_idle(){

    bool res=true;
    int i;
    for (i=0; i<NUM_PORTS_PER_CORE; i++) {
        if ( m_ports[i].m_state != TrexStatelessDpPerPort::ppSTATE_IDLE ){
            res=false;
        }
    }
    return (res);
}


void
TrexStatelessDpCore::resume_traffic(uint8_t port_id){

    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);

    lp_port->resume_traffic(port_id);
}


void
TrexStatelessDpCore::pause_traffic(uint8_t port_id){

    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);

    lp_port->pause_traffic(port_id);
}

void
TrexStatelessDpCore::push_pcap(uint8_t port_id,
                               int event_id,
                               const std::string &pcap_filename,
                               double ipg_usec,
                               double m_min_ipg_sec,
                               double speedup,
                               uint32_t count,
                               double duration,
                               bool is_dual) {

    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);

    lp_port->set_event_id(event_id);

    /* delegate the command to the port */
    bool rc = lp_port->push_pcap(port_id, pcap_filename, ipg_usec, m_min_ipg_sec, speedup, count, is_dual);
    if (!rc) {
        /* report back that we stopped */
        CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
        TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
                                                                       port_id,
                                                                       event_id,
                                                                       false);
        ring->Enqueue((CGenNode *)event_msg);
        return;
    }


    if (duration > 0.0) {
        add_port_duration(duration, port_id, event_id);
    }

    m_state = TrexStatelessDpCore::STATE_PCAP_TX;
}

void
TrexStatelessDpCore::update_traffic(uint8_t port_id, double factor) {

    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);

    lp_port->update_traffic(port_id, factor);
}


void
TrexStatelessDpCore::stop_traffic(uint8_t  port_id,
                                  bool     stop_on_id,
                                  int      event_id) {
    /* we cannot remove nodes that are not at the top of the queue, so
       for every active node make sure that the next time
       the scheduler invokes it, it will be freed */

    TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
    if ( lp_port->stop_traffic(port_id,stop_on_id,event_id) == false){
        return;
    }

    /* flush the TX queue before sending the done message to the CP */
    m_core->flush_tx_queue();

    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
                                                                   port_id,
                                                                   lp_port->get_event_id());
    ring->Enqueue((CGenNode *)event_msg);

}

/**
 * handle a message from CP to DP
 *
 */
void
TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) {
    msg->handle(this);
    delete msg;
}

void
TrexStatelessDpCore::barrier(uint8_t port_id, int event_id) {

    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
                                                                   port_id,
                                                                   event_id);
    ring->Enqueue((CGenNode *)event_msg);
}

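/* toggle service mode for this core: entering it wraps the VIF with the
   ServiceModeWrapper (so send_node() is redirected to the capturing
   send_node_service_mode() path), leaving it restores the original VIF */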
void
TrexStatelessDpCore::set_service_mode(uint8_t port_id, bool enabled) {
    /* ignore the same message */
    if (enabled == m_is_service_mode) {
        return;
    }

    if (enabled) {
        /* sanity */
        assert(m_core->m_node_gen.m_v_if != m_wrapper);

        /* set the wrapper object and make the VIF point to it */
        m_wrapper->set_wrapped_object(m_core->m_node_gen.m_v_if);
        m_core->m_node_gen.m_v_if = m_wrapper;
        m_is_service_mode = true;

    } else {
        /* sanity */
        assert(m_core->m_node_gen.m_v_if == m_wrapper);

        /* restore the wrapped object and make the VIF point to it */
        m_core->m_node_gen.m_v_if = m_wrapper->get_wrapped_object();
        m_is_service_mode = false;
    }
}


/**
 * PCAP node - replays a pcap file on a port
 */
bool CGenNodePCAP::create(uint8_t port_id,
                          pkt_dir_t dir,
                          socket_id_t socket_id,
                          const uint8_t *mac_addr,
                          const uint8_t *slave_mac_addr,
                          const std::string &pcap_filename,
                          double ipg_usec,
                          double min_ipg_sec,
                          double speedup,
                          uint32_t count,
                          bool is_dual) {
    std::stringstream ss;

    m_type       = CGenNode::PCAP_PKT;
    m_flags      = 0;
    m_src_port   = 0;
    m_port_id    = port_id;
    m_count      = count;
    m_is_dual    = is_dual;
    m_dir        = dir;
    m_min_ipg_sec    = min_ipg_sec;

    /* increase the WD timeout due to IO */
    TrexWatchDog::IOFunction::io_begin();

    /* mark this node as slow path */
    set_slow_path(true);

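    /* ipg_usec == -1 means "use the per-packet timestamps from the pcap file"
       (scaled by speedup); otherwise a fixed IPG is used, scaled by speedup and
       clamped from below by min_ipg_sec */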
    if (ipg_usec != -1) {
        /* fixed IPG */
        m_ipg_sec = std::max(min_ipg_sec, usec_to_sec(ipg_usec / speedup));
        m_speedup = 0;
    } else {
        /* packet IPG */
        m_ipg_sec = -1;
        m_speedup  = speedup;
    }

    /* copy MAC addr info */
    memcpy(m_mac_addr, mac_addr, 12);
    memcpy(m_slave_mac_addr, slave_mac_addr, 12);


    set_socket_id(socket_id);

    /* create the PCAP reader */
    m_reader = CCapReaderFactory::CreateReader((char *)pcap_filename.c_str(), 0, ss);
    if (!m_reader) {
        return false;
    }

    m_raw_packet = new CCapPktRaw();
    if ( m_reader->ReadPacket(m_raw_packet) == false ){
        return false;
    }

    /* set the dir */
    set_mbuf_dir(dir);

    /* update the direction (for dual mode) */
    update_pkt_dir();

    /* this is the reference time */
    m_last_pkt_time = m_raw_packet->get_time();

    /* ready */
    m_state = PCAP_ACTIVE;

    return true;
}

/**
 * cleanup for the PCAP node
 *
 * @author imarom (08-May-16)
 */
void CGenNodePCAP::destroy() {

    if (m_raw_packet) {
        delete m_raw_packet;
        m_raw_packet = NULL;
    }

    if (m_reader) {
        delete m_reader;
        m_reader = NULL;
    }

    /* end of IO, restore the normal WD timeout */
    TrexWatchDog::IOFunction::io_end();

    m_state = PCAP_INVALID;
}
