trex_stateless_rx_core.cpp revision 72b15427
1/*
2  Ido Barnea
3  Cisco Systems, Inc.
4*/
5
6/*
7  Copyright (c) 2016-2016 Cisco Systems, Inc.
8
9  Licensed under the Apache License, Version 2.0 (the "License");
10  you may not use this file except in compliance with the License.
11  You may obtain a copy of the License at
12
13  http://www.apache.org/licenses/LICENSE-2.0
14
15  Unless required by applicable law or agreed to in writing, software
16  distributed under the License is distributed on an "AS IS" BASIS,
17  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  See the License for the specific language governing permissions and
19  limitations under the License.
20*/
21
22#include <stdio.h>
23#include "bp_sim.h"
24#include "flow_stat_parser.h"
25#include "stateful_rx_core.h"
26#include "pal/linux/sanb_atomic.h"
27#include "trex_stateless_messaging.h"
28#include "trex_stateless_rx_core.h"
29#include "trex_stateless.h"
30
31
32void CRFC2544Info::create() {
33    m_latency.Create();
34    m_exp_flow_seq = 0;
35    m_prev_flow_seq = 0;
36    reset();
37}
38
39// after calling stop, packets still arriving will be considered error
40void CRFC2544Info::stop() {
41    m_prev_flow_seq = m_exp_flow_seq;
42    m_exp_flow_seq = FLOW_STAT_PAYLOAD_INITIAL_FLOW_SEQ;
43}
44
45void CRFC2544Info::reset() {
46    // This is the seq num value we expect next packet to have.
47    // Init value should match m_seq_num in CVirtualIFPerSideStats
48    m_seq = UINT32_MAX - 1;  // catch wrap around issues early
49    m_seq_err = 0;
50    m_seq_err_events_too_big = 0;
51    m_seq_err_events_too_low = 0;
52    m_ooo = 0;
53    m_dup = 0;
54    m_latency.Reset();
55    m_jitter.reset();
56}
57
58void CRFC2544Info::export_data(rfc2544_info_t_ &obj) {
59    std::string json_str;
60    Json::Reader reader;
61    Json::Value json;
62
63    obj.set_err_cntrs(m_seq_err, m_ooo, m_dup, m_seq_err_events_too_big, m_seq_err_events_too_low);
64    obj.set_jitter(m_jitter.get_jitter());
65    m_latency.dump_json(json);
66    obj.set_latency_json(json);
67};
68
69void CRxCoreStateless::create(const CRxSlCfg &cfg) {
70    m_capture = false;
71    m_max_ports = cfg.m_max_ports;
72
73    CMessagingManager * cp_rx = CMsgIns::Ins()->getCpRx();
74
75    m_ring_from_cp = cp_rx->getRingCpToDp(0);
76    m_ring_to_cp   = cp_rx->getRingDpToCp(0);
77    m_state = STATE_IDLE;
78
79    for (int i = 0; i < MAX_FLOW_STATS_PAYLOAD; i++) {
80        m_rfc2544[i].create();
81    }
82
83    m_cpu_cp_u.Create(&m_cpu_dp_u);
84
85    /* create per port manager */
86    for (int i = 0; i < m_max_ports; i++) {
87        m_rx_port_mngr[i].create(cfg.m_ports[i],
88                                 m_rfc2544,
89                                 &m_err_cntrs,
90                                 &m_cpu_dp_u);
91    }
92}
93
94void CRxCoreStateless::handle_cp_msg(TrexStatelessCpToRxMsgBase *msg) {
95    msg->handle(this);
96    delete msg;
97}
98
99void CRxCoreStateless::tickle() {
100    m_monitor.tickle();
101}
102
103bool CRxCoreStateless::periodic_check_for_cp_messages() {
104
105    /* tickle the watchdog */
106    tickle();
107
108    /* fast path */
109    if ( likely ( m_ring_from_cp->isEmpty() ) ) {
110        return false;
111    }
112
113    while ( true ) {
114        CGenNode * node = NULL;
115
116        if (m_ring_from_cp->Dequeue(node) != 0) {
117            break;
118        }
119        assert(node);
120        TrexStatelessCpToRxMsgBase * msg = (TrexStatelessCpToRxMsgBase *)node;
121        handle_cp_msg(msg);
122    }
123
124    /* a message might result in a change of state */
125    recalculate_next_state();
126    return true;
127
128}
129
130void CRxCoreStateless::recalculate_next_state() {
131    if (m_state == STATE_QUIT) {
132        return;
133    }
134
135    /* next state is determine by the question are there any ports with active features ? */
136    m_state = (are_any_features_active() ? STATE_WORKING : STATE_IDLE);
137}
138
139bool CRxCoreStateless::are_any_features_active() {
140    for (int i = 0; i < m_max_ports; i++) {
141        if (m_rx_port_mngr[i].has_features_set()) {
142            return true;
143        }
144    }
145
146    return false;
147}
148
149void CRxCoreStateless::idle_state_loop() {
150    const int SHORT_DELAY_MS    = 2;
151    const int LONG_DELAY_MS     = 50;
152    const int DEEP_SLEEP_LIMIT  = 2000;
153
154    int counter = 0;
155
156    while (m_state == STATE_IDLE) {
157        bool had_msg = periodic_check_for_cp_messages();
158        if (had_msg) {
159            counter = 0;
160            continue;
161        } else {
162            flush_all_pending_pkts();
163        }
164
165        /* enter deep sleep only if enough time had passed */
166        if (counter < DEEP_SLEEP_LIMIT) {
167            delay(SHORT_DELAY_MS);
168            counter++;
169        } else {
170            delay(LONG_DELAY_MS);
171        }
172    }
173}
174
175/**
176 * for each port give a tick (for flushing if needed)
177 *
178 */
179void CRxCoreStateless::port_manager_tick() {
180    for (int i = 0; i < m_max_ports; i++) {
181        m_rx_port_mngr[i].tick();
182    }
183}
184
185void CRxCoreStateless::handle_work_stage() {
186
187    /* set the next sync time to */
188    dsec_t sync_time_sec = now_sec() + (1.0 / 1000);
189
190    while (m_state == STATE_WORKING) {
191        process_all_pending_pkts();
192
193        dsec_t now = now_sec();
194
195        if ( (now - sync_time_sec) > 0 ) {
196            periodic_check_for_cp_messages();
197            port_manager_tick();
198            sync_time_sec = now + (1.0 / 1000);
199        }
200
201        rte_pause();
202
203    }
204}
205
206void CRxCoreStateless::start() {
207    /* register a watchdog handle on current core */
208    m_monitor.create("STL RX CORE", 1);
209    TrexWatchDog::getInstance().register_monitor(&m_monitor);
210
211    while (m_state != STATE_QUIT) {
212        switch (m_state) {
213        case STATE_IDLE:
214            idle_state_loop();
215            break;
216        case STATE_WORKING:
217            handle_work_stage();
218            break;
219
220        default:
221            assert(0);
222            break;
223        }
224
225    }
226
227    m_monitor.disable();
228}
229
230void CRxCoreStateless::capture_pkt(rte_mbuf_t *m) {
231
232}
233
234// In VM setup, handle packets coming as messages from DP cores.
235void CRxCoreStateless::handle_rx_queue_msgs(uint8_t thread_id, CNodeRing * r) {
236    while ( true ) {
237        CGenNode * node;
238        if ( r->Dequeue(node) != 0 ) {
239            break;
240        }
241        assert(node);
242
243        CGenNodeMsgBase * msg = (CGenNodeMsgBase *)node;
244        CGenNodeLatencyPktInfo * l_msg;
245        uint8_t msg_type =  msg->m_msg_type;
246        uint8_t rx_port_index;
247
248
249        switch (msg_type) {
250        case CGenNodeMsgBase::LATENCY_PKT:
251            l_msg = (CGenNodeLatencyPktInfo *)msg;
252            assert(l_msg->m_latency_offset == 0xdead);
253            rx_port_index = (thread_id << 1) + (l_msg->m_dir & 1);
254            assert( rx_port_index < m_max_ports );
255
256            m_rx_port_mngr[rx_port_index].handle_pkt((rte_mbuf_t *)l_msg->m_pkt);
257
258            if (m_capture)
259                capture_pkt((rte_mbuf_t *)l_msg->m_pkt);
260            rte_pktmbuf_free((rte_mbuf_t *)l_msg->m_pkt);
261            break;
262        default:
263            printf("ERROR latency-thread message type is not valid %d \n", msg_type);
264            assert(0);
265        }
266
267        CGlobalInfo::free_node(node);
268    }
269}
270
271int CRxCoreStateless::process_all_pending_pkts(bool flush_rx) {
272
273    int total_pkts = 0;
274    for (int i = 0; i < m_max_ports; i++) {
275        total_pkts += m_rx_port_mngr[i].process_all_pending_pkts(flush_rx);
276    }
277
278    return total_pkts;
279
280}
281
282
283void CRxCoreStateless::reset_rx_stats(uint8_t port_id) {
284    m_rx_port_mngr[port_id].clear_stats();
285}
286
287int CRxCoreStateless::get_rx_stats(uint8_t port_id, rx_per_flow_t *rx_stats, int min, int max
288                                   , bool reset, TrexPlatformApi::driver_stat_cap_e type) {
289
290    /* for now only latency stats */
291    m_rx_port_mngr[port_id].get_latency_stats(rx_stats, min, max, reset, type);
292
293    return (0);
294
295}
296
297int CRxCoreStateless::get_rfc2544_info(rfc2544_info_t *rfc2544_info, int min, int max, bool reset) {
298    for (int hw_id = min; hw_id <= max; hw_id++) {
299        CRFC2544Info &curr_rfc2544 = m_rfc2544[hw_id];
300
301        if (reset) {
302            // need to stop first, so count will be consistent
303            curr_rfc2544.stop();
304        }
305
306        curr_rfc2544.sample_period_end();
307        curr_rfc2544.export_data(rfc2544_info[hw_id - min]);
308
309        if (reset) {
310            curr_rfc2544.reset();
311        }
312    }
313    return 0;
314}
315
316int CRxCoreStateless::get_rx_err_cntrs(CRxCoreErrCntrs *rx_err) {
317    *rx_err = m_err_cntrs;
318    return 0;
319}
320
321void CRxCoreStateless::update_cpu_util(){
322    m_cpu_cp_u.Update();
323}
324
325double CRxCoreStateless::get_cpu_util() {
326    return m_cpu_cp_u.GetVal();
327}
328
329
330void
331CRxCoreStateless::start_recorder(uint8_t port_id, const std::string &pcap_filename, uint64_t limit) {
332    m_rx_port_mngr[port_id].start_recorder(pcap_filename, limit);
333    recalculate_next_state();
334}
335
336void
337CRxCoreStateless::stop_recorder(uint8_t port_id) {
338    m_rx_port_mngr[port_id].stop_recorder();
339    recalculate_next_state();
340}
341
342void
343CRxCoreStateless::start_queue(uint8_t port_id, uint64_t size) {
344    m_rx_port_mngr[port_id].start_queue(size);
345    recalculate_next_state();
346}
347
348void
349CRxCoreStateless::stop_queue(uint8_t port_id) {
350    m_rx_port_mngr[port_id].stop_queue();
351    recalculate_next_state();
352}
353
354void
355CRxCoreStateless::enable_latency() {
356    for (int i = 0; i < m_max_ports; i++) {
357        m_rx_port_mngr[i].enable_latency();
358    }
359
360    recalculate_next_state();
361}
362
363void
364CRxCoreStateless::disable_latency() {
365    for (int i = 0; i < m_max_ports; i++) {
366        m_rx_port_mngr[i].disable_latency();
367    }
368
369    recalculate_next_state();
370}
371
372const RXPortManager &
373CRxCoreStateless::get_rx_port_mngr(uint8_t port_id) {
374    assert(port_id < m_max_ports);
375    return m_rx_port_mngr[port_id];
376
377}
378