trex_stateless_rx_core.cpp revision 051a334b
1/*
2  Ido Barnea
3  Cisco Systems, Inc.
4*/
5
6/*
7  Copyright (c) 2016-2016 Cisco Systems, Inc.
8
9  Licensed under the Apache License, Version 2.0 (the "License");
10  you may not use this file except in compliance with the License.
11  You may obtain a copy of the License at
12
13  http://www.apache.org/licenses/LICENSE-2.0
14
15  Unless required by applicable law or agreed to in writing, software
16  distributed under the License is distributed on an "AS IS" BASIS,
17  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  See the License for the specific language governing permissions and
19  limitations under the License.
20*/
21
22#include <stdio.h>
23#include "bp_sim.h"
24#include "flow_stat_parser.h"
25#include "stateful_rx_core.h"
26#include "pal/linux/sanb_atomic.h"
27#include "trex_stateless_messaging.h"
28#include "trex_stateless_rx_core.h"
29#include "trex_stateless.h"
30
31
32void CRFC2544Info::create() {
33    m_latency.Create();
34    m_exp_flow_seq = 0;
35    m_prev_flow_seq = 0;
36    reset();
37}
38
39// after calling stop, packets still arriving will be considered error
40void CRFC2544Info::stop() {
41    m_prev_flow_seq = m_exp_flow_seq;
42    m_exp_flow_seq = FLOW_STAT_PAYLOAD_INITIAL_FLOW_SEQ;
43}
44
45void CRFC2544Info::reset() {
46    // This is the seq num value we expect next packet to have.
47    // Init value should match m_seq_num in CVirtualIFPerSideStats
48    m_seq = UINT32_MAX - 1;  // catch wrap around issues early
49    m_seq_err = 0;
50    m_seq_err_events_too_big = 0;
51    m_seq_err_events_too_low = 0;
52    m_ooo = 0;
53    m_dup = 0;
54    m_latency.Reset();
55    m_jitter.reset();
56}
57
58void CRFC2544Info::export_data(rfc2544_info_t_ &obj) {
59    std::string json_str;
60    Json::Reader reader;
61    Json::Value json;
62
63    obj.set_err_cntrs(m_seq_err, m_ooo, m_dup, m_seq_err_events_too_big, m_seq_err_events_too_low);
64    obj.set_jitter(m_jitter.get_jitter());
65    m_latency.dump_json(json);
66    obj.set_latency_json(json);
67};
68
69void CRxCoreStateless::create(const CRxSlCfg &cfg) {
70    m_capture = false;
71    m_max_ports = cfg.m_max_ports;
72
73    CMessagingManager * cp_rx = CMsgIns::Ins()->getCpRx();
74
75    m_ring_from_cp = cp_rx->getRingCpToDp(0);
76    m_ring_to_cp   = cp_rx->getRingDpToCp(0);
77    m_state = STATE_IDLE;
78
79    for (int i = 0; i < MAX_FLOW_STATS_PAYLOAD; i++) {
80        m_rfc2544[i].create();
81    }
82
83    m_cpu_cp_u.Create(&m_cpu_dp_u);
84
85    /* create per port manager */
86    for (int i = 0; i < m_max_ports; i++) {
87        m_rx_port_mngr[i].create(cfg.m_ports[i],
88                                 m_rfc2544,
89                                 &m_err_cntrs,
90                                 &m_cpu_dp_u);
91    }
92}
93
// Dispatch one control-plane message to its handler.
// Ownership of the message is taken here: it is freed after handling.
void CRxCoreStateless::handle_cp_msg(TrexStatelessCpToRxMsgBase *msg) {
    msg->handle(this);
    delete msg;
}
98
// Tickle the watchdog monitor to signal this core is still alive.
void CRxCoreStateless::tickle() {
    m_monitor.tickle();
}
102
103bool CRxCoreStateless::periodic_check_for_cp_messages() {
104
105    /* tickle the watchdog */
106    tickle();
107
108    /* fast path */
109    if ( likely ( m_ring_from_cp->isEmpty() ) ) {
110        return false;
111    }
112
113    while ( true ) {
114        CGenNode * node = NULL;
115
116        if (m_ring_from_cp->Dequeue(node) != 0) {
117            break;
118        }
119        assert(node);
120        TrexStatelessCpToRxMsgBase * msg = (TrexStatelessCpToRxMsgBase *)node;
121        handle_cp_msg(msg);
122    }
123
124    recalculate_next_state();
125    return true;
126
127}
128
129void CRxCoreStateless::recalculate_next_state() {
130    if (m_state == STATE_QUIT) {
131        return;
132    }
133
134    /* next state is determine by the question are there any ports with active features ? */
135    m_state = (are_any_features_active() ? STATE_WORKING : STATE_IDLE);
136}
137
138bool CRxCoreStateless::are_any_features_active() {
139    for (int i = 0; i < m_max_ports; i++) {
140        if (m_rx_port_mngr[i].has_features_set()) {
141            return true;
142        }
143    }
144
145    return false;
146}
147
148void CRxCoreStateless::idle_state_loop() {
149    const int SHORT_DELAY_MS    = 2;
150    const int LONG_DELAY_MS     = 50;
151    const int DEEP_SLEEP_LIMIT  = 2000;
152
153    int counter = 0;
154
155    while (m_state == STATE_IDLE) {
156        bool had_msg = periodic_check_for_cp_messages();
157        if (had_msg) {
158            counter = 0;
159            continue;
160        } else {
161            flush_all_pending_pkts();
162        }
163
164        /* enter deep sleep only if enough time had passed */
165        if (counter < DEEP_SLEEP_LIMIT) {
166            delay(SHORT_DELAY_MS);
167            counter++;
168        } else {
169            delay(LONG_DELAY_MS);
170        }
171    }
172}
173
174/**
175 * for each port give a tick (for flushing if needed)
176 *
177 */
178void CRxCoreStateless::port_manager_tick() {
179    for (int i = 0; i < m_max_ports; i++) {
180        m_rx_port_mngr[i].tick();
181    }
182}
183
184void CRxCoreStateless::handle_work_stage(bool do_try_rx_queue) {
185    int i = 0;
186    int j = 0;
187
188    while (m_state == STATE_WORKING) {
189
190        if (do_try_rx_queue) {
191            try_rx_queues();
192        }
193
194        process_all_pending_pkts();
195
196        /* TODO: with scheduler, this should be solved better */
197        i++;
198        if (i == 100000) { // approx 10msec
199            i = 0;
200            periodic_check_for_cp_messages(); // m_state might change in here
201
202            j++;
203            if (j == 100) { // approx 1 sec
204                j = 0;
205                port_manager_tick();
206            }
207        }
208
209        rte_pause();
210    }
211}
212
213void CRxCoreStateless::start() {
214    bool do_try_rx_queue = CGlobalInfo::m_options.preview.get_vm_one_queue_enable() ? true : false;
215
216    /* register a watchdog handle on current core */
217    m_monitor.create("STL RX CORE", 1);
218    TrexWatchDog::getInstance().register_monitor(&m_monitor);
219
220    while (m_state != STATE_QUIT) {
221
222        switch (m_state) {
223        case STATE_IDLE:
224            set_working_msg_ack(false);
225            idle_state_loop();
226            set_working_msg_ack(true);
227            break;
228
229        case STATE_WORKING:
230            handle_work_stage(do_try_rx_queue);
231            break;
232
233        default:
234            assert(0);
235            break;
236        }
237
238    }
239
240    m_monitor.disable();
241}
242
// Placeholder - packet capture is not implemented yet. Called (guarded by
// m_capture) from handle_rx_queue_msgs for each latency packet.
void CRxCoreStateless::capture_pkt(rte_mbuf_t *m) {

}
246
// In VM setup, handle packets coming as messages from DP cores.
// Drains ring 'r' completely; each node carries one latency packet (mbuf).
// The mbuf is processed by the port manager, optionally captured, and then
// freed here, followed by the message node itself.
void CRxCoreStateless::handle_rx_queue_msgs(uint8_t thread_id, CNodeRing * r) {
    while ( true ) {
        CGenNode * node;
        if ( r->Dequeue(node) != 0 ) {
            break;  // ring empty
        }
        assert(node);

        CGenNodeMsgBase * msg = (CGenNodeMsgBase *)node;
        CGenNodeLatencyPktInfo * l_msg;
        uint8_t msg_type =  msg->m_msg_type;
        uint8_t rx_port_index;


        switch (msg_type) {
        case CGenNodeMsgBase::LATENCY_PKT:
            l_msg = (CGenNodeLatencyPktInfo *)msg;
            // 0xdead is a sanity marker set by the sender - catches corrupt msgs
            assert(l_msg->m_latency_offset == 0xdead);
            // map (DP thread, direction) to a port index; presumably each DP
            // thread serves a port pair - TODO confirm against sender
            rx_port_index = (thread_id << 1) + (l_msg->m_dir & 1);
            assert( rx_port_index < m_max_ports );

            m_rx_port_mngr[rx_port_index].handle_pkt((rte_mbuf_t *)l_msg->m_pkt);

            if (m_capture)
                capture_pkt((rte_mbuf_t *)l_msg->m_pkt);
            // mbuf ownership ends here - free after all consumers are done
            rte_pktmbuf_free((rte_mbuf_t *)l_msg->m_pkt);
            break;
        default:
            printf("ERROR latency-thread message type is not valid %d \n", msg_type);
            assert(0);
        }

        CGlobalInfo::free_node(node);
    }
}
283
284// VM mode function. Handle messages from DP
285void CRxCoreStateless::try_rx_queues() {
286
287    CMessagingManager * rx_dp = CMsgIns::Ins()->getRxDp();
288    uint8_t threads=CMsgIns::Ins()->get_num_threads();
289    int ti;
290    for (ti = 0; ti < (int)threads; ti++) {
291        CNodeRing * r = rx_dp->getRingDpToCp(ti);
292        if ( ! r->isEmpty() ) {
293            handle_rx_queue_msgs((uint8_t)ti, r);
294        }
295    }
296}
297
298int CRxCoreStateless::process_all_pending_pkts(bool flush_rx) {
299
300    int total_pkts = 0;
301    for (int i = 0; i < m_max_ports; i++) {
302        total_pkts += m_rx_port_mngr[i].process_all_pending_pkts(flush_rx);
303    }
304
305    return total_pkts;
306
307}
308
309
// Clear the RX flow-stat counters of a single port.
void CRxCoreStateless::reset_rx_stats(uint8_t port_id) {
    m_rx_port_mngr[port_id].clear_stats();
}
313
314int CRxCoreStateless::get_rx_stats(uint8_t port_id, rx_per_flow_t *rx_stats, int min, int max
315                                   , bool reset, TrexPlatformApi::driver_stat_cap_e type) {
316
317    RXLatency &latency = m_rx_port_mngr[port_id].get_latency();
318
319    for (int hw_id = min; hw_id <= max; hw_id++) {
320        if (type == TrexPlatformApi::IF_STAT_PAYLOAD) {
321            rx_stats[hw_id - min] = latency.m_rx_pg_stat_payload[hw_id];
322        } else {
323            rx_stats[hw_id - min] = latency.m_rx_pg_stat[hw_id];
324        }
325        if (reset) {
326            if (type == TrexPlatformApi::IF_STAT_PAYLOAD) {
327                latency.m_rx_pg_stat_payload[hw_id].clear();
328            } else {
329                latency.m_rx_pg_stat[hw_id].clear();
330            }
331        }
332    }
333    return 0;
334}
335
// Export RFC2544 info for flow-stat hw ids [min, max] into rfc2544_info
// (indexed from 0). When 'reset' is requested, the flow is stopped BEFORE
// exporting (so the exported counts are consistent) and reset AFTER.
int CRxCoreStateless::get_rfc2544_info(rfc2544_info_t *rfc2544_info, int min, int max, bool reset) {
    for (int hw_id = min; hw_id <= max; hw_id++) {
        CRFC2544Info &curr_rfc2544 = m_rfc2544[hw_id];

        if (reset) {
            // need to stop first, so count will be consistent
            curr_rfc2544.stop();
        }

        curr_rfc2544.sample_period_end();
        curr_rfc2544.export_data(rfc2544_info[hw_id - min]);

        if (reset) {
            curr_rfc2544.reset();
        }
    }
    return 0;
}
354
// Copy the global RX error counters into *rx_err. Always returns 0.
int CRxCoreStateless::get_rx_err_cntrs(CRxCoreErrCntrs *rx_err) {
    *rx_err = m_err_cntrs;
    return 0;
}
359
// Publish the "working" ack flag; presumably read by another core to learn
// this core entered/left the working state - TODO confirm reader.
// The barriers on both sides keep the store ordered with respect to
// surrounding memory operations; do not reorder these statements.
void CRxCoreStateless::set_working_msg_ack(bool val) {
    sanb_smp_memory_barrier();
    m_ack_start_work_msg = val;
    sanb_smp_memory_barrier();
}
365
366
// Refresh the control-plane view of this core's CPU utilization.
void CRxCoreStateless::update_cpu_util(){
    m_cpu_cp_u.Update();
}
370
// Return the last computed CPU utilization value.
double CRxCoreStateless::get_cpu_util() {
    return m_cpu_cp_u.GetVal();
}
374
375
// Start recording RX packets of a port to a pcap file, up to 'limit' packets.
void
CRxCoreStateless::start_recorder(uint8_t port_id, const std::string &pcap_filename, uint64_t limit) {
    m_rx_port_mngr[port_id].start_recorder(pcap_filename, limit);
}
380
// Stop the pcap recorder of a port.
void
CRxCoreStateless::stop_recorder(uint8_t port_id) {
    m_rx_port_mngr[port_id].stop_recorder();
}
385
// Start queueing RX packets of a port, keeping at most 'size' packets.
void
CRxCoreStateless::start_queue(uint8_t port_id, uint64_t size) {
    m_rx_port_mngr[port_id].start_queue(size);
}
390
// Stop the RX packet queue of a port.
void
CRxCoreStateless::stop_queue(uint8_t port_id) {
    m_rx_port_mngr[port_id].stop_queue();
}
395
396void
397CRxCoreStateless::enable_latency() {
398    for (int i = 0; i < m_max_ports; i++) {
399        m_rx_port_mngr[i].enable_latency();
400    }
401}
402
403void
404CRxCoreStateless::disable_latency() {
405    for (int i = 0; i < m_max_ports; i++) {
406        m_rx_port_mngr[i].disable_latency();
407    }
408}
409
// Read-only accessor for a port's RX manager; port_id must be valid.
const RXPortManager &
CRxCoreStateless::get_rx_port_mngr(uint8_t port_id) {
    assert(port_id < m_max_ports);
    return m_rx_port_mngr[port_id];

}
416