1/*
2  Ido Barnea
3  Cisco Systems, Inc.
4*/
5
6/*
7  Copyright (c) 2015-2017 Cisco Systems, Inc.
8
9  Licensed under the Apache License, Version 2.0 (the "License");
10  you may not use this file except in compliance with the License.
11  You may obtain a copy of the License at
12
13  http://www.apache.org/licenses/LICENSE-2.0
14
15  Unless required by applicable law or agreed to in writing, software
16  distributed under the License is distributed on an "AS IS" BASIS,
17  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  See the License for the specific language governing permissions and
19  limitations under the License.
20*/
21
22/*
23Important classes in this file:
24CFlowStatUserIdInfo - Information about one packet group id
25CFlowStatUserIdMap - Mapping between packet group id (chosen by user) and hardware counter id
26CFlowStatHwIdMap - Mapping between hardware id and packet group id
27CFlowStatRuleMgr - API to users of the file
28
29General idea of operation:
30For each stream needing flow statistics, the user provides packet group id (pg_id). Few streams can have the same pg_id.
31We maintain reference count.
32When doing start_stream, for the first stream in pg_id, hw_id is associated with the pg_id, and relevant hardware rules are
33inserted (on supported hardware). When stopping all streams with the pg_id, the hw_id <--> pg_id mapping is removed, hw_id is
34returned to the free hw_id pool, and hardware rules are removed. Counters for the pg_id are kept.
35If starting streams again, new hw_id will be assigned, and counters will continue from where they stopped. Only When deleting
36all streams using certain pg_id, infromation about this pg_id will be freed.
37
38For each stream we keep state in the m_rx_check.m_hw_id field. Since we keep reference count for certain structs, we want to
39protect from illegal operations, like starting stream while it is already starting, stopping when it is stopped...
40State machine is:
41stream_init: HW_ID_INIT
42stream_add: HW_ID_FREE
43stream_start: legal hw_id (range is 0..MAX_FLOW_STATS)
44stream_stop: HW_ID_FREE
45stream_del: HW_ID_INIT
46 */
47#include <sstream>
48#include <string>
49#include <iostream>
50#include <assert.h>
51#include <os_time.h>
52#include "internal_api/trex_platform_api.h"
53#include "trex_stateless.h"
54#include "trex_stateless_messaging.h"
55#include "trex_stateless_rx_core.h"
56#include "trex_stream.h"
57#include "flow_stat_parser.h"
58#include "flow_stat.h"
59
60
61#define FLOW_STAT_ADD_ALL_PORTS 255
62
63static const uint16_t HW_ID_INIT = UINT16_MAX;
64static const uint16_t HW_ID_FREE = UINT16_MAX - 1;
65static const uint8_t PAYLOAD_RULE_PROTO = 255;
66const uint32_t FLOW_STAT_PAYLOAD_IP_ID = UINT16_MAX;
67
68inline std::string methodName(const std::string& prettyFunction)
69{
70    size_t colons = prettyFunction.find("::");
71    size_t begin = prettyFunction.substr(0,colons).rfind(" ") + 1;
72    size_t end = prettyFunction.rfind("(") - begin;
73
74    return prettyFunction.substr(begin,end) + "()";
75}
76
77#define __METHOD_NAME__ methodName(__PRETTY_FUNCTION__)
78#ifdef __DEBUG_FUNC_ENTRY__
79#define FUNC_ENTRY (std::cout << __METHOD_NAME__ << std::endl);
80#ifdef __STREAM_DUMP__
81#define stream_dump(stream) stream->Dump(stderr)
82#else
83#define stream_dump(stream)
84#endif
85#else
86#define FUNC_ENTRY
87#endif
88
89/************** class CFlowStatUserIdInfo ***************/
90CFlowStatUserIdInfo::CFlowStatUserIdInfo(uint16_t l3_proto, uint8_t l4_proto, uint8_t ipv6_next_h) {
91    memset(m_rx_cntr, 0, sizeof(m_rx_cntr));
92    memset(m_rx_cntr_base, 0, sizeof(m_rx_cntr));
93    memset(m_tx_cntr, 0, sizeof(m_tx_cntr));
94    memset(m_tx_cntr_base, 0, sizeof(m_tx_cntr));
95    m_hw_id = UINT16_MAX;
96    m_l3_proto = l3_proto;
97    m_l4_proto = l4_proto;
98    m_ipv6_next_h = ipv6_next_h;
99    m_ref_count = 1;
100    m_trans_ref_count = 0;
101    m_was_sent = false;
102    for (int i = 0; i < TREX_MAX_PORTS; i++) {
103        m_rx_changed[i] = false;
104        m_tx_changed[i] = false;
105    }
106    m_rfc2544_support = false;
107}
108
109std::ostream& operator<<(std::ostream& os, const CFlowStatUserIdInfo& cf) {
110    os << "hw_id:" << cf.m_hw_id << " l3 proto:" << (uint16_t) cf.m_l3_proto << " ref("
111       << (uint16_t) cf.m_ref_count << "," << (uint16_t) cf.m_trans_ref_count << ")";
112    os << " rx count (";
113    os << cf.m_rx_cntr[0];
114    for (int i = 1; i < TREX_MAX_PORTS; i++) {
115        os << "," << cf.m_rx_cntr[i];
116    }
117    os << ")";
118    os << " rx count base(";
119    os << cf.m_rx_cntr_base[0];
120    for (int i = 1; i < TREX_MAX_PORTS; i++) {
121        os << "," << cf.m_rx_cntr_base[i];
122    }
123    os << ")";
124
125    os << " tx count (";
126    os << cf.m_tx_cntr[0];
127    for (int i = 1; i < TREX_MAX_PORTS; i++) {
128        os << "," << cf.m_tx_cntr[i];
129    }
130    os << ")";
131    os << " tx count base(";
132    os << cf.m_tx_cntr_base[0];
133    for (int i = 1; i < TREX_MAX_PORTS; i++) {
134        os << "," << cf.m_tx_cntr_base[i];
135    }
136    os << ")";
137
138    return os;
139}
140
141void CFlowStatUserIdInfo::add_stream(uint8_t proto) {
142#ifdef __DEBUG_FUNC_ENTRY__
143    std::cout << __METHOD_NAME__ << " proto:" << (uint16_t)proto << std::endl;
144#endif
145
146    if (proto != m_l4_proto)
147        throw TrexFStatEx("Can't use same pg_id for streams with different l4 protocol",
148                                    TrexException::T_FLOW_STAT_PG_ID_DIFF_L4);
149
150    m_ref_count++;
151}
152
153void CFlowStatUserIdInfo::reset_hw_id() {
154    FUNC_ENTRY;
155
156    m_hw_id = UINT16_MAX;
157    // we are not attached to hw. Save packet count of session.
158    // Next session will start counting from 0.
159    for (int i = 0; i < TREX_MAX_PORTS; i++) {
160        m_rx_cntr_base[i] += m_rx_cntr[i];
161        memset(&m_rx_cntr[i], 0, sizeof(m_rx_cntr[0]));
162        m_tx_cntr_base[i] += m_tx_cntr[i];
163        memset(&m_tx_cntr[i], 0, sizeof(m_tx_cntr[0]));
164    }
165}
166
167/************** class CFlowStatUserIdInfoPayload ***************/
168void CFlowStatUserIdInfoPayload::add_stream(uint8_t proto) {
169    throw TrexFStatEx("For payload rules: Can't have two streams with same pg_id, or same stream on more than one port"
170                      , TrexException::T_FLOW_STAT_DUP_PG_ID);
171}
172
173void CFlowStatUserIdInfoPayload::reset_hw_id() {
174    CFlowStatUserIdInfo::reset_hw_id();
175
176    m_seq_err_base += m_rfc2544_info.m_seq_err;
177    m_out_of_order_base += m_rfc2544_info.m_out_of_order;
178    m_dup_base += m_rfc2544_info.m_dup;
179    m_seq_err_ev_big_base += m_rfc2544_info.m_seq_err_ev_big;
180    m_seq_err_ev_low_base += m_rfc2544_info.m_seq_err_ev_low;
181    m_rfc2544_info.m_seq_err = 0;
182    m_rfc2544_info.m_out_of_order = 0;
183    m_rfc2544_info.m_dup = 0;
184    m_rfc2544_info.m_seq_err_ev_big = 0;
185    m_rfc2544_info.m_seq_err_ev_low = 0;
186}
187
188/************** class CFlowStatUserIdMap ***************/
189CFlowStatUserIdMap::CFlowStatUserIdMap() {
190
191}
192
193std::ostream& operator<<(std::ostream& os, const CFlowStatUserIdMap& cf) {
194    std::map<unsigned int, CFlowStatUserIdInfo*>::const_iterator it;
195    for (it = cf.m_map.begin(); it != cf.m_map.end(); it++) {
196        CFlowStatUserIdInfo *user_id_info = it->second;
197        uint32_t user_id = it->first;
198        os << "Flow stat user id info:\n";
199        os << "  " << user_id << ":" << *user_id_info << std::endl;
200    }
201    return os;
202}
203
204uint16_t CFlowStatUserIdMap::get_hw_id(uint32_t user_id) {
205    CFlowStatUserIdInfo *cf = find_user_id(user_id);
206
207    if (cf == NULL) {
208        return HW_ID_FREE;
209    } else {
210        return cf->get_hw_id();
211    }
212}
213
214CFlowStatUserIdInfo *
215CFlowStatUserIdMap::find_user_id(uint32_t user_id) {
216    flow_stat_user_id_map_it_t it = m_map.find(user_id);
217
218    if (it == m_map.end()) {
219        return NULL;
220    } else {
221        return it->second;
222    }
223}
224
225CFlowStatUserIdInfo *
226CFlowStatUserIdMap::add_user_id(uint32_t user_id, uint16_t l3_proto, uint8_t l4_proto, uint8_t ipv6_next_h) {
227#ifdef __DEBUG_FUNC_ENTRY__
228    std::cout << __METHOD_NAME__ << " user id:" << user_id << " proto:" << (uint16_t)proto
229              << std::endl;
230#endif
231
232    CFlowStatUserIdInfo *new_id;
233
234    if (l4_proto == PAYLOAD_RULE_PROTO) {
235        new_id = new CFlowStatUserIdInfoPayload(l3_proto, l4_proto, ipv6_next_h);
236    } else {
237        new_id = new CFlowStatUserIdInfo(l3_proto, l4_proto, ipv6_next_h);
238    }
239    if (new_id != NULL) {
240        std::pair<flow_stat_user_id_map_it_t, bool> ret;
241        ret = m_map.insert(std::pair<uint32_t, CFlowStatUserIdInfo *>(user_id, new_id));
242        if (ret.second == false) {
243            delete new_id;
244            throw TrexFStatEx("packet group id " + std::to_string(user_id) + " already exists"
245                              , TrexException::T_FLOW_STAT_ALREADY_EXIST);
246        }
247        return new_id;
248    } else {
249        throw TrexFStatEx("Failed allocating memory for new statistic counter"
250                          , TrexException::T_FLOW_STAT_ALLOC_FAIL);
251    }
252}
253
254void CFlowStatUserIdMap::add_stream(uint32_t user_id, uint16_t l3_proto, uint8_t l4_proto, uint8_t ipv6_next_h) {
255#ifdef __DEBUG_FUNC_ENTRY__
256    std::cout << __METHOD_NAME__ << " user id:" << user_id << " l3 proto:" << (uint16_t)l3_proto
257              << " l4 proto:" << (uint16_t)l4_proto << " IPv6 next header:" << (uint16_t)ipv6_next_h
258              << std::endl;
259#endif
260
261    CFlowStatUserIdInfo *c_user_id;
262
263    c_user_id = find_user_id(user_id);
264    if (! c_user_id) {
265        // throws exception on error
266        c_user_id = add_user_id(user_id, l3_proto, l4_proto, ipv6_next_h);
267    } else {
268        c_user_id->add_stream(l4_proto);
269    }
270}
271
272int CFlowStatUserIdMap::del_stream(uint32_t user_id) {
273#ifdef __DEBUG_FUNC_ENTRY__
274    std::cout << __METHOD_NAME__ << " user id:" << user_id << std::endl;
275#endif
276
277    CFlowStatUserIdInfo *c_user_id;
278
279    c_user_id = find_user_id(user_id);
280    if (! c_user_id) {
281        throw TrexFStatEx("Trying to delete stream which does not exist"
282                          , TrexException::T_FLOW_STAT_DEL_NON_EXIST);
283    }
284
285    if (c_user_id->del_stream() == 0) {
286        // ref count of this entry became 0. can release this entry.
287        m_map.erase(user_id);
288        delete c_user_id;
289    }
290
291    return 0;
292}
293
294int CFlowStatUserIdMap::start_stream(uint32_t user_id, uint16_t hw_id) {
295#ifdef __DEBUG_FUNC_ENTRY__
296    std::cout << __METHOD_NAME__ << " user id:" << user_id << " hw_id:" << hw_id << std::endl;
297#endif
298
299    CFlowStatUserIdInfo *c_user_id;
300
301    c_user_id = find_user_id(user_id);
302    if (! c_user_id) {
303        throw TrexFStatEx("Internal error: Trying to associate non exist group id " + std::to_string(user_id)
304                          + " to hardware id " + std::to_string(hw_id)
305                          , TrexException::T_FLOW_STAT_ASSOC_NON_EXIST_ID);
306    }
307
308    if (c_user_id->is_hw_id()) {
309        throw TrexFStatEx("Internal error: Trying to associate hw id " + std::to_string(hw_id) + " to user_id "
310                          + std::to_string(user_id) + ", but it is already associated to "
311                          + std::to_string(c_user_id->get_hw_id())
312                          , TrexException::T_FLOW_STAT_ASSOC_OCC_ID);
313    }
314    c_user_id->set_hw_id(hw_id);
315    c_user_id->add_started_stream();
316
317    return 0;
318}
319
320int CFlowStatUserIdMap::start_stream(uint32_t user_id) {
321#ifdef __DEBUG_FUNC_ENTRY__
322    std::cout << __METHOD_NAME__ << " user id:" << user_id << std::endl;
323#endif
324
325    CFlowStatUserIdInfo *c_user_id;
326
327    c_user_id = find_user_id(user_id);
328    if (! c_user_id) {
329        throw TrexFStatEx("Trying to start stream with non exist packet group id " + std::to_string(user_id)
330                          , TrexException::T_FLOW_STAT_NON_EXIST_ID);
331    }
332
333    c_user_id->add_started_stream();
334
335    return 0;
336}
337
338// return: negative number in case of error.
339//         Number of started streams attached to used_id otherwise.
340int CFlowStatUserIdMap::stop_stream(uint32_t user_id) {
341#ifdef __DEBUG_FUNC_ENTRY__
342    std::cout << __METHOD_NAME__ << " user id:" << user_id << std::endl;
343#endif
344
345    CFlowStatUserIdInfo *c_user_id;
346
347    c_user_id = find_user_id(user_id);
348    if (! c_user_id) {
349        throw TrexFStatEx("Trying to stop stream with non exist packet group id" + std::to_string(user_id)
350                          , TrexException::T_FLOW_STAT_NON_EXIST_ID);
351    }
352
353    return c_user_id->stop_started_stream();
354}
355
356bool CFlowStatUserIdMap::is_started(uint32_t user_id) {
357    CFlowStatUserIdInfo *c_user_id;
358
359    c_user_id = find_user_id(user_id);
360    if (! c_user_id) {
361        return false;
362    }
363
364    return c_user_id->is_started();
365}
366
367uint16_t CFlowStatUserIdMap::unmap(uint32_t user_id) {
368#ifdef __DEBUG_FUNC_ENTRY__
369    std::cout << __METHOD_NAME__ << " user id:" << user_id << std::endl;
370#endif
371
372    CFlowStatUserIdInfo *c_user_id;
373
374    c_user_id = find_user_id(user_id);
375    if (! c_user_id) {
376        return UINT16_MAX;
377    }
378    uint16_t old_hw_id = c_user_id->get_hw_id();
379    c_user_id->reset_hw_id();
380
381    return old_hw_id;
382}
383
384/************** class CFlowStatHwIdMap ***************/
385CFlowStatHwIdMap::CFlowStatHwIdMap() {
386    m_map = NULL; // must call create in order to work with the class
387    m_num_free = 0; // to make coverity happy, init this here too.
388}
389
390CFlowStatHwIdMap::~CFlowStatHwIdMap() {
391    delete[] m_map;
392}
393
394void CFlowStatHwIdMap::create(uint16_t size) {
395    m_map = new uint32_t[size];
396    assert (m_map != NULL);
397    m_num_free = size;
398    for (int i = 0; i < size; i++) {
399        m_map[i] = HW_ID_FREE;
400    }
401}
402
403std::ostream& operator<<(std::ostream& os, const CFlowStatHwIdMap& cf) {
404    int count = 0;
405
406    os << "HW id map:\n";
407    os << "  num free:" << cf.m_num_free << std::endl;
408    for (int i = 0; i < MAX_FLOW_STATS; i++) {
409        if (cf.m_map[i] != 0) {
410            count++;
411            os << "(" << i << ":" << cf.m_map[i] << ")";
412            if (count == 10) {
413                os << std::endl;
414                count = 0;
415            }
416        }
417    }
418
419    return os;
420}
421
422uint16_t CFlowStatHwIdMap::find_free_hw_id() {
423    for (int i = 0; i < MAX_FLOW_STATS; i++) {
424        if (m_map[i] == HW_ID_FREE)
425            return i;
426    }
427
428    return HW_ID_FREE;
429}
430
431void CFlowStatHwIdMap::map(uint16_t hw_id, uint32_t user_id) {
432#ifdef __DEBUG_FUNC_ENTRY__
433    std::cout << __METHOD_NAME__ << " hw id:" << hw_id << " user id:" << user_id << std::endl;
434#endif
435
436    m_map[hw_id] = user_id;
437    m_num_free--;
438}
439
440void CFlowStatHwIdMap::unmap(uint16_t hw_id) {
441#ifdef __DEBUG_FUNC_ENTRY__
442    std::cout << __METHOD_NAME__ << " hw id:" << hw_id << std::endl;
443#endif
444
445    m_map[hw_id] = HW_ID_FREE;
446    m_num_free++;
447}
448
449/************** class CFlowStatRuleMgr ***************/
450CFlowStatRuleMgr::CFlowStatRuleMgr() {
451    m_api = NULL;
452    m_max_hw_id = -1;
453    m_max_hw_id_payload = -1;
454    m_num_started_streams = 0;
455    m_ring_to_rx = NULL;
456    m_cap = 0;
457    m_parser_ipid = NULL;
458    m_parser_pl = NULL;
459    m_rx_core = NULL;
460    memset(m_rx_cant_count_err, 0, sizeof(m_rx_cant_count_err));
461    memset(m_tx_cant_count_err, 0, sizeof(m_tx_cant_count_err));
462    m_num_ports = 0; // need to call create to init
463    m_mode = FLOW_STAT_MODE_NORMAL;
464}
465
466CFlowStatRuleMgr::~CFlowStatRuleMgr() {
467    delete m_parser_ipid;
468    delete m_parser_pl;
469#ifdef TREX_SIM
470    // In simulator, nobody handles the messages to RX, so need to free them to have clean valgrind run.
471    if (m_ring_to_rx) {
472        CGenNode *msg = NULL;
473        while (! m_ring_to_rx->isEmpty()) {
474            m_ring_to_rx->Dequeue(msg);
475            delete msg;
476        }
477    }
478#endif
479}
480
481void CFlowStatRuleMgr::create() {
482    uint16_t num_counters, cap, ip_id_base;
483    TrexStateless *tstateless = get_stateless_obj();
484    assert(tstateless);
485
486    m_api = tstateless->get_platform_api();
487    assert(m_api);
488    m_api->get_interface_stat_info(0, num_counters, cap, ip_id_base);
489    m_api->get_port_num(m_num_ports); // This initialize m_num_ports
490    for (uint8_t port = 0; port < m_num_ports; port++) {
491        assert(m_api->reset_hw_flow_stats(port) == 0);
492    }
493    m_hw_id_map.create(num_counters);
494    m_hw_id_map_payload.create(MAX_FLOW_STATS_PAYLOAD);
495    m_ring_to_rx = CMsgIns::Ins()->getCpRx()->getRingCpToDp(0);
496    assert(m_ring_to_rx);
497    m_rx_core = get_rx_sl_core_obj();
498    m_cap = cap;
499    m_ip_id_reserve_base = ip_id_base;
500
501    if ((CGlobalInfo::get_queues_mode() == CGlobalInfo::Q_MODE_ONE_QUEUE)
502        || (CGlobalInfo::get_queues_mode() == CGlobalInfo::Q_MODE_RSS)) {
503        set_mode(FLOW_STAT_MODE_PASS_ALL);
504        m_parser_ipid = new CFlowStatParser(CFlowStatParser::FLOW_STAT_PARSER_MODE_SW);
505        m_parser_pl = new CPassAllParser;
506    } else {
507        m_parser_ipid = m_api->get_flow_stat_parser();
508        m_parser_pl = m_api->get_flow_stat_parser();
509    }
510    assert(m_parser_ipid);
511    assert(m_parser_pl);
512}
513
514std::ostream& operator<<(std::ostream& os, const CFlowStatRuleMgr& cf) {
515    os << "Flow stat rule mgr (" << cf.m_num_ports << ") ports:" << std::endl;
516    os << cf.m_hw_id_map;
517    os << cf.m_hw_id_map_payload;
518    os << cf.m_user_id_map;
519    return os;
520}
521
522int CFlowStatRuleMgr::compile_stream(const TrexStream * stream, CFlowStatParser *parser) {
523#ifdef __DEBUG_FUNC_ENTRY__
524    std::cout << __METHOD_NAME__ << " user id:" << stream->m_rx_check.m_pg_id << " en:";
525    std::cout << stream->m_rx_check.m_enabled << std::endl;
526#endif
527    CFlowStatParser_err_t ret = parser->parse(stream->m_pkt.binary, stream->m_pkt.len);
528
529    if (ret != FSTAT_PARSER_E_OK) {
530        // if we could not parse the packet, but no stat count needed, it is probably OK.
531        if (stream->m_rx_check.m_enabled) {
532            throw TrexFStatEx(parser->get_error_str(ret), TrexException::T_FLOW_STAT_BAD_PKT_FORMAT);
533        } else {
534            return 0;
535        }
536    }
537
538    return 0;
539}
540
541void CFlowStatRuleMgr::copy_state(TrexStream * from, TrexStream * to) {
542    to->m_rx_check.m_hw_id = from->m_rx_check.m_hw_id;
543}
544void CFlowStatRuleMgr::init_stream(TrexStream * stream) {
545    stream->m_rx_check.m_hw_id = HW_ID_INIT;
546}
547
548int CFlowStatRuleMgr::verify_stream(TrexStream * stream) {
549    return add_stream_internal(stream, false);
550}
551
552int CFlowStatRuleMgr::add_stream(TrexStream * stream) {
553    return add_stream_internal(stream, true);
554}
555
556/*
557 * Helper function for adding/verifying streams
558 * stream - stream to act on
559 * do_action - if false, just verify. Do not change any state, or add to database.
560 */
561int CFlowStatRuleMgr::add_stream_internal(TrexStream * stream, bool do_action) {
562#ifdef __DEBUG_FUNC_ENTRY__
563    std::cout << __METHOD_NAME__ << " user id:" << stream->m_rx_check.m_pg_id << std::endl;
564    stream_dump(stream);
565#endif
566
567    if (! stream->m_rx_check.m_enabled) {
568        return 0;
569    }
570
571    // Init everything here, and not in the constructor, since we relay on other objects
572    // By the time a stream is added everything else is initialized.
573    if (! m_api ) {
574        create();
575    }
576
577    TrexPlatformApi::driver_stat_cap_e rule_type = (TrexPlatformApi::driver_stat_cap_e)stream->m_rx_check.m_rule_type;
578
579    if ((m_cap & rule_type) == 0) {
580        throw TrexFStatEx("Interface does not support given rule type", TrexException::T_FLOW_STAT_BAD_RULE_TYPE_FOR_IF);
581    }
582
583    switch(rule_type) {
584    case TrexPlatformApi::IF_STAT_IPV4_ID:
585        uint16_t l3_proto;
586        // compile_stream throws exception if something goes wrong
587        compile_stream(stream, m_parser_ipid);
588
589        if (m_parser_ipid->get_l3_proto(l3_proto) < 0) {
590            throw TrexFStatEx("Failed determining l3 proto for packet", TrexException::T_FLOW_STAT_FAILED_FIND_L3);
591        }
592        uint8_t l4_proto;
593        if (m_parser_ipid->get_l4_proto(l4_proto) < 0) {
594            throw TrexFStatEx("Failed determining l4 proto for packet", TrexException::T_FLOW_STAT_FAILED_FIND_L4);
595        }
596
597
598        // throws exception if there is error
599        if (do_action) {
600            // passing 0 in ipv6_next_h. This is not used currently in stateless.
601            m_user_id_map.add_stream(stream->m_rx_check.m_pg_id, l3_proto, l4_proto, 0);
602        }
603        break;
604    case TrexPlatformApi::IF_STAT_PAYLOAD:
605        uint16_t payload_len;
606        // compile_stream throws exception if something goes wrong
607        compile_stream(stream, m_parser_pl);
608
609        if (m_parser_pl->get_payload_len(stream->m_pkt.binary, stream->m_pkt.len, payload_len) < 0) {
610            throw TrexFStatEx("Failed getting payload len", TrexException::T_FLOW_STAT_BAD_PKT_FORMAT);
611        }
612        if (payload_len < sizeof(struct flow_stat_payload_header)) {
613            throw TrexFStatEx("Need at least " + std::to_string(sizeof(struct latency_header))
614                              + " payload bytes for payload rules. Packet only has " + std::to_string(payload_len) + " bytes"
615                              , TrexException::T_FLOW_STAT_PAYLOAD_TOO_SHORT);
616        }
617        if (do_action) {
618            m_user_id_map.add_stream(stream->m_rx_check.m_pg_id, 0, PAYLOAD_RULE_PROTO, 0);
619        }
620        break;
621    default:
622        throw TrexFStatEx("Wrong rule_type", TrexException::T_FLOW_STAT_BAD_RULE_TYPE);
623        break;
624    }
625    if (do_action) {
626        stream->m_rx_check.m_hw_id = HW_ID_FREE;
627    }
628    return 0;
629}
630
631int CFlowStatRuleMgr::del_stream(TrexStream * stream) {
632#ifdef __DEBUG_FUNC_ENTRY__
633    std::cout << __METHOD_NAME__ << " user id:" << stream->m_rx_check.m_pg_id << std::endl;
634    stream_dump(stream);
635#endif
636
637    if (! stream->m_rx_check.m_enabled) {
638        return 0;
639    }
640
641    if (! m_api)
642        throw TrexFStatEx("Called del_stream, but no stream was added", TrexException::T_FLOW_STAT_NO_STREAMS_EXIST);
643
644    TrexPlatformApi::driver_stat_cap_e rule_type = (TrexPlatformApi::driver_stat_cap_e)stream->m_rx_check.m_rule_type;
645    switch(rule_type) {
646    case TrexPlatformApi::IF_STAT_IPV4_ID:
647        break;
648    case TrexPlatformApi::IF_STAT_PAYLOAD:
649        break;
650    default:
651        throw TrexFStatEx("Wrong rule_type", TrexException::T_FLOW_STAT_BAD_RULE_TYPE);
652        break;
653    }
654
655    // we got del_stream command for a stream which has valid hw_id.
656    // Probably someone forgot to call stop
657    if(stream->m_rx_check.m_hw_id < MAX_FLOW_STATS + MAX_FLOW_STATS_PAYLOAD) {
658        stop_stream(stream);
659    }
660
661    // calling del for same stream twice, or for a stream which was never "added"
662    if(stream->m_rx_check.m_hw_id == HW_ID_INIT) {
663        return 0;
664    }
665    m_user_id_map.del_stream(stream->m_rx_check.m_pg_id); // Throws exception in case of error
666    if (m_user_id_map.is_empty()) {
667        m_max_hw_id = 0;
668        m_max_hw_id_payload = 0;
669    }
670
671    stream->m_rx_check.m_hw_id = HW_ID_INIT;
672
673    return 0;
674}
675
676// called on all streams, when stream start to transmit
677// If stream need flow stat counting, make sure the type of packet is supported, and
678// embed needed info in packet.
679// If stream does not need flow stat counting, make sure it does not interfere with
680// other streams that do need stat counting.
681// Might change the IP ID of the stream packet
682int CFlowStatRuleMgr::start_stream(TrexStream * stream) {
683#ifdef __DEBUG_FUNC_ENTRY__
684    std::cout << __METHOD_NAME__ << " user id:" << stream->m_rx_check.m_pg_id << std::endl;
685    stream_dump(stream);
686#endif
687
688    int ret;
689    // Streams which does not need statistics might be started, before any stream that do
690    // need statistcs, so start_stream might be called before add_stream
691    if (! m_api ) {
692        create();
693    }
694
695    // first handle streams that do not need rx stat
696    if (! stream->m_rx_check.m_enabled) {
697        try {
698            compile_stream(stream, m_parser_ipid);
699        } catch (TrexFStatEx) {
700            // If no statistics needed, and we can't parse the stream, that's OK.
701            return 0;
702        }
703
704        uint32_t ip_id; // 32 bit, since this also supports IPv6
705        if (m_parser_ipid->get_ip_id(ip_id) < 0) {
706            return 0; // if we could not find the ip id, no need to fix
707        }
708        // verify no reserved IP_ID used, and change if needed
709        if (ip_id >= m_ip_id_reserve_base) {
710            m_parser_ipid->set_ip_id(ip_id & 0x0000efff);
711        }
712        return 0;
713    }
714
715    // from here, we know the stream need rx stat
716
717    // Starting a stream which was never added
718    if (stream->m_rx_check.m_hw_id == HW_ID_INIT) {
719        add_stream(stream);
720    }
721
722    if (stream->m_rx_check.m_hw_id < MAX_FLOW_STATS + MAX_FLOW_STATS_PAYLOAD) {
723        throw TrexFStatEx("Starting a stream which was already started"
724                          , TrexException::T_FLOW_STAT_ALREADY_STARTED);
725    }
726
727    TrexPlatformApi::driver_stat_cap_e rule_type = (TrexPlatformApi::driver_stat_cap_e)stream->m_rx_check.m_rule_type;
728
729    if ((m_cap & rule_type) == 0) {
730        throw TrexFStatEx("Interface does not support given rule type", TrexException::T_FLOW_STAT_BAD_RULE_TYPE_FOR_IF);
731    }
732    uint16_t hw_id;
733
734    switch(rule_type) {
735    case TrexPlatformApi::IF_STAT_IPV4_ID:
736        // compile_stream throws exception if something goes wrong
737        if ((ret = compile_stream(stream, m_parser_ipid)) < 0)
738            return ret;
739        break;
740    case TrexPlatformApi::IF_STAT_PAYLOAD:
741        // compile_stream throws exception if something goes wrong
742        if ((ret = compile_stream(stream, m_parser_pl)) < 0)
743            return ret;
744        break;
745    default:
746        throw TrexFStatEx("Wrong rule_type", TrexException::T_FLOW_STAT_BAD_RULE_TYPE);
747        break;
748    }
749
750    if (m_user_id_map.is_started(stream->m_rx_check.m_pg_id)) {
751        m_user_id_map.start_stream(stream->m_rx_check.m_pg_id); // just increase ref count;
752        hw_id = m_user_id_map.get_hw_id(stream->m_rx_check.m_pg_id); // can't fail if we got here
753    } else {
754        if (rule_type == TrexPlatformApi::IF_STAT_IPV4_ID) {
755            hw_id = m_hw_id_map.find_free_hw_id();
756        } else {
757            hw_id = m_hw_id_map_payload.find_free_hw_id();
758        }
759        if (hw_id == HW_ID_FREE) {
760            throw TrexFStatEx("Failed allocating statistic counter. Probably all are used for this rule type."
761                              , TrexException::T_FLOW_STAT_NO_FREE_HW_ID);
762        } else {
763            uint32_t user_id = stream->m_rx_check.m_pg_id;
764            m_user_id_map.start_stream(user_id, hw_id);
765            if (rule_type == TrexPlatformApi::IF_STAT_IPV4_ID) {
766                if (hw_id > m_max_hw_id) {
767                    m_max_hw_id = hw_id;
768                }
769                m_hw_id_map.map(hw_id, user_id);
770                CFlowStatUserIdInfo *uid_info = m_user_id_map.find_user_id(user_id);
771                if (uid_info != NULL) {
772                    add_hw_rule(hw_id, uid_info->get_l3_proto(), uid_info->get_l4_proto(), uid_info->get_ipv6_next_h());
773                }
774            } else {
775                if (hw_id > m_max_hw_id_payload) {
776                    m_max_hw_id_payload = hw_id;
777                }
778                m_hw_id_map_payload.map(hw_id, user_id);
779            }
780            // clear hardware counters. Just in case we have garbage from previous iteration
781            rx_per_flow_t rx_cntr;
782            tx_per_flow_t tx_cntr;
783            rfc2544_info_t rfc2544_info;
784            for (uint8_t port = 0; port < m_num_ports; port++) {
785                m_api->get_flow_stats(port, &rx_cntr, (void *)&tx_cntr, hw_id, hw_id, true, rule_type);
786            }
787            if (rule_type == TrexPlatformApi::IF_STAT_PAYLOAD) {
788                m_api->get_rfc2544_info(&rfc2544_info, hw_id, hw_id, true);
789            }
790        }
791    }
792
793    // saving given hw_id on stream for use by tx statistics count
794    if (rule_type == TrexPlatformApi::IF_STAT_IPV4_ID) {
795        m_parser_ipid->set_ip_id(m_ip_id_reserve_base + hw_id);
796        m_parser_ipid->set_tos_to_cpu();
797        stream->m_rx_check.m_hw_id = hw_id;
798    } else {
799        m_parser_pl->set_ip_id(FLOW_STAT_PAYLOAD_IP_ID);
800        m_parser_pl->set_tos_to_cpu();
801        // for payload rules, we use the range right after ip id rules
802        stream->m_rx_check.m_hw_id = hw_id + MAX_FLOW_STATS;
803    }
804
805#ifdef __DEBUG_FUNC_ENTRY__
806    std::cout << "exit:" << __METHOD_NAME__ << " hw_id:" << hw_id << std::endl;
807    stream_dump(stream);
808#endif
809
810    if (m_num_started_streams == 0) {
811
812        send_start_stop_msg_to_rx(true); // First transmitting stream. Rx core should start reading packets;
813
814        //also good time to zero global counters
815        memset(m_rx_cant_count_err, 0, sizeof(m_rx_cant_count_err));
816        memset(m_tx_cant_count_err, 0, sizeof(m_tx_cant_count_err));
817
818        #if 0
819        // wait to make sure that message is acknowledged. RX core might be in deep sleep mode, and we want to
820        // start transmitting packets only after it is working, otherwise, packets will get lost.
821        if (m_rx_core) { // in simulation, m_rx_core will be NULL
822            int count = 0;
823            while (!m_rx_core->is_working()) {
824                delay(1);
825                count++;
826                if (count == 100) {
827                    throw TrexFStatEx("Critical error!! - RX core failed to start", TrexException::T_FLOW_STAT_RX_CORE_START_FAIL);
828                }
829            }
830        }
831        #endif
832
833    } else {
834        // make sure rx core is working. If not, we got really confused somehow.
835        if (m_rx_core)
836            assert(m_rx_core->is_working());
837    }
838    m_num_started_streams++;
839    return 0;
840}
841
842int CFlowStatRuleMgr::add_hw_rule(uint16_t hw_id, uint16_t l3_proto, uint8_t l4_proto, uint8_t ipv6_next_h) {
843    for (int port = 0; port < m_num_ports; port++) {
844        m_api->add_rx_flow_stat_rule(port, l3_proto, l4_proto, ipv6_next_h, hw_id);
845    }
846
847    return 0;
848}
849
850int CFlowStatRuleMgr::stop_stream(TrexStream * stream) {
851#ifdef __DEBUG_FUNC_ENTRY__
852    std::cout << __METHOD_NAME__ << " user id:" << stream->m_rx_check.m_pg_id << std::endl;
853    stream_dump(stream);
854#endif
855    if (! stream->m_rx_check.m_enabled) {
856        return 0;
857    }
858
859    if (! m_api)
860        throw TrexFStatEx("Called stop_stream, but no stream was added", TrexException::T_FLOW_STAT_NO_STREAMS_EXIST);
861
862    if (stream->m_rx_check.m_hw_id >= MAX_FLOW_STATS + MAX_FLOW_STATS_PAYLOAD) {
863        // We allow stopping while already stopped. Will not hurt us.
864        return 0;
865    }
866
867    TrexPlatformApi::driver_stat_cap_e rule_type = (TrexPlatformApi::driver_stat_cap_e)stream->m_rx_check.m_rule_type;
868    switch(rule_type) {
869    case TrexPlatformApi::IF_STAT_IPV4_ID:
870        break;
871    case TrexPlatformApi::IF_STAT_PAYLOAD:
872        break;
873    default:
874        throw TrexFStatEx("Wrong rule_type", TrexException::T_FLOW_STAT_BAD_RULE_TYPE);
875        break;
876    }
877
878    stream->m_rx_check.m_hw_id = HW_ID_FREE;
879
880    if (m_user_id_map.stop_stream(stream->m_rx_check.m_pg_id) == 0) {
881        // last stream associated with the entry stopped transmittig.
882        // remove user_id <--> hw_id mapping
883        uint16_t hw_id = m_user_id_map.get_hw_id(stream->m_rx_check.m_pg_id);
884        if (hw_id >= MAX_FLOW_STATS) {
885            throw TrexFStatEx("Internal error in stop_stream. Got bad hw_id" + std::to_string(hw_id)
886                              , TrexException::T_FLOW_STAT_BAD_HW_ID);
887        } else {
888            CFlowStatUserIdInfo *p_user_id;
889            // update counters, and reset before unmapping
890            if (rule_type == TrexPlatformApi::IF_STAT_IPV4_ID) {
891                p_user_id = m_user_id_map.find_user_id(m_hw_id_map.get_user_id(hw_id));
892            } else {
893                p_user_id = m_user_id_map.find_user_id(m_hw_id_map_payload.get_user_id(hw_id));
894            }
895            assert(p_user_id != NULL);
896            rx_per_flow_t rx_cntr;
897            tx_per_flow_t tx_cntr;
898            rfc2544_info_t rfc2544_info;
899            for (uint8_t port = 0; port < m_num_ports; port++) {
900                if (rule_type == TrexPlatformApi::IF_STAT_IPV4_ID) {
901                    m_api->del_rx_flow_stat_rule(port, p_user_id->get_l3_proto(), p_user_id->get_l4_proto()
902                                                 , p_user_id->get_ipv6_next_h(), hw_id);
903                }
904                m_api->get_flow_stats(port, &rx_cntr, (void *)&tx_cntr, hw_id, hw_id, true, rule_type);
905                // when stopping, always send counters for stopped stream one last time
906                p_user_id->set_rx_cntr(port, rx_cntr);
907                p_user_id->set_need_to_send_rx(port);
908                p_user_id->set_tx_cntr(port, tx_cntr);
909                p_user_id->set_need_to_send_tx(port);
910            }
911
912            if (rule_type == TrexPlatformApi::IF_STAT_IPV4_ID) {
913                m_hw_id_map.unmap(hw_id);
914            } else {
915                CFlowStatUserIdInfoPayload *p_user_id_p = (CFlowStatUserIdInfoPayload *)p_user_id;
916                Json::Value json;
917                m_api->get_rfc2544_info(&rfc2544_info, hw_id, hw_id, true);
918                p_user_id_p->set_jitter(rfc2544_info.get_jitter());
919                rfc2544_info.get_latency_json(json);
920                p_user_id_p->set_latency_json(json);
921                p_user_id_p->set_seq_err_cnt(rfc2544_info.get_seq_err_cnt());
922                p_user_id_p->set_ooo_cnt(rfc2544_info.get_ooo_cnt());
923                p_user_id_p->set_dup_cnt(rfc2544_info.get_dup_cnt());
924                p_user_id_p->set_seq_err_big_cnt(rfc2544_info.get_seq_err_ev_big());
925                p_user_id_p->set_seq_err_low_cnt(rfc2544_info.get_seq_err_ev_low());
926                m_hw_id_map_payload.unmap(hw_id);
927            }
928            m_user_id_map.unmap(stream->m_rx_check.m_pg_id);
929        }
930    }
931    m_num_started_streams--;
932    assert (m_num_started_streams >= 0);
933    if (m_num_started_streams == 0) {
934        send_start_stop_msg_to_rx(false); // No more transmittig streams. Rx core should get into idle loop.
935    }
936    return 0;
937}
938
939int CFlowStatRuleMgr::get_active_pgids(flow_stat_active_t &result) {
940    flow_stat_user_id_map_it_t it;
941
942    for (it = m_user_id_map.begin(); it != m_user_id_map.end(); it++) {
943        result.insert(it->first);
944    }
945
946    return 0;
947}
948
949int CFlowStatRuleMgr::set_mode(enum flow_stat_mode_e mode) {
950    if ( ! m_user_id_map.is_empty() )
951        return -1;
952
953    if (! m_api ) {
954        create();
955    }
956
957    switch (mode) {
958    case FLOW_STAT_MODE_PASS_ALL:
959        delete m_parser_ipid;
960        delete m_parser_pl;
961        m_parser_ipid = new CFlowStatParser(CFlowStatParser::FLOW_STAT_PARSER_MODE_SW);
962        m_parser_pl = new CPassAllParser;
963        break;
964    case FLOW_STAT_MODE_NORMAL:
965        delete m_parser_ipid;
966        delete m_parser_pl;
967        m_parser_ipid = m_api->get_flow_stat_parser();
968        m_parser_pl = m_api->get_flow_stat_parser();
969        break;
970    default:
971        return -1;
972
973    }
974    assert(m_parser_ipid);
975    assert(m_parser_pl);
976
977    m_mode = mode;
978
979    return 0;
980}
981
982extern bool rx_should_stop;
983void CFlowStatRuleMgr::send_start_stop_msg_to_rx(bool is_start) {
984    TrexStatelessCpToRxMsgBase *msg;
985
986    if (is_start) {
987        static MsgReply<bool> reply;
988        reply.reset();
989
990        msg = new TrexStatelessRxEnableLatency(reply);
991        m_ring_to_rx->Enqueue((CGenNode *)msg);
992
993        /* hold until message was ack'ed - otherwise we might lose packets */
994        if (m_rx_core) {
995            reply.wait_for_reply();
996            assert(m_rx_core->is_working());
997        }
998
999    } else {
1000        msg = new TrexStatelessRxDisableLatency();
1001        m_ring_to_rx->Enqueue((CGenNode *)msg);
1002    }
1003}
1004
1005// return false if no counters changed since last run. true otherwise
1006// s_json - flow statistics json
1007// l_json - latency data json
1008// baseline - If true, send flow statistics fields even if they were not changed since last run
1009// send_all - If true, send data for all pg_ids. This is used for getting statistics in automation API.
1010//            If false, send small amount of pg ids. Used for async interface, for displaying in console
1011bool CFlowStatRuleMgr::dump_json(std::string & s_json, std::string & l_json, bool baseline, bool send_all) {
1012    rx_per_flow_t rx_stats[MAX_FLOW_STATS];
1013    rx_per_flow_t rx_stats_payload[MAX_FLOW_STATS];
1014    tx_per_flow_t tx_stats[MAX_FLOW_STATS];
1015    tx_per_flow_t tx_stats_payload[MAX_FLOW_STATS_PAYLOAD];
1016    rfc2544_info_t rfc2544_info[MAX_FLOW_STATS_PAYLOAD];
1017    CRxCoreErrCntrs rx_err_cntrs;
1018    Json::FastWriter writer;
1019    Json::Value s_root;
1020    Json::Value l_root;
1021
1022    s_root["name"] = "flow_stats";
1023    s_root["type"] = 0;
1024    l_root["name"] = "latency_stats";
1025    l_root["type"] = 0;
1026
1027    if (baseline) {
1028        s_root["baseline"] = true;
1029        l_root["baseline"] = true;
1030    }
1031
1032    Json::Value &s_data_section = s_root["data"];
1033    Json::Value &l_data_section = l_root["data"];
1034    s_data_section["ts"]["value"] = Json::Value::UInt64(os_get_hr_tick_64());
1035    s_data_section["ts"]["freq"] = Json::Value::UInt64(os_get_hr_freq());
1036
1037    if (m_user_id_map.is_empty()) {
1038        s_json = writer.write(s_root);
1039        l_json = writer.write(l_root);
1040        return true;
1041    }
1042
1043    m_api->get_rfc2544_info(rfc2544_info, 0, m_max_hw_id_payload, false);
1044    m_api->get_rx_err_cntrs(&rx_err_cntrs);
1045
1046
1047#if 0
1048    // If we want to send all PG_IDs, in groups of 128, enable this, and remove the restrication
1049    // of sending only 8 in loop of building json message below
1050    static int min_to_send = 0;
1051    static int max_to_send = m_max_hw_id;
1052
1053    min_to_send += 128;
1054    if (min_to_send > m_max_hw_id) {
1055        min_to_send = 0;
1056    }
1057    max_to_send = min_to_send + 128;
1058    if (max_to_send > m_max_hw_id) {
1059        max_to_send = m_max_hw_id;
1060    }
1061
1062    // If asking for "baseline", send everything always.
1063    if (baseline || send_all) {
1064        min_to_send = 0;
1065        max_to_send = m_max_hw_id;
1066    }
1067#endif
1068
1069    int min_to_send = 0;
1070    int max_to_send = m_max_hw_id;
1071
1072    // read hw counters, and update
1073    for (uint8_t port = 0; port < m_num_ports; port++) {
1074        m_api->get_flow_stats(port, rx_stats, (void *)tx_stats, min_to_send, max_to_send, false, TrexPlatformApi::IF_STAT_IPV4_ID);
1075        for (int i = 0; i <= max_to_send - min_to_send; i++) {
1076            if (rx_stats[i].get_pkts() != 0) {
1077                rx_per_flow_t rx_pkts = rx_stats[i];
1078                CFlowStatUserIdInfo *p_user_id = m_user_id_map.find_user_id(m_hw_id_map.get_user_id(i + min_to_send));
1079                if (likely(p_user_id != NULL)) {
1080                    if (p_user_id->get_rx_cntr(port) != rx_pkts) {
1081                        p_user_id->set_rx_cntr(port, rx_pkts);
1082                        p_user_id->set_need_to_send_rx(port);
1083                    }
1084                } else {
1085                    m_rx_cant_count_err[port] += rx_pkts.get_pkts();
1086                    std::cerr <<  __METHOD_NAME__ << i << ":Could not count " << rx_pkts << " rx packets, on port "
1087                              << (uint16_t)port << ", because no mapping was found." << std::endl;
1088                }
1089            }
1090            if (tx_stats[i].get_pkts() != 0) {
1091                tx_per_flow_t tx_pkts = tx_stats[i];
1092                CFlowStatUserIdInfo *p_user_id = m_user_id_map.find_user_id(m_hw_id_map.get_user_id(i + min_to_send));
1093                if (likely(p_user_id != NULL)) {
1094                    if (p_user_id->get_tx_cntr(port) != tx_pkts) {
1095                        p_user_id->set_tx_cntr(port, tx_pkts);
1096                        p_user_id->set_need_to_send_tx(port);
1097                    }
1098                } else {
1099                    m_tx_cant_count_err[port] += tx_pkts.get_pkts();;
1100                    std::cerr <<  __METHOD_NAME__ << i << ":Could not count " << tx_pkts <<  " tx packets on port "
1101                              << (uint16_t)port << ", because no mapping was found." << std::endl;
1102                }
1103            }
1104        }
1105        // payload rules
1106        m_api->get_flow_stats(port, rx_stats_payload, (void *)tx_stats_payload, 0, m_max_hw_id_payload
1107                              , false, TrexPlatformApi::IF_STAT_PAYLOAD);
1108        for (int i = 0; i <= m_max_hw_id_payload; i++) {
1109            if (rx_stats_payload[i].get_pkts() != 0) {
1110                rx_per_flow_t rx_pkts = rx_stats_payload[i];
1111                CFlowStatUserIdInfo *p_user_id = m_user_id_map.find_user_id(m_hw_id_map_payload.get_user_id(i));
1112                if (likely(p_user_id != NULL)) {
1113                    if (p_user_id->get_rx_cntr(port) != rx_pkts) {
1114                        p_user_id->set_rx_cntr(port, rx_pkts);
1115                        p_user_id->set_need_to_send_rx(port);
1116                    }
1117                } else {
1118                    m_rx_cant_count_err[port] += rx_pkts.get_pkts();;
1119                    std::cerr <<  __METHOD_NAME__ << i << ":Could not count " << rx_pkts << " rx payload packets, on port "
1120                              << (uint16_t)port << ", because no mapping was found." << std::endl;
1121                }
1122            }
1123            if (tx_stats_payload[i].get_pkts() != 0) {
1124                tx_per_flow_t tx_pkts = tx_stats_payload[i];
1125                CFlowStatUserIdInfo *p_user_id = m_user_id_map.find_user_id(m_hw_id_map_payload.get_user_id(i));
1126                if (likely(p_user_id != NULL)) {
1127                    if (p_user_id->get_tx_cntr(port) != tx_pkts) {
1128                        p_user_id->set_tx_cntr(port, tx_pkts);
1129                        p_user_id->set_need_to_send_tx(port);
1130                    }
1131                } else {
1132                    m_tx_cant_count_err[port] += tx_pkts.get_pkts();;
1133                    std::cerr <<  __METHOD_NAME__ << i << ":Could not count " << tx_pkts <<  " tx packets on port "
1134                              << (uint16_t)port << ", because no mapping was found." << std::endl;
1135                }
1136            }
1137        }
1138    }
1139
1140    // build json report
1141    // general per port data
1142    for (uint8_t port = 0; port < m_num_ports; port++) {
1143            std::string str_port = static_cast<std::ostringstream*>( &(std::ostringstream() << int(port) ) )->str();
1144            if ((m_rx_cant_count_err[port] != 0) || baseline)
1145                s_data_section["global"]["rx_err"][str_port] = m_rx_cant_count_err[port];
1146            if ((m_tx_cant_count_err[port] != 0) || baseline)
1147                s_data_section["global"]["tx_err"][str_port] = m_tx_cant_count_err[port];
1148    }
1149
1150    // payload rules rx errors
1151    uint64_t tmp_cnt;
1152    tmp_cnt = rx_err_cntrs.get_bad_header();
1153    if (tmp_cnt || baseline) {
1154        l_data_section["global"]["bad_hdr"] = Json::Value::UInt64(tmp_cnt);
1155    }
1156    tmp_cnt = rx_err_cntrs.get_old_flow();
1157    if (tmp_cnt || baseline) {
1158        l_data_section["global"]["old_flow"] = Json::Value::UInt64(tmp_cnt);
1159    }
1160
1161    // TUI only present 4 PG_IDs today, so we send everything only when explicit request comes
1162    // (probably from automation API)
1163    uint16_t max_pg_ids_to_send = 8;
1164    static int times_send_all = 0;
1165    if (send_all) {
1166       times_send_all = 20;
1167    }
1168    if (baseline || (times_send_all > 0)) {
1169        if (times_send_all > 0)
1170            times_send_all--;
1171        max_pg_ids_to_send = UINT16_MAX;
1172    }
1173
1174    flow_stat_user_id_map_it_t it;
1175    for (it = m_user_id_map.begin(); it != m_user_id_map.end(); it++) {
1176        if (max_pg_ids_to_send == 0) {
1177            break;
1178        }
1179        max_pg_ids_to_send--;
1180        bool send_empty = true;
1181        CFlowStatUserIdInfo *user_id_info = it->second;
1182        uint32_t user_id = it->first;
1183        std::string str_user_id = static_cast<std::ostringstream*>( &(std::ostringstream() << user_id) )->str();
1184        if (! user_id_info->was_sent()) {
1185            s_data_section[str_user_id]["first_time"] = true;
1186            user_id_info->set_was_sent(true);
1187            send_empty = false;
1188        }
1189        // flow stat json
1190        for (uint8_t port = 0; port < m_num_ports; port++) {
1191            std::string str_port = static_cast<std::ostringstream*>( &(std::ostringstream() << int(port) ) )->str();
1192            if (user_id_info->need_to_send_rx(port) || baseline) {
1193                user_id_info->set_no_need_to_send_rx(port);
1194                s_data_section[str_user_id]["rx_pkts"][str_port] = Json::Value::UInt64(user_id_info->get_rx_cntr(port).get_pkts());
1195                if (m_cap & TrexPlatformApi::IF_STAT_RX_BYTES_COUNT)
1196                    s_data_section[str_user_id]["rx_bytes"][str_port] = Json::Value::UInt64(user_id_info->get_rx_cntr(port).get_bytes());
1197                send_empty = false;
1198            }
1199            if (user_id_info->need_to_send_tx(port) || baseline) {
1200                user_id_info->set_no_need_to_send_tx(port);
1201                s_data_section[str_user_id]["tx_pkts"][str_port] = Json::Value::UInt64(user_id_info->get_tx_cntr(port).get_pkts());
1202                s_data_section[str_user_id]["tx_bytes"][str_port] = Json::Value::UInt64(user_id_info->get_tx_cntr(port).get_bytes());
1203                send_empty = false;
1204            }
1205        }
1206        if (send_empty) {
1207            s_data_section[str_user_id] = Json::objectValue;
1208        }
1209
1210        // latency info json
1211        if (user_id_info->rfc2544_support()) {
1212            CFlowStatUserIdInfoPayload *user_id_info_p = (CFlowStatUserIdInfoPayload *)user_id_info;
1213            // payload object. Send also latency, jitter...
1214            Json::Value lat_hist = Json::arrayValue;
1215            if (user_id_info->is_hw_id()) {
1216                // if mapped to hw_id, take info from what we just got from rx core
1217                uint16_t hw_id = user_id_info->get_hw_id();
1218                rfc2544_info[hw_id].get_latency_json(lat_hist);
1219                user_id_info_p->set_seq_err_cnt(rfc2544_info[hw_id].get_seq_err_cnt());
1220                user_id_info_p->set_ooo_cnt(rfc2544_info[hw_id].get_ooo_cnt());
1221                user_id_info_p->set_dup_cnt(rfc2544_info[hw_id].get_dup_cnt());
1222                user_id_info_p->set_seq_err_big_cnt(rfc2544_info[hw_id].get_seq_err_ev_big());
1223                user_id_info_p->set_seq_err_low_cnt(rfc2544_info[hw_id].get_seq_err_ev_low());
1224                l_data_section[str_user_id]["latency"] = lat_hist;
1225                l_data_section[str_user_id]["latency"]["jitter"] = rfc2544_info[hw_id].get_jitter_usec();
1226            } else {
1227                // Not mapped to hw_id. Get saved info.
1228                user_id_info_p->get_latency_json(lat_hist);
1229                if (lat_hist != Json::nullValue) {
1230                    l_data_section[str_user_id]["latency"] = lat_hist;
1231                    l_data_section[str_user_id]["latency"]["jitter"] = user_id_info_p->get_jitter_usec();
1232                }
1233            }
1234            l_data_section[str_user_id]["err_cntrs"]["dropped"]
1235                = Json::Value::UInt64(user_id_info_p->get_seq_err_cnt());
1236            l_data_section[str_user_id]["err_cntrs"]["out_of_order"]
1237                = Json::Value::UInt64(user_id_info_p->get_ooo_cnt());
1238            l_data_section[str_user_id]["err_cntrs"]["dup"]
1239                = Json::Value::UInt64(user_id_info_p->get_dup_cnt());
1240            l_data_section[str_user_id]["err_cntrs"]["seq_too_high"]
1241                = Json::Value::UInt64(user_id_info_p->get_seq_err_big_cnt());
1242            l_data_section[str_user_id]["err_cntrs"]["seq_too_low"]
1243                = Json::Value::UInt64(user_id_info_p->get_seq_err_low_cnt());
1244        }
1245    }
1246
1247    s_json = writer.write(s_root);
1248    l_json = writer.write(l_root);
1249    // We always want to publish, even only the timestamp.
1250    return true;
1251}
1252