trex_stateless_rx_port_mngr.h revision 02ab06b1
1/*
2  Itay Marom
3  Cisco Systems, Inc.
4*/
5
6/*
7  Copyright (c) 2016-2016 Cisco Systems, Inc.
8
9  Licensed under the Apache License, Version 2.0 (the "License");
10  you may not use this file except in compliance with the License.
11  You may obtain a copy of the License at
12
13  http://www.apache.org/licenses/LICENSE-2.0
14
15  Unless required by applicable law or agreed to in writing, software
16  distributed under the License is distributed on an "AS IS" BASIS,
17  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  See the License for the specific language governing permissions and
19  limitations under the License.
20*/
21
22#ifndef __TREX_STATELESS_RX_PORT_MNGR_H__
23#define __TREX_STATELESS_RX_PORT_MNGR_H__
24
25#include <stdint.h>
26#include "common/base64.h"
27
28#include "common/captureFile.h"
29
30/**
31 * describes a single saved RX packet
32 *
33 */
34class RxPacket {
35public:
36
37    RxPacket(const rte_mbuf_t *m) {
38        /* assume single part packet */
39        assert(m->nb_segs == 1);
40
41        m_size = m->pkt_len;
42        const uint8_t *p = rte_pktmbuf_mtod(m, uint8_t *);
43
44        m_raw = (uint8_t *)malloc(m_size);
45        memcpy(m_raw, p, m_size);
46    }
47
48    /* RVO here - no performance impact */
49    const std::string to_base64_str() const {
50        return base64_encode(m_raw, m_size);
51    }
52
53    ~RxPacket() {
54        if (m_raw) {
55            delete m_raw;
56        }
57    }
58
59private:
60
61    uint8_t *m_raw;
62    uint16_t m_size;
63};
64
65/**
66 * a simple cyclic buffer to hold RX packets
67 *
68 */
69class RxPacketBuffer {
70public:
71
72    RxPacketBuffer(int limit) {
73        m_buffer = nullptr;
74        m_head   = 0;
75        m_tail   = 0;
76        m_limit  = limit;
77
78        m_buffer = new RxPacket*[limit](); // zeroed
79    }
80
81    ~RxPacketBuffer() {
82        assert(m_buffer);
83
84        while (!is_empty()) {
85            RxPacket *pkt = pop();
86            delete pkt;
87        }
88        delete [] m_buffer;
89    }
90
91    bool is_empty() const {
92        return (m_head == m_tail);
93    }
94
95    bool is_full() const {
96        return ( next(m_head) == m_tail);
97    }
98
99    int get_limit() const {
100        return m_limit;
101    }
102
103    void push(RxPacket *pkt) {
104        if (is_full()) {
105            delete pop();
106        }
107        m_buffer[m_head] = pkt;
108        m_head = next(m_head);
109    }
110
111    /**
112     * generate a JSON output of the queue
113     *
114     */
115    Json::Value to_json() {
116
117        Json::Value output = Json::arrayValue;
118
119        int tmp = m_tail;
120        while (tmp != m_head) {
121            RxPacket *pkt = m_buffer[tmp];
122            output.append(pkt->to_base64_str());
123            tmp = next(tmp);
124        }
125
126        return output;
127    }
128
129private:
130    int next(int v) const {
131        return ( (v + 1) % m_limit );
132    }
133
134    /* pop in case of full queue - internal usage */
135    RxPacket * pop() {
136        assert(!is_empty());
137        RxPacket *pkt = m_buffer[m_tail];
138        m_tail = next(m_tail);
139        return pkt;
140    }
141
142    int m_head;
143    int m_tail;
144    int m_limit;
145    RxPacket **m_buffer;
146};
147
148/**
149 * RX packet recorder to PCAP file
150 *
151 */
152class RXPacketRecorder {
153public:
154    RXPacketRecorder();
155    ~RXPacketRecorder();
156    void start(const std::string &pcap, uint32_t limit);
157    void stop();
158    void handle_pkt(const rte_mbuf_t *m);
159
160private:
161    CFileWriterBase  *m_writer;
162    CCapPktRaw        m_pkt;
163    dsec_t            m_epoch;
164    uint32_t          m_limit;
165};
166
167
168/**
169 * per port RX features manager
170 *
171 * @author imarom (10/30/2016)
172 */
173class RXPortManager {
174public:
175    enum features_t {
176        LATENCY = 0x1,
177        RECORD  = 0x2,
178        QUEUE   = 0x4
179    };
180
181    RXPortManager() {
182        m_features = 0;
183        m_pkt_buffer = NULL;
184        set_feature(LATENCY);
185    }
186
187    void start_recorder(const std::string &pcap, uint32_t limit_pkts) {
188        m_recorder.start(pcap, limit_pkts);
189        set_feature(RECORD);
190    }
191
192    void stop_recorder() {
193        m_recorder.stop();
194        unset_feature(RECORD);
195    }
196
197    void start_queue(uint32_t limit) {
198        if (m_pkt_buffer) {
199            delete m_pkt_buffer;
200        }
201        m_pkt_buffer = new RxPacketBuffer(limit);
202        set_feature(QUEUE);
203    }
204
205    void stop_queue() {
206        if (m_pkt_buffer) {
207            delete m_pkt_buffer;
208            m_pkt_buffer = NULL;
209        }
210        unset_feature(QUEUE);
211    }
212
213    RxPacketBuffer *get_pkt_buffer() {
214        if (!is_feature_set(QUEUE)) {
215            return NULL;
216        }
217
218        assert(m_pkt_buffer);
219
220        /* take the current */
221        RxPacketBuffer *old_buffer = m_pkt_buffer;
222
223        /* allocate a new buffer */
224        m_pkt_buffer = new RxPacketBuffer(old_buffer->get_limit());
225
226        return old_buffer;
227    }
228
229    void handle_pkt(const rte_mbuf_t *m) {
230        /* fast path */
231        if (no_features_set()) {
232            return;
233        }
234
235        if (is_feature_set(RECORD)) {
236            m_recorder.handle_pkt(m);
237        }
238
239        if (is_feature_set(QUEUE)) {
240            m_pkt_buffer->push(new RxPacket(m));
241        }
242    }
243
244private:
245
246    void set_feature(features_t feature) {
247        m_features |= feature;
248    }
249
250    void unset_feature(features_t feature) {
251        m_features &= (~feature);
252    }
253
254    bool is_feature_set(features_t feature) {
255        return ( (m_features & feature) == feature );
256    }
257
258    bool no_features_set() {
259        return (m_features == 0);
260    }
261
262    uint32_t            m_features;
263    RXPacketRecorder    m_recorder;
264    RxPacketBuffer     *m_pkt_buffer;
265};
266
267
268
269#endif /* __TREX_STATELESS_RX_PORT_MNGR_H__ */
270
271