trex_stateless_rx_core.cpp revision 4477491b
1/*
2  Ido Barnea
3  Cisco Systems, Inc.
4*/
5
6/*
7  Copyright (c) 2016-2016 Cisco Systems, Inc.
8
9  Licensed under the Apache License, Version 2.0 (the "License");
10  you may not use this file except in compliance with the License.
11  You may obtain a copy of the License at
12
13  http://www.apache.org/licenses/LICENSE-2.0
14
15  Unless required by applicable law or agreed to in writing, software
16  distributed under the License is distributed on an "AS IS" BASIS,
17  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  See the License for the specific language governing permissions and
19  limitations under the License.
20*/
21
22#include <stdio.h>
23#include "bp_sim.h"
24#include "flow_stat_parser.h"
25#include "stateful_rx_core.h"
26#include "pal/linux/sanb_atomic.h"
27#include "trex_stateless_messaging.h"
28#include "trex_stateless_rx_core.h"
29#include "trex_stateless.h"
30
31
32void CRFC2544Info::create() {
33    m_latency.Create();
34    m_exp_flow_seq = 0;
35    m_prev_flow_seq = 0;
36    reset();
37}
38
39// after calling stop, packets still arriving will be considered error
40void CRFC2544Info::stop() {
41    m_prev_flow_seq = m_exp_flow_seq;
42    m_exp_flow_seq = FLOW_STAT_PAYLOAD_INITIAL_FLOW_SEQ;
43}
44
45void CRFC2544Info::reset() {
46    // This is the seq num value we expect next packet to have.
47    // Init value should match m_seq_num in CVirtualIFPerSideStats
48    m_seq = UINT32_MAX - 1;  // catch wrap around issues early
49    m_seq_err = 0;
50    m_seq_err_events_too_big = 0;
51    m_seq_err_events_too_low = 0;
52    m_ooo = 0;
53    m_dup = 0;
54    m_latency.Reset();
55    m_jitter.reset();
56}
57
58void CRFC2544Info::export_data(rfc2544_info_t_ &obj) {
59    std::string json_str;
60    Json::Reader reader;
61    Json::Value json;
62
63    obj.set_err_cntrs(m_seq_err, m_ooo, m_dup, m_seq_err_events_too_big, m_seq_err_events_too_low);
64    obj.set_jitter(m_jitter.get_jitter());
65    m_latency.dump_json(json);
66    obj.set_latency_json(json);
67};
68
69void CRxCoreStateless::create(const CRxSlCfg &cfg) {
70    m_capture = false;
71    m_max_ports = cfg.m_max_ports;
72
73    CMessagingManager * cp_rx = CMsgIns::Ins()->getCpRx();
74
75    m_ring_from_cp = cp_rx->getRingCpToDp(0);
76    m_ring_to_cp   = cp_rx->getRingDpToCp(0);
77    m_state = STATE_IDLE;
78
79    for (int i = 0; i < MAX_FLOW_STATS_PAYLOAD; i++) {
80        m_rfc2544[i].create();
81    }
82
83    m_cpu_cp_u.Create(&m_cpu_dp_u);
84
85    /* create per port manager */
86    for (int i = 0; i < m_max_ports; i++) {
87        m_rx_port_mngr[i].create(cfg.m_ports[i],
88                                 m_rfc2544,
89                                 &m_err_cntrs,
90                                 &m_cpu_dp_u,
91                                 cfg.m_num_crc_fix_bytes);
92    }
93}
94
95void CRxCoreStateless::handle_cp_msg(TrexStatelessCpToRxMsgBase *msg) {
96    msg->handle(this);
97    delete msg;
98}
99
100void CRxCoreStateless::tickle() {
101    m_monitor.tickle();
102}
103
104bool CRxCoreStateless::periodic_check_for_cp_messages() {
105
106    /* tickle the watchdog */
107    tickle();
108
109    /* fast path */
110    if ( likely ( m_ring_from_cp->isEmpty() ) ) {
111        return false;
112    }
113
114    while ( true ) {
115        CGenNode * node = NULL;
116
117        if (m_ring_from_cp->Dequeue(node) != 0) {
118            break;
119        }
120        assert(node);
121        TrexStatelessCpToRxMsgBase * msg = (TrexStatelessCpToRxMsgBase *)node;
122        handle_cp_msg(msg);
123    }
124
125    /* a message might result in a change of state */
126    recalculate_next_state();
127    return true;
128
129}
130
131void CRxCoreStateless::recalculate_next_state() {
132    if (m_state == STATE_QUIT) {
133        return;
134    }
135
136    /* next state is determine by the question are there any ports with active features ? */
137    m_state = (are_any_features_active() ? STATE_WORKING : STATE_IDLE);
138}
139
140bool CRxCoreStateless::are_any_features_active() {
141    for (int i = 0; i < m_max_ports; i++) {
142        if (m_rx_port_mngr[i].has_features_set()) {
143            return true;
144        }
145    }
146
147    return false;
148}
149
150void CRxCoreStateless::idle_state_loop() {
151    const int SHORT_DELAY_MS    = 2;
152    const int LONG_DELAY_MS     = 50;
153    const int DEEP_SLEEP_LIMIT  = 2000;
154
155    int counter = 0;
156
157    while (m_state == STATE_IDLE) {
158        bool had_msg = periodic_check_for_cp_messages();
159        if (had_msg) {
160            counter = 0;
161            continue;
162        } else {
163            flush_all_pending_pkts();
164        }
165
166        /* enter deep sleep only if enough time had passed */
167        if (counter < DEEP_SLEEP_LIMIT) {
168            delay(SHORT_DELAY_MS);
169            counter++;
170        } else {
171            delay(LONG_DELAY_MS);
172        }
173    }
174}
175
176/**
177 * for each port give a tick (for flushing if needed)
178 *
179 */
180void CRxCoreStateless::port_manager_tick() {
181    for (int i = 0; i < m_max_ports; i++) {
182        m_rx_port_mngr[i].tick();
183    }
184}
185
186void CRxCoreStateless::handle_work_stage() {
187
188    /* set the next sync time to */
189    dsec_t sync_time_sec = now_sec() + (1.0 / 1000);
190
191    while (m_state == STATE_WORKING) {
192        process_all_pending_pkts();
193
194        dsec_t now = now_sec();
195
196        if ( (now - sync_time_sec) > 0 ) {
197            periodic_check_for_cp_messages();
198            port_manager_tick();
199            sync_time_sec = now + (1.0 / 1000);
200        }
201
202        rte_pause();
203
204    }
205}
206
207void CRxCoreStateless::start() {
208    /* register a watchdog handle on current core */
209    m_monitor.create("STL RX CORE", 1);
210    TrexWatchDog::getInstance().register_monitor(&m_monitor);
211
212    while (m_state != STATE_QUIT) {
213        switch (m_state) {
214        case STATE_IDLE:
215            idle_state_loop();
216            break;
217        case STATE_WORKING:
218            handle_work_stage();
219            break;
220
221        default:
222            assert(0);
223            break;
224        }
225
226    }
227
228    m_monitor.disable();
229}
230
231void CRxCoreStateless::capture_pkt(rte_mbuf_t *m) {
232
233}
234
235int CRxCoreStateless::process_all_pending_pkts(bool flush_rx) {
236
237    int total_pkts = 0;
238    for (int i = 0; i < m_max_ports; i++) {
239        total_pkts += m_rx_port_mngr[i].process_all_pending_pkts(flush_rx);
240    }
241
242    return total_pkts;
243
244}
245
246void CRxCoreStateless::reset_rx_stats(uint8_t port_id) {
247    m_rx_port_mngr[port_id].clear_stats();
248}
249
250int CRxCoreStateless::get_rx_stats(uint8_t port_id, rx_per_flow_t *rx_stats, int min, int max
251                                   , bool reset, TrexPlatformApi::driver_stat_cap_e type) {
252
253    /* for now only latency stats */
254    m_rx_port_mngr[port_id].get_latency_stats(rx_stats, min, max, reset, type);
255
256    return (0);
257
258}
259
260int CRxCoreStateless::get_rfc2544_info(rfc2544_info_t *rfc2544_info, int min, int max, bool reset) {
261    for (int hw_id = min; hw_id <= max; hw_id++) {
262        CRFC2544Info &curr_rfc2544 = m_rfc2544[hw_id];
263
264        if (reset) {
265            // need to stop first, so count will be consistent
266            curr_rfc2544.stop();
267        }
268
269        curr_rfc2544.sample_period_end();
270        curr_rfc2544.export_data(rfc2544_info[hw_id - min]);
271
272        if (reset) {
273            curr_rfc2544.reset();
274        }
275    }
276    return 0;
277}
278
279int CRxCoreStateless::get_rx_err_cntrs(CRxCoreErrCntrs *rx_err) {
280    *rx_err = m_err_cntrs;
281    return 0;
282}
283
284void CRxCoreStateless::update_cpu_util(){
285    m_cpu_cp_u.Update();
286}
287
288double CRxCoreStateless::get_cpu_util() {
289    return m_cpu_cp_u.GetVal();
290}
291
292
293void
294CRxCoreStateless::start_recorder(uint8_t port_id, const std::string &pcap_filename, uint64_t limit) {
295    m_rx_port_mngr[port_id].start_recorder(pcap_filename, limit);
296    recalculate_next_state();
297}
298
299void
300CRxCoreStateless::stop_recorder(uint8_t port_id) {
301    m_rx_port_mngr[port_id].stop_recorder();
302    recalculate_next_state();
303}
304
305void
306CRxCoreStateless::start_queue(uint8_t port_id, uint64_t size) {
307    m_rx_port_mngr[port_id].start_queue(size);
308    recalculate_next_state();
309}
310
311void
312CRxCoreStateless::stop_queue(uint8_t port_id) {
313    m_rx_port_mngr[port_id].stop_queue();
314    recalculate_next_state();
315}
316
317void
318CRxCoreStateless::enable_latency() {
319    for (int i = 0; i < m_max_ports; i++) {
320        m_rx_port_mngr[i].enable_latency();
321    }
322
323    recalculate_next_state();
324}
325
326void
327CRxCoreStateless::disable_latency() {
328    for (int i = 0; i < m_max_ports; i++) {
329        m_rx_port_mngr[i].disable_latency();
330    }
331
332    recalculate_next_state();
333}
334
335const RXPortManager &
336CRxCoreStateless::get_rx_port_mngr(uint8_t port_id) {
337    assert(port_id < m_max_ports);
338    return m_rx_port_mngr[port_id];
339
340}
341