trex_stateless_rx_port_mngr.cpp revision 051a334b
1/*
2  Itay Marom
3  Cisco Systems, Inc.
4*/
5
6/*
7  Copyright (c) 2016-2016 Cisco Systems, Inc.
8
9  Licensed under the Apache License, Version 2.0 (the "License");
10  you may not use this file except in compliance with the License.
11  You may obtain a copy of the License at
12
13  http://www.apache.org/licenses/LICENSE-2.0
14
15  Unless required by applicable law or agreed to in writing, software
16  distributed under the License is distributed on an "AS IS" BASIS,
17  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  See the License for the specific language governing permissions and
19  limitations under the License.
20*/
21#include "bp_sim.h"
22#include "trex_stateless_rx_port_mngr.h"
23#include "common/captureFile.h"
24#include "trex_stateless_rx_core.h"
25
26/**************************************
27 * latency RX feature
28 *
29 *************************************/
30RXLatency::RXLatency() {
31    m_rcv_all    = false;
32    m_rfc2544    = NULL;
33    m_err_cntrs  = NULL;
34
35    for (int i = 0; i < MAX_FLOW_STATS; i++) {
36        m_rx_pg_stat[i].clear();
37        m_rx_pg_stat_payload[i].clear();
38    }
39}
40
41void
42RXLatency::create(CRFC2544Info *rfc2544, CRxCoreErrCntrs *err_cntrs) {
43    m_rfc2544   = rfc2544;
44    m_err_cntrs = err_cntrs;
45}
46
47void
48RXLatency::handle_pkt(const rte_mbuf_t *m) {
49    CFlowStatParser parser;
50
51    if (m_rcv_all || parser.parse(rte_pktmbuf_mtod(m, uint8_t *), m->pkt_len) == 0) {
52        uint32_t ip_id;
53        if (m_rcv_all || (parser.get_ip_id(ip_id) == 0)) {
54            if (m_rcv_all || is_flow_stat_id(ip_id)) {
55                uint16_t hw_id;
56                if (m_rcv_all || is_flow_stat_payload_id(ip_id)) {
57                    bool good_packet = true;
58                    uint8_t *p = rte_pktmbuf_mtod(m, uint8_t*);
59                    struct flow_stat_payload_header *fsp_head = (struct flow_stat_payload_header *)
60                        (p + m->pkt_len - sizeof(struct flow_stat_payload_header));
61                    hw_id = fsp_head->hw_id;
62                    CRFC2544Info *curr_rfc2544;
63
64                    if (unlikely(fsp_head->magic != FLOW_STAT_PAYLOAD_MAGIC) || hw_id >= MAX_FLOW_STATS_PAYLOAD) {
65                        good_packet = false;
66                        if (!m_rcv_all)
67                            m_err_cntrs->m_bad_header++;
68                    } else {
69                        curr_rfc2544 = &m_rfc2544[hw_id];
70
71                        if (fsp_head->flow_seq != curr_rfc2544->get_exp_flow_seq()) {
72                            // bad flow seq num
73                            // Might be the first packet of a new flow, packet from an old flow, or garbage.
74
75                            if (fsp_head->flow_seq == curr_rfc2544->get_prev_flow_seq()) {
76                                // packet from previous flow using this hw_id that arrived late
77                                good_packet = false;
78                                m_err_cntrs->m_old_flow++;
79                            } else {
80                                if (curr_rfc2544->no_flow_seq()) {
81                                    // first packet we see from this flow
82                                    good_packet = true;
83                                    curr_rfc2544->set_exp_flow_seq(fsp_head->flow_seq);
84                                } else {
85                                    // garbage packet
86                                    good_packet = false;
87                                    m_err_cntrs->m_bad_header++;
88                                }
89                            }
90                        }
91                    }
92
93                    if (good_packet) {
94                        uint32_t pkt_seq = fsp_head->seq;
95                        uint32_t exp_seq = curr_rfc2544->get_seq();
96                        if (unlikely(pkt_seq != exp_seq)) {
97                            if (pkt_seq < exp_seq) {
98                                if (exp_seq - pkt_seq > 100000) {
99                                    // packet loss while we had wrap around
100                                    curr_rfc2544->inc_seq_err(pkt_seq - exp_seq);
101                                    curr_rfc2544->inc_seq_err_too_big();
102                                    curr_rfc2544->set_seq(pkt_seq + 1);
103                                } else {
104                                    if (pkt_seq == (exp_seq - 1)) {
105                                        curr_rfc2544->inc_dup();
106                                    } else {
107                                        curr_rfc2544->inc_ooo();
108                                        // We thought it was lost, but it was just out of order
109                                        curr_rfc2544->dec_seq_err();
110                                    }
111                                    curr_rfc2544->inc_seq_err_too_low();
112                                }
113                            } else {
114                                if (unlikely (pkt_seq - exp_seq > 100000)) {
115                                    // packet reorder while we had wrap around
116                                    if (pkt_seq == (exp_seq - 1)) {
117                                        curr_rfc2544->inc_dup();
118                                    } else {
119                                        curr_rfc2544->inc_ooo();
120                                        // We thought it was lost, but it was just out of order
121                                        curr_rfc2544->dec_seq_err();
122                                    }
123                                    curr_rfc2544->inc_seq_err_too_low();
124                                } else {
125                                // seq > curr_rfc2544->seq. Assuming lost packets
126                                    curr_rfc2544->inc_seq_err(pkt_seq - exp_seq);
127                                    curr_rfc2544->inc_seq_err_too_big();
128                                    curr_rfc2544->set_seq(pkt_seq + 1);
129                                }
130                            }
131                        } else {
132                            curr_rfc2544->set_seq(pkt_seq + 1);
133                        }
134                        m_rx_pg_stat_payload[hw_id].add_pkts(1);
135                        m_rx_pg_stat_payload[hw_id].add_bytes(m->pkt_len + 4); // +4 for ethernet CRC
136                        uint64_t d = (os_get_hr_tick_64() - fsp_head->time_stamp );
137                        dsec_t ctime = ptime_convert_hr_dsec(d);
138                        curr_rfc2544->add_sample(ctime);
139                    }
140                } else {
141                    hw_id = get_hw_id(ip_id);
142                    if (hw_id < MAX_FLOW_STATS) {
143                        m_rx_pg_stat[hw_id].add_pkts(1);
144                        m_rx_pg_stat[hw_id].add_bytes(m->pkt_len + 4); // +4 for ethernet CRC
145                    }
146                }
147            }
148        }
149    }
150}
151
152void
153RXLatency::reset_stats() {
154    for (int hw_id = 0; hw_id < MAX_FLOW_STATS; hw_id++) {
155        m_rx_pg_stat[hw_id].clear();
156    }
157}
158
159Json::Value
160RXLatency::to_json() const {
161    return Json::objectValue;
162}
163
164/**************************************
165 * RX feature queue
166 *
167 *************************************/
168
169RXPacketBuffer::RXPacketBuffer(uint64_t size) {
170    m_buffer           = nullptr;
171    m_head             = 0;
172    m_tail             = 0;
173    m_size             = (size + 1); // for the empty/full difference 1 slot reserved
174
175    /* generate queue */
176    m_buffer = new RXPacket*[m_size](); // zeroed
177}
178
179RXPacketBuffer::~RXPacketBuffer() {
180    assert(m_buffer);
181
182    while (!is_empty()) {
183        RXPacket *pkt = pop();
184        delete pkt;
185    }
186    delete [] m_buffer;
187}
188
189void
190RXPacketBuffer::push(const rte_mbuf_t *m) {
191    /* if full - pop the oldest */
192    if (is_full()) {
193        delete pop();
194    }
195
196    /* push packet */
197    m_buffer[m_head] = new RXPacket(m);
198    m_head = next(m_head);
199}
200
201RXPacket *
202RXPacketBuffer::pop() {
203    assert(!is_empty());
204
205    RXPacket *pkt = m_buffer[m_tail];
206    m_tail = next(m_tail);
207
208    return pkt;
209}
210
211uint64_t
212RXPacketBuffer::get_element_count() const {
213    if (m_head >= m_tail) {
214        return (m_head - m_tail);
215    } else {
216        return ( get_capacity() - (m_tail - m_head - 1) );
217    }
218}
219
220Json::Value
221RXPacketBuffer::to_json() const {
222
223    Json::Value output = Json::arrayValue;
224
225    int tmp = m_tail;
226    while (tmp != m_head) {
227        RXPacket *pkt = m_buffer[tmp];
228        output.append(pkt->to_json());
229        tmp = next(tmp);
230    }
231
232    return output;
233}
234
235
236void
237RXQueue::start(uint64_t size) {
238    if (m_pkt_buffer) {
239        delete m_pkt_buffer;
240    }
241    m_pkt_buffer = new RXPacketBuffer(size);
242}
243
244void
245RXQueue::stop() {
246    if (m_pkt_buffer) {
247        delete m_pkt_buffer;
248        m_pkt_buffer = NULL;
249    }
250}
251
252const RXPacketBuffer *
253RXQueue::fetch() {
254
255    /* if no buffer or the buffer is empty - give a NULL one */
256    if ( (!m_pkt_buffer) || (m_pkt_buffer->get_element_count() == 0) ) {
257        return nullptr;
258    }
259
260    /* hold a pointer to the old one */
261    RXPacketBuffer *old_buffer = m_pkt_buffer;
262
263    /* replace the old one with a new one and freeze the old */
264    m_pkt_buffer = new RXPacketBuffer(old_buffer->get_capacity());
265
266    return old_buffer;
267}
268
269void
270RXQueue::handle_pkt(const rte_mbuf_t *m) {
271    m_pkt_buffer->push(m);
272}
273
274Json::Value
275RXQueue::to_json() const {
276    assert(m_pkt_buffer != NULL);
277
278    Json::Value output = Json::objectValue;
279
280    output["size"]    = Json::UInt64(m_pkt_buffer->get_capacity());
281    output["count"]   = Json::UInt64(m_pkt_buffer->get_element_count());
282
283    return output;
284}
285
286/**************************************
287 * RX feature recorder
288 *
289 *************************************/
290
291RXPacketRecorder::RXPacketRecorder() {
292    m_writer = NULL;
293    m_count  = 0;
294    m_limit  = 0;
295    m_epoch  = -1;
296
297    m_pending_flush = false;
298}
299
300void
301RXPacketRecorder::start(const std::string &pcap, uint64_t limit) {
302    m_writer = CCapWriterFactory::CreateWriter(LIBPCAP, (char *)pcap.c_str());
303    if (m_writer == NULL) {
304        std::stringstream ss;
305        ss << "unable to create PCAP file: " << pcap;
306        throw TrexException(ss.str());
307    }
308
309    assert(limit > 0);
310
311    m_limit = limit;
312    m_count = 0;
313    m_pending_flush = false;
314    m_pcap_filename = pcap;
315}
316
317void
318RXPacketRecorder::stop() {
319    if (!m_writer) {
320        return;
321    }
322
323    delete m_writer;
324    m_writer = NULL;
325}
326
327void
328RXPacketRecorder::flush_to_disk() {
329
330    if (m_writer && m_pending_flush) {
331        m_writer->flush_to_disk();
332        m_pending_flush = false;
333    }
334}
335
336void
337RXPacketRecorder::handle_pkt(const rte_mbuf_t *m) {
338    if (!m_writer) {
339        return;
340    }
341
342    dsec_t now = now_sec();
343    if (m_epoch < 0) {
344        m_epoch = now;
345    }
346
347    dsec_t dt = now - m_epoch;
348
349    CPktNsecTimeStamp t_c(dt);
350    m_pkt.time_nsec = t_c.m_time_nsec;
351    m_pkt.time_sec  = t_c.m_time_sec;
352
353    const uint8_t *p = rte_pktmbuf_mtod(m, uint8_t *);
354    m_pkt.pkt_len = m->pkt_len;
355    memcpy(m_pkt.raw, p, m->pkt_len);
356
357    m_writer->write_packet(&m_pkt);
358    m_count++;
359    m_pending_flush = true;
360
361    if (m_count == m_limit) {
362        stop();
363    }
364
365}
366
367Json::Value
368RXPacketRecorder::to_json() const {
369    Json::Value output = Json::objectValue;
370
371    output["pcap_filename"] = m_pcap_filename;
372    output["limit"]         = Json::UInt64(m_limit);
373    output["count"]         = Json::UInt64(m_count);
374
375    return output;
376}
377
378/**************************************
379 * Port manager
380 *
381 *************************************/
382
383RXPortManager::RXPortManager() {
384    clear_all_features();
385    m_io          = NULL;
386    m_cpu_dp_u    = NULL;
387}
388
389
390void
391RXPortManager::create(CPortLatencyHWBase *io,
392                      CRFC2544Info *rfc2544,
393                      CRxCoreErrCntrs *err_cntrs,
394                      CCpuUtlDp *cpu_util) {
395    m_io = io;
396    m_cpu_dp_u = cpu_util;
397
398    /* init features */
399    m_latency.create(rfc2544, err_cntrs);
400}
401
402void RXPortManager::handle_pkt(const rte_mbuf_t *m) {
403
404    /* handle features */
405
406    if (is_feature_set(LATENCY)) {
407        m_latency.handle_pkt(m);
408    }
409
410    if (is_feature_set(RECORDER)) {
411        m_recorder.handle_pkt(m);
412    }
413
414    if (is_feature_set(QUEUE)) {
415        m_queue.handle_pkt(m);
416    }
417}
418
419
420int RXPortManager::process_all_pending_pkts(bool flush_rx) {
421
422    rte_mbuf_t *rx_pkts[64];
423
424    /* try to read 64 packets clean up the queue */
425    uint16_t cnt_p = m_io->rx_burst(rx_pkts, 64);
426    if (cnt_p == 0) {
427        return cnt_p;
428    }
429
430
431    m_cpu_dp_u->start_work1();
432
433    for (int j = 0; j < cnt_p; j++) {
434        rte_mbuf_t *m = rx_pkts[j];
435
436        if (!flush_rx) {
437            handle_pkt(m);
438        }
439
440        rte_pktmbuf_free(m);
441    }
442
443    /* commit only if there was work to do ! */
444    m_cpu_dp_u->commit1();
445
446
447    return cnt_p;
448}
449
450void
451RXPortManager::tick() {
452    if (is_feature_set(RECORDER)) {
453        m_recorder.flush_to_disk();
454    }
455}
456
457Json::Value
458RXPortManager::to_json() const {
459    Json::Value output = Json::objectValue;
460
461    if (is_feature_set(LATENCY)) {
462        output["latency"] = m_latency.to_json();
463        output["latency"]["is_active"] = true;
464    } else {
465        output["latency"]["is_active"] = false;
466    }
467
468    if (is_feature_set(RECORDER)) {
469        output["sniffer"] = m_recorder.to_json();
470        output["sniffer"]["is_active"] = true;
471    } else {
472        output["sniffer"]["is_active"] = false;
473    }
474
475    if (is_feature_set(QUEUE)) {
476        output["queue"] = m_queue.to_json();
477        output["queue"]["is_active"] = true;
478    } else {
479        output["queue"]["is_active"] = false;
480    }
481
482    return output;
483}
484