flow_stat.cpp revision 58f1ee52
1/*
2  Ido Barnea
3  Cisco Systems, Inc.
4*/
5
6/*
7  Copyright (c) 2015-2016 Cisco Systems, Inc.
8
9  Licensed under the Apache License, Version 2.0 (the "License");
10  you may not use this file except in compliance with the License.
11  You may obtain a copy of the License at
12
13  http://www.apache.org/licenses/LICENSE-2.0
14
15  Unless required by applicable law or agreed to in writing, software
16  distributed under the License is distributed on an "AS IS" BASIS,
17  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  See the License for the specific language governing permissions and
19  limitations under the License.
20*/
21
22/*
23Important classes in this file:
24CFlowStatUserIdInfo - Information about one packet group id
25CFlowStatUserIdMap - Mapping between packet group id (chosen by user) and hardware counter id
26CFlowStatHwIdMap - Mapping between hardware id and packet group id
27CFlowStatRuleMgr - API to users of the file
28
29General idea of operation:
30For each stream needing flow statistics, the user provides packet group id (pg_id). Few streams can have the same pg_id.
31We maintain reference count.
32When doing start_stream, for the first stream in pg_id, hw_id is associated with the pg_id, and relevant hardware rules are
33inserted (on supported hardware). When stopping all streams with the pg_id, the hw_id <--> pg_id mapping is removed, hw_id is
34returned to the free hw_id pool, and hardware rules are removed. Counters for the pg_id are kept.
35If starting streams again, new hw_id will be assigned, and counters will continue from where they stopped. Only When deleting
36all streams using certain pg_id, infromation about this pg_id will be freed.
37
38For each stream we keep state in the m_rx_check.m_hw_id field. Since we keep reference count for certain structs, we want to
39protect from illegal operations, like starting stream while it is already starting, stopping when it is stopped...
40State machine is:
41stream_init: HW_ID_INIT
42stream_add: HW_ID_FREE
43stream_start: legal hw_id (range is 0..MAX_FLOW_STATS)
44stream_stop: HW_ID_FREE
45stream_del: HW_ID_INIT
46 */
47#include <sstream>
48#include <string>
49#include <iostream>
50#include <assert.h>
51#include <os_time.h>
52#include "internal_api/trex_platform_api.h"
53#include "trex_stateless.h"
54#include "trex_stateless_messaging.h"
55#include "trex_stateless_rx_core.h"
56#include "trex_stream.h"
57#include "flow_stat_parser.h"
58#include "flow_stat.h"
59
60
61#define FLOW_STAT_ADD_ALL_PORTS 255
62
63static const uint16_t HW_ID_INIT = UINT16_MAX;
64static const uint16_t HW_ID_FREE = UINT16_MAX - 1;
65static const uint8_t PAYLOAD_RULE_PROTO = 255;
66const uint32_t FLOW_STAT_PAYLOAD_IP_ID = IP_ID_RESERVE_BASE + MAX_FLOW_STATS;
67
68inline std::string methodName(const std::string& prettyFunction)
69{
70    size_t colons = prettyFunction.find("::");
71    size_t begin = prettyFunction.substr(0,colons).rfind(" ") + 1;
72    size_t end = prettyFunction.rfind("(") - begin;
73
74    return prettyFunction.substr(begin,end) + "()";
75}
76
77#define __METHOD_NAME__ methodName(__PRETTY_FUNCTION__)
78#ifdef __DEBUG_FUNC_ENTRY__
79#define FUNC_ENTRY (std::cout << __METHOD_NAME__ << std::endl);
80#ifdef __STREAM_DUMP__
81#define stream_dump(stream) stream->Dump(stderr)
82#else
83#define stream_dump(stream)
84#endif
85#else
86#define FUNC_ENTRY
87#endif
88
89/************** class CFlowStatUserIdInfo ***************/
90CFlowStatUserIdInfo::CFlowStatUserIdInfo(uint16_t l3_proto, uint8_t l4_proto, uint8_t ipv6_next_h) {
91    memset(m_rx_cntr, 0, sizeof(m_rx_cntr));
92    memset(m_rx_cntr_base, 0, sizeof(m_rx_cntr));
93    memset(m_tx_cntr, 0, sizeof(m_tx_cntr));
94    memset(m_tx_cntr_base, 0, sizeof(m_tx_cntr));
95    m_hw_id = UINT16_MAX;
96    m_l3_proto = l3_proto;
97    m_l4_proto = l4_proto;
98    m_ipv6_next_h = ipv6_next_h;
99    m_ref_count = 1;
100    m_trans_ref_count = 0;
101    m_was_sent = false;
102    for (int i = 0; i < TREX_MAX_PORTS; i++) {
103        m_rx_changed[i] = false;
104        m_tx_changed[i] = false;
105    }
106    m_rfc2544_support = false;
107}
108
109std::ostream& operator<<(std::ostream& os, const CFlowStatUserIdInfo& cf) {
110    os << "hw_id:" << cf.m_hw_id << " l3 proto:" << (uint16_t) cf.m_l3_proto << " ref("
111       << (uint16_t) cf.m_ref_count << "," << (uint16_t) cf.m_trans_ref_count << ")";
112    os << " rx count (";
113    os << cf.m_rx_cntr[0];
114    for (int i = 1; i < TREX_MAX_PORTS; i++) {
115        os << "," << cf.m_rx_cntr[i];
116    }
117    os << ")";
118    os << " rx count base(";
119    os << cf.m_rx_cntr_base[0];
120    for (int i = 1; i < TREX_MAX_PORTS; i++) {
121        os << "," << cf.m_rx_cntr_base[i];
122    }
123    os << ")";
124
125    os << " tx count (";
126    os << cf.m_tx_cntr[0];
127    for (int i = 1; i < TREX_MAX_PORTS; i++) {
128        os << "," << cf.m_tx_cntr[i];
129    }
130    os << ")";
131    os << " tx count base(";
132    os << cf.m_tx_cntr_base[0];
133    for (int i = 1; i < TREX_MAX_PORTS; i++) {
134        os << "," << cf.m_tx_cntr_base[i];
135    }
136    os << ")";
137
138    return os;
139}
140
141void CFlowStatUserIdInfo::add_stream(uint8_t proto) {
142#ifdef __DEBUG_FUNC_ENTRY__
143    std::cout << __METHOD_NAME__ << " proto:" << (uint16_t)proto << std::endl;
144#endif
145
146    if (proto != m_l4_proto)
147        throw TrexFStatEx("Can't use same pg_id for streams with different l4 protocol",
148                                    TrexException::T_FLOW_STAT_PG_ID_DIFF_L4);
149
150    m_ref_count++;
151}
152
153void CFlowStatUserIdInfo::reset_hw_id() {
154    FUNC_ENTRY;
155
156    m_hw_id = UINT16_MAX;
157    // we are not attached to hw. Save packet count of session.
158    // Next session will start counting from 0.
159    for (int i = 0; i < TREX_MAX_PORTS; i++) {
160        m_rx_cntr_base[i] += m_rx_cntr[i];
161        memset(&m_rx_cntr[i], 0, sizeof(m_rx_cntr[0]));
162        m_tx_cntr_base[i] += m_tx_cntr[i];
163        memset(&m_tx_cntr[i], 0, sizeof(m_tx_cntr[0]));
164    }
165}
166
167/************** class CFlowStatUserIdInfoPayload ***************/
168void CFlowStatUserIdInfoPayload::add_stream(uint8_t proto) {
169    throw TrexFStatEx("For payload rules: Can't have two streams with same pg_id, or same stream on more than one port"
170                      , TrexException::T_FLOW_STAT_DUP_PG_ID);
171}
172
173void CFlowStatUserIdInfoPayload::reset_hw_id() {
174    CFlowStatUserIdInfo::reset_hw_id();
175
176    m_seq_err_base += m_rfc2544_info.m_seq_err;
177    m_out_of_order_base += m_rfc2544_info.m_out_of_order;
178    m_dup_base += m_rfc2544_info.m_dup;
179    m_seq_err_ev_big_base += m_rfc2544_info.m_seq_err_ev_big;
180    m_seq_err_ev_low_base += m_rfc2544_info.m_seq_err_ev_low;
181    m_rfc2544_info.m_seq_err = 0;
182    m_rfc2544_info.m_out_of_order = 0;
183    m_rfc2544_info.m_dup = 0;
184    m_rfc2544_info.m_seq_err_ev_big = 0;
185    m_rfc2544_info.m_seq_err_ev_low = 0;
186}
187
188/************** class CFlowStatUserIdMap ***************/
189CFlowStatUserIdMap::CFlowStatUserIdMap() {
190
191}
192
193std::ostream& operator<<(std::ostream& os, const CFlowStatUserIdMap& cf) {
194    std::map<unsigned int, CFlowStatUserIdInfo*>::const_iterator it;
195    for (it = cf.m_map.begin(); it != cf.m_map.end(); it++) {
196        CFlowStatUserIdInfo *user_id_info = it->second;
197        uint32_t user_id = it->first;
198        os << "Flow stat user id info:\n";
199        os << "  " << user_id << ":" << *user_id_info << std::endl;
200    }
201    return os;
202}
203
204uint16_t CFlowStatUserIdMap::get_hw_id(uint32_t user_id) {
205    CFlowStatUserIdInfo *cf = find_user_id(user_id);
206
207    if (cf == NULL) {
208        return HW_ID_FREE;
209    } else {
210        return cf->get_hw_id();
211    }
212}
213
214CFlowStatUserIdInfo *
215CFlowStatUserIdMap::find_user_id(uint32_t user_id) {
216    flow_stat_user_id_map_it_t it = m_map.find(user_id);
217
218    if (it == m_map.end()) {
219        return NULL;
220    } else {
221        return it->second;
222    }
223}
224
225CFlowStatUserIdInfo *
226CFlowStatUserIdMap::add_user_id(uint32_t user_id, uint16_t l3_proto, uint8_t l4_proto, uint8_t ipv6_next_h) {
227#ifdef __DEBUG_FUNC_ENTRY__
228    std::cout << __METHOD_NAME__ << " user id:" << user_id << " proto:" << (uint16_t)proto
229              << std::endl;
230#endif
231
232    CFlowStatUserIdInfo *new_id;
233
234    if (l4_proto == PAYLOAD_RULE_PROTO) {
235        new_id = new CFlowStatUserIdInfoPayload(l3_proto, l4_proto, ipv6_next_h);
236    } else {
237        new_id = new CFlowStatUserIdInfo(l3_proto, l4_proto, ipv6_next_h);
238    }
239    if (new_id != NULL) {
240        std::pair<flow_stat_user_id_map_it_t, bool> ret;
241        ret = m_map.insert(std::pair<uint32_t, CFlowStatUserIdInfo *>(user_id, new_id));
242        if (ret.second == false) {
243            delete new_id;
244            throw TrexFStatEx("packet group id " + std::to_string(user_id) + " already exists"
245                              , TrexException::T_FLOW_STAT_ALREADY_EXIST);
246        }
247        return new_id;
248    } else {
249        throw TrexFStatEx("Failed allocating memory for new statistic counter"
250                          , TrexException::T_FLOW_STAT_ALLOC_FAIL);
251    }
252}
253
254void CFlowStatUserIdMap::add_stream(uint32_t user_id, uint16_t l3_proto, uint8_t l4_proto, uint8_t ipv6_next_h) {
255#ifdef __DEBUG_FUNC_ENTRY__
256    std::cout << __METHOD_NAME__ << " user id:" << user_id << " l3 proto:" << (uint16_t)l3_proto
257              << " l4 proto:" << (uint16_t)l4_proto << " IPv6 next header:" << (uint16_t)ipv6_next_h
258              << std::endl;
259#endif
260
261    CFlowStatUserIdInfo *c_user_id;
262
263    c_user_id = find_user_id(user_id);
264    if (! c_user_id) {
265        // throws exception on error
266        c_user_id = add_user_id(user_id, l3_proto, l4_proto, ipv6_next_h);
267    } else {
268        c_user_id->add_stream(l4_proto);
269    }
270}
271
272int CFlowStatUserIdMap::del_stream(uint32_t user_id) {
273#ifdef __DEBUG_FUNC_ENTRY__
274    std::cout << __METHOD_NAME__ << " user id:" << user_id << std::endl;
275#endif
276
277    CFlowStatUserIdInfo *c_user_id;
278
279    c_user_id = find_user_id(user_id);
280    if (! c_user_id) {
281        throw TrexFStatEx("Trying to delete stream which does not exist"
282                          , TrexException::T_FLOW_STAT_DEL_NON_EXIST);
283    }
284
285    if (c_user_id->del_stream() == 0) {
286        // ref count of this entry became 0. can release this entry.
287        m_map.erase(user_id);
288        delete c_user_id;
289    }
290
291    return 0;
292}
293
294int CFlowStatUserIdMap::start_stream(uint32_t user_id, uint16_t hw_id) {
295#ifdef __DEBUG_FUNC_ENTRY__
296    std::cout << __METHOD_NAME__ << " user id:" << user_id << " hw_id:" << hw_id << std::endl;
297#endif
298
299    CFlowStatUserIdInfo *c_user_id;
300
301    c_user_id = find_user_id(user_id);
302    if (! c_user_id) {
303        throw TrexFStatEx("Internal error: Trying to associate non exist group id " + std::to_string(user_id)
304                          + " to hardware id " + std::to_string(hw_id)
305                          , TrexException::T_FLOW_STAT_ASSOC_NON_EXIST_ID);
306    }
307
308    if (c_user_id->is_hw_id()) {
309        throw TrexFStatEx("Internal error: Trying to associate hw id " + std::to_string(hw_id) + " to user_id "
310                          + std::to_string(user_id) + ", but it is already associated to "
311                          + std::to_string(c_user_id->get_hw_id())
312                          , TrexException::T_FLOW_STAT_ASSOC_OCC_ID);
313    }
314    c_user_id->set_hw_id(hw_id);
315    c_user_id->add_started_stream();
316
317    return 0;
318}
319
320int CFlowStatUserIdMap::start_stream(uint32_t user_id) {
321#ifdef __DEBUG_FUNC_ENTRY__
322    std::cout << __METHOD_NAME__ << " user id:" << user_id << std::endl;
323#endif
324
325    CFlowStatUserIdInfo *c_user_id;
326
327    c_user_id = find_user_id(user_id);
328    if (! c_user_id) {
329        throw TrexFStatEx("Trying to start stream with non exist packet group id " + std::to_string(user_id)
330                          , TrexException::T_FLOW_STAT_NON_EXIST_ID);
331    }
332
333    c_user_id->add_started_stream();
334
335    return 0;
336}
337
338// return: negative number in case of error.
339//         Number of started streams attached to used_id otherwise.
340int CFlowStatUserIdMap::stop_stream(uint32_t user_id) {
341#ifdef __DEBUG_FUNC_ENTRY__
342    std::cout << __METHOD_NAME__ << " user id:" << user_id << std::endl;
343#endif
344
345    CFlowStatUserIdInfo *c_user_id;
346
347    c_user_id = find_user_id(user_id);
348    if (! c_user_id) {
349        throw TrexFStatEx("Trying to stop stream with non exist packet group id" + std::to_string(user_id)
350                          , TrexException::T_FLOW_STAT_NON_EXIST_ID);
351    }
352
353    return c_user_id->stop_started_stream();
354}
355
356bool CFlowStatUserIdMap::is_started(uint32_t user_id) {
357    CFlowStatUserIdInfo *c_user_id;
358
359    c_user_id = find_user_id(user_id);
360    if (! c_user_id) {
361        return false;
362    }
363
364    return c_user_id->is_started();
365}
366
367uint16_t CFlowStatUserIdMap::unmap(uint32_t user_id) {
368#ifdef __DEBUG_FUNC_ENTRY__
369    std::cout << __METHOD_NAME__ << " user id:" << user_id << std::endl;
370#endif
371
372    CFlowStatUserIdInfo *c_user_id;
373
374    c_user_id = find_user_id(user_id);
375    if (! c_user_id) {
376        return UINT16_MAX;
377    }
378    uint16_t old_hw_id = c_user_id->get_hw_id();
379    c_user_id->reset_hw_id();
380
381    return old_hw_id;
382}
383
384/************** class CFlowStatHwIdMap ***************/
385CFlowStatHwIdMap::CFlowStatHwIdMap() {
386    m_map = NULL; // must call create in order to work with the class
387    m_num_free = 0; // to make coverity happy, init this here too.
388}
389
390CFlowStatHwIdMap::~CFlowStatHwIdMap() {
391    delete[] m_map;
392}
393
394void CFlowStatHwIdMap::create(uint16_t size) {
395    m_map = new uint32_t[size];
396    assert (m_map != NULL);
397    m_num_free = size;
398    for (int i = 0; i < size; i++) {
399        m_map[i] = HW_ID_FREE;
400    }
401}
402
403std::ostream& operator<<(std::ostream& os, const CFlowStatHwIdMap& cf) {
404    int count = 0;
405
406    os << "HW id map:\n";
407    os << "  num free:" << cf.m_num_free << std::endl;
408    for (int i = 0; i < MAX_FLOW_STATS; i++) {
409        if (cf.m_map[i] != 0) {
410            count++;
411            os << "(" << i << ":" << cf.m_map[i] << ")";
412            if (count == 10) {
413                os << std::endl;
414                count = 0;
415            }
416        }
417    }
418
419    return os;
420}
421
422uint16_t CFlowStatHwIdMap::find_free_hw_id() {
423    for (int i = 0; i < MAX_FLOW_STATS; i++) {
424        if (m_map[i] == HW_ID_FREE)
425            return i;
426    }
427
428    return HW_ID_FREE;
429}
430
431void CFlowStatHwIdMap::map(uint16_t hw_id, uint32_t user_id) {
432#ifdef __DEBUG_FUNC_ENTRY__
433    std::cout << __METHOD_NAME__ << " hw id:" << hw_id << " user id:" << user_id << std::endl;
434#endif
435
436    m_map[hw_id] = user_id;
437    m_num_free--;
438}
439
440void CFlowStatHwIdMap::unmap(uint16_t hw_id) {
441#ifdef __DEBUG_FUNC_ENTRY__
442    std::cout << __METHOD_NAME__ << " hw id:" << hw_id << std::endl;
443#endif
444
445    m_map[hw_id] = HW_ID_FREE;
446    m_num_free++;
447}
448
449/************** class CFlowStatRuleMgr ***************/
450CFlowStatRuleMgr::CFlowStatRuleMgr() {
451    m_api = NULL;
452    m_max_hw_id = -1;
453    m_max_hw_id_payload = -1;
454    m_num_started_streams = 0;
455    m_ring_to_rx = NULL;
456    m_cap = 0;
457    m_parser = NULL;
458    m_rx_core = NULL;
459    m_hw_id_map.create(MAX_FLOW_STATS);
460    m_hw_id_map_payload.create(MAX_FLOW_STATS_PAYLOAD);
461    memset(m_rx_cant_count_err, 0, sizeof(m_rx_cant_count_err));
462    memset(m_tx_cant_count_err, 0, sizeof(m_tx_cant_count_err));
463    m_num_ports = 0; // need to call create to init
464}
465
466CFlowStatRuleMgr::~CFlowStatRuleMgr() {
467    delete m_parser;
468#ifdef TREX_SIM
469    // In simulator, nobody handles the messages to RX, so need to free them to have clean valgrind run.
470    if (m_ring_to_rx) {
471        CGenNode *msg = NULL;
472        while (! m_ring_to_rx->isEmpty()) {
473            m_ring_to_rx->Dequeue(msg);
474            delete msg;
475        }
476    }
477#endif
478}
479
480void CFlowStatRuleMgr::create() {
481    uint16_t num_counters, cap;
482    TrexStateless *tstateless = get_stateless_obj();
483    assert(tstateless);
484
485    m_api = tstateless->get_platform_api();
486    assert(m_api);
487    m_api->get_interface_stat_info(0, num_counters, cap);
488    m_api->get_port_num(m_num_ports); // This initialize m_num_ports
489    for (uint8_t port = 0; port < m_num_ports; port++) {
490        assert(m_api->reset_hw_flow_stats(port) == 0);
491    }
492    m_ring_to_rx = CMsgIns::Ins()->getCpRx()->getRingCpToDp(0);
493    assert(m_ring_to_rx);
494    m_rx_core = get_rx_sl_core_obj();
495    m_parser = m_api->get_flow_stat_parser();
496    assert(m_parser);
497    m_cap = cap;
498}
499
500std::ostream& operator<<(std::ostream& os, const CFlowStatRuleMgr& cf) {
501    os << "Flow stat rule mgr (" << cf.m_num_ports << ") ports:" << std::endl;
502    os << cf.m_hw_id_map;
503    os << cf.m_hw_id_map_payload;
504    os << cf.m_user_id_map;
505    return os;
506}
507
508int CFlowStatRuleMgr::compile_stream(const TrexStream * stream, CFlowStatParser *parser) {
509#ifdef __DEBUG_FUNC_ENTRY__
510    std::cout << __METHOD_NAME__ << " user id:" << stream->m_rx_check.m_pg_id << " en:";
511    std::cout << stream->m_rx_check.m_enabled << std::endl;
512#endif
513
514    if (parser->parse(stream->m_pkt.binary, stream->m_pkt.len) != 0) {
515        // if we could not parse the packet, but no stat count needed, it is probably OK.
516        if (stream->m_rx_check.m_enabled) {
517            throw TrexFStatEx("Failed parsing given packet for flow stat. Please consult the manual for supported packet types for flow stat."
518                              , TrexException::T_FLOW_STAT_BAD_PKT_FORMAT);
519        } else {
520            return 0;
521        }
522    }
523
524    if (!parser->is_stat_supported()) {
525        if (! stream->m_rx_check.m_enabled) {
526            // flow stat not needed. Do nothing.
527            return 0;
528        } else {
529            // flow stat needed, but packet format is not supported
530            throw TrexFStatEx("Unsupported packet format for flow stat on given interface type"
531                              , TrexException::T_FLOW_STAT_UNSUPP_PKT_FORMAT);
532        }
533    }
534
535    return 0;
536}
537
538void CFlowStatRuleMgr::copy_state(TrexStream * from, TrexStream * to) {
539    to->m_rx_check.m_hw_id = from->m_rx_check.m_hw_id;
540}
541void CFlowStatRuleMgr::init_stream(TrexStream * stream) {
542    stream->m_rx_check.m_hw_id = HW_ID_INIT;
543}
544
545int CFlowStatRuleMgr::verify_stream(TrexStream * stream) {
546    return add_stream_internal(stream, false);
547}
548
549int CFlowStatRuleMgr::add_stream(TrexStream * stream) {
550    return add_stream_internal(stream, true);
551}
552
553/*
554 * Helper function for adding/verifying streams
555 * stream - stream to act on
556 * do_action - if false, just verify. Do not change any state, or add to database.
557 */
558int CFlowStatRuleMgr::add_stream_internal(TrexStream * stream, bool do_action) {
559#ifdef __DEBUG_FUNC_ENTRY__
560    std::cout << __METHOD_NAME__ << " user id:" << stream->m_rx_check.m_pg_id << std::endl;
561    stream_dump(stream);
562#endif
563
564    if (! stream->m_rx_check.m_enabled) {
565        return 0;
566    }
567
568    // Init everything here, and not in the constructor, since we relay on other objects
569    // By the time a stream is added everything else is initialized.
570    if (! m_api ) {
571        create();
572    }
573
574    TrexPlatformApi::driver_stat_cap_e rule_type = (TrexPlatformApi::driver_stat_cap_e)stream->m_rx_check.m_rule_type;
575
576    if ((m_cap & rule_type) == 0) {
577        throw TrexFStatEx("Interface does not support given rule type", TrexException::T_FLOW_STAT_BAD_RULE_TYPE_FOR_IF);
578    }
579
580    // compile_stream throws exception if something goes wrong
581    compile_stream(stream, m_parser);
582
583    switch(rule_type) {
584    case TrexPlatformApi::IF_STAT_IPV4_ID:
585        uint16_t l3_proto;
586
587        if (m_mode == FLOW_STAT_MODE_PASS_ALL) {
588            throw TrexFStatEx("Can not add flow stat stream in 'receive all' mode", TrexException::T_FLOW_STAT_BAD_RULE_TYPE_FOR_MODE);
589        }
590
591        if (m_parser->get_l3_proto(l3_proto) < 0) {
592            throw TrexFStatEx("Failed determining l3 proto for packet", TrexException::T_FLOW_STAT_FAILED_FIND_L3);
593        }
594        uint8_t l4_proto;
595        if (m_parser->get_l4_proto(l4_proto) < 0) {
596            throw TrexFStatEx("Failed determining l4 proto for packet", TrexException::T_FLOW_STAT_FAILED_FIND_L4);
597        }
598
599
600        // throws exception if there is error
601        if (do_action) {
602            // passing 0 in ipv6_next_h. This is not used currently in stateless.
603            m_user_id_map.add_stream(stream->m_rx_check.m_pg_id, l3_proto, l4_proto, 0);
604        }
605        break;
606    case TrexPlatformApi::IF_STAT_PAYLOAD:
607        uint16_t payload_len;
608        if (m_parser->get_payload_len(stream->m_pkt.binary, stream->m_pkt.len, payload_len) < 0) {
609            throw TrexFStatEx("Failed getting payload len", TrexException::T_FLOW_STAT_BAD_PKT_FORMAT);
610        }
611        if (payload_len < sizeof(struct flow_stat_payload_header)) {
612            throw TrexFStatEx("Need at least " + std::to_string(sizeof(struct latency_header))
613                              + " payload bytes for payload rules. Packet only has " + std::to_string(payload_len) + " bytes"
614                              , TrexException::T_FLOW_STAT_PAYLOAD_TOO_SHORT);
615        }
616        if (do_action) {
617            m_user_id_map.add_stream(stream->m_rx_check.m_pg_id, 0, PAYLOAD_RULE_PROTO, 0);
618        }
619        break;
620    default:
621        throw TrexFStatEx("Wrong rule_type", TrexException::T_FLOW_STAT_BAD_RULE_TYPE);
622        break;
623    }
624    if (do_action) {
625        stream->m_rx_check.m_hw_id = HW_ID_FREE;
626    }
627    return 0;
628}
629
630int CFlowStatRuleMgr::del_stream(TrexStream * stream) {
631#ifdef __DEBUG_FUNC_ENTRY__
632    std::cout << __METHOD_NAME__ << " user id:" << stream->m_rx_check.m_pg_id << std::endl;
633    stream_dump(stream);
634#endif
635
636    if (! stream->m_rx_check.m_enabled) {
637        return 0;
638    }
639
640    if (! m_api)
641        throw TrexFStatEx("Called del_stream, but no stream was added", TrexException::T_FLOW_STAT_NO_STREAMS_EXIST);
642
643    TrexPlatformApi::driver_stat_cap_e rule_type = (TrexPlatformApi::driver_stat_cap_e)stream->m_rx_check.m_rule_type;
644    switch(rule_type) {
645    case TrexPlatformApi::IF_STAT_IPV4_ID:
646        break;
647    case TrexPlatformApi::IF_STAT_PAYLOAD:
648        break;
649    default:
650        throw TrexFStatEx("Wrong rule_type", TrexException::T_FLOW_STAT_BAD_RULE_TYPE);
651        break;
652    }
653
654    // we got del_stream command for a stream which has valid hw_id.
655    // Probably someone forgot to call stop
656    if(stream->m_rx_check.m_hw_id < MAX_FLOW_STATS + MAX_FLOW_STATS_PAYLOAD) {
657        stop_stream(stream);
658    }
659
660    // calling del for same stream twice, or for a stream which was never "added"
661    if(stream->m_rx_check.m_hw_id == HW_ID_INIT) {
662        return 0;
663    }
664    m_user_id_map.del_stream(stream->m_rx_check.m_pg_id); // Throws exception in case of error
665    stream->m_rx_check.m_hw_id = HW_ID_INIT;
666
667    return 0;
668}
669
670// called on all streams, when stream start to transmit
671// If stream need flow stat counting, make sure the type of packet is supported, and
672// embed needed info in packet.
673// If stream does not need flow stat counting, make sure it does not interfere with
674// other streams that do need stat counting.
675// Might change the IP ID of the stream packet
676int CFlowStatRuleMgr::start_stream(TrexStream * stream) {
677#ifdef __DEBUG_FUNC_ENTRY__
678    std::cout << __METHOD_NAME__ << " user id:" << stream->m_rx_check.m_pg_id << std::endl;
679    stream_dump(stream);
680#endif
681
682    int ret;
683    // Streams which does not need statistics might be started, before any stream that do
684    // need statistcs, so start_stream might be called before add_stream
685    if (! m_api ) {
686        create();
687    }
688
689    // first handle streams that do not need rx stat
690    if (! stream->m_rx_check.m_enabled) {
691        try {
692            compile_stream(stream, m_parser);
693        } catch (TrexFStatEx) {
694            // If no statistics needed, and we can't parse the stream, that's OK.
695            return 0;
696        }
697
698        uint32_t ip_id;
699        if (m_parser->get_ip_id(ip_id) < 0) {
700            return 0; // if we could not find the ip id, no need to fix
701        }
702        // verify no reserved IP_ID used, and change if needed
703        if (ip_id >= IP_ID_RESERVE_BASE) {
704            if (m_parser->set_ip_id(ip_id & 0xefff) < 0) {
705                throw TrexFStatEx("Stream IP ID in reserved range. Failed changing it"
706                                  , TrexException::T_FLOW_STAT_FAILED_CHANGE_IP_ID);
707            }
708        }
709        return 0;
710    }
711
712    // from here, we know the stream need rx stat
713
714    // Starting a stream which was never added
715    if (stream->m_rx_check.m_hw_id == HW_ID_INIT) {
716        add_stream(stream);
717    }
718
719    if (stream->m_rx_check.m_hw_id < MAX_FLOW_STATS + MAX_FLOW_STATS_PAYLOAD) {
720        throw TrexFStatEx("Starting a stream which was already started"
721                          , TrexException::T_FLOW_STAT_ALREADY_STARTED);
722    }
723
724    TrexPlatformApi::driver_stat_cap_e rule_type = (TrexPlatformApi::driver_stat_cap_e)stream->m_rx_check.m_rule_type;
725
726    if ((m_cap & rule_type) == 0) {
727        throw TrexFStatEx("Interface does not support given rule type", TrexException::T_FLOW_STAT_BAD_RULE_TYPE_FOR_IF);
728    }
729
730    // compile_stream throws exception if something goes wrong
731    if ((ret = compile_stream(stream, m_parser)) < 0)
732        return ret;
733
734    uint16_t hw_id;
735
736    switch(rule_type) {
737    case TrexPlatformApi::IF_STAT_IPV4_ID:
738        break;
739    case TrexPlatformApi::IF_STAT_PAYLOAD:
740        break;
741    default:
742        throw TrexFStatEx("Wrong rule_type", TrexException::T_FLOW_STAT_BAD_RULE_TYPE);
743        break;
744    }
745
746    if (m_user_id_map.is_started(stream->m_rx_check.m_pg_id)) {
747        m_user_id_map.start_stream(stream->m_rx_check.m_pg_id); // just increase ref count;
748        hw_id = m_user_id_map.get_hw_id(stream->m_rx_check.m_pg_id); // can't fail if we got here
749    } else {
750        if (rule_type == TrexPlatformApi::IF_STAT_IPV4_ID) {
751            hw_id = m_hw_id_map.find_free_hw_id();
752        } else {
753            hw_id = m_hw_id_map_payload.find_free_hw_id();
754        }
755        if (hw_id == HW_ID_FREE) {
756            throw TrexFStatEx("Failed allocating statistic counter. Probably all are used for this rule type."
757                              , TrexException::T_FLOW_STAT_NO_FREE_HW_ID);
758        } else {
759            uint32_t user_id = stream->m_rx_check.m_pg_id;
760            m_user_id_map.start_stream(user_id, hw_id);
761            if (rule_type == TrexPlatformApi::IF_STAT_IPV4_ID) {
762                if (hw_id > m_max_hw_id) {
763                    m_max_hw_id = hw_id;
764                }
765                m_hw_id_map.map(hw_id, user_id);
766                CFlowStatUserIdInfo *uid_info = m_user_id_map.find_user_id(user_id);
767                if (uid_info != NULL) {
768                    add_hw_rule(hw_id, uid_info->get_l3_proto(), uid_info->get_l4_proto(), uid_info->get_ipv6_next_h());
769                }
770            } else {
771                if (hw_id > m_max_hw_id_payload) {
772                    m_max_hw_id_payload = hw_id;
773                }
774                m_hw_id_map_payload.map(hw_id, user_id);
775            }
776            // clear hardware counters. Just in case we have garbage from previous iteration
777            rx_per_flow_t rx_cntr;
778            tx_per_flow_t tx_cntr;
779            rfc2544_info_t rfc2544_info;
780            for (uint8_t port = 0; port < m_num_ports; port++) {
781                m_api->get_flow_stats(port, &rx_cntr, (void *)&tx_cntr, hw_id, hw_id, true, rule_type);
782            }
783            if (rule_type == TrexPlatformApi::IF_STAT_PAYLOAD) {
784                m_api->get_rfc2544_info(&rfc2544_info, hw_id, hw_id, true);
785            }
786        }
787    }
788
789    // saving given hw_id on stream for use by tx statistics count
790    if (rule_type == TrexPlatformApi::IF_STAT_IPV4_ID) {
791        m_parser->set_ip_id(IP_ID_RESERVE_BASE + hw_id);
792        stream->m_rx_check.m_hw_id = hw_id;
793    } else {
794        m_parser->set_ip_id(FLOW_STAT_PAYLOAD_IP_ID);
795        // for payload rules, we use the range right after ip id rules
796        stream->m_rx_check.m_hw_id = hw_id + MAX_FLOW_STATS;
797    }
798
799#ifdef __DEBUG_FUNC_ENTRY__
800    std::cout << "exit:" << __METHOD_NAME__ << " hw_id:" << hw_id << std::endl;
801    stream_dump(stream);
802#endif
803
804    if (m_num_started_streams == 0) {
805        send_start_stop_msg_to_rx(true); // First transmitting stream. Rx core should start reading packets;
806        //also good time to zero global counters
807        memset(m_rx_cant_count_err, 0, sizeof(m_rx_cant_count_err));
808        memset(m_tx_cant_count_err, 0, sizeof(m_tx_cant_count_err));
809
810        // wait to make sure that message is acknowledged. RX core might be in deep sleep mode, and we want to
811        // start transmitting packets only after it is working, otherwise, packets will get lost.
812        if (m_rx_core) { // in simulation, m_rx_core will be NULL
813            int count = 0;
814            while (!m_rx_core->is_working()) {
815                delay(1);
816                count++;
817                if (count == 100) {
818                    throw TrexFStatEx("Critical error!! - RX core failed to start", TrexException::T_FLOW_STAT_RX_CORE_START_FAIL);
819                }
820            }
821        }
822    } else {
823        // make sure rx core is working. If not, we got really confused somehow.
824        if (m_rx_core)
825            assert(m_rx_core->is_working());
826    }
827    m_num_started_streams++;
828    return 0;
829}
830
831int CFlowStatRuleMgr::add_hw_rule(uint16_t hw_id, uint16_t l3_proto, uint8_t l4_proto, uint8_t ipv6_next_h) {
832    for (int port = 0; port < m_num_ports; port++) {
833        m_api->add_rx_flow_stat_rule(port, l3_proto, l4_proto, ipv6_next_h, hw_id);
834    }
835
836    return 0;
837}
838
839int CFlowStatRuleMgr::stop_stream(TrexStream * stream) {
840#ifdef __DEBUG_FUNC_ENTRY__
841    std::cout << __METHOD_NAME__ << " user id:" << stream->m_rx_check.m_pg_id << std::endl;
842    stream_dump(stream);
843#endif
844    if (! stream->m_rx_check.m_enabled) {
845        return 0;
846    }
847
848    if (! m_api)
849        throw TrexFStatEx("Called stop_stream, but no stream was added", TrexException::T_FLOW_STAT_NO_STREAMS_EXIST);
850
851    if (stream->m_rx_check.m_hw_id >= MAX_FLOW_STATS + MAX_FLOW_STATS_PAYLOAD) {
852        // We allow stopping while already stopped. Will not hurt us.
853        return 0;
854    }
855
856    TrexPlatformApi::driver_stat_cap_e rule_type = (TrexPlatformApi::driver_stat_cap_e)stream->m_rx_check.m_rule_type;
857    switch(rule_type) {
858    case TrexPlatformApi::IF_STAT_IPV4_ID:
859        break;
860    case TrexPlatformApi::IF_STAT_PAYLOAD:
861        break;
862    default:
863        throw TrexFStatEx("Wrong rule_type", TrexException::T_FLOW_STAT_BAD_RULE_TYPE);
864        break;
865    }
866
867    stream->m_rx_check.m_hw_id = HW_ID_FREE;
868
869    if (m_user_id_map.stop_stream(stream->m_rx_check.m_pg_id) == 0) {
870        // last stream associated with the entry stopped transmittig.
871        // remove user_id <--> hw_id mapping
872        uint16_t hw_id = m_user_id_map.get_hw_id(stream->m_rx_check.m_pg_id);
873        if (hw_id >= MAX_FLOW_STATS) {
874            throw TrexFStatEx("Internal error in stop_stream. Got bad hw_id" + std::to_string(hw_id)
875                              , TrexException::T_FLOW_STAT_BAD_HW_ID);
876        } else {
877            CFlowStatUserIdInfo *p_user_id;
878            // update counters, and reset before unmapping
879            if (rule_type == TrexPlatformApi::IF_STAT_IPV4_ID) {
880                p_user_id = m_user_id_map.find_user_id(m_hw_id_map.get_user_id(hw_id));
881            } else {
882                p_user_id = m_user_id_map.find_user_id(m_hw_id_map_payload.get_user_id(hw_id));
883            }
884            assert(p_user_id != NULL);
885            rx_per_flow_t rx_cntr;
886            tx_per_flow_t tx_cntr;
887            rfc2544_info_t rfc2544_info;
888            for (uint8_t port = 0; port < m_num_ports; port++) {
889                if (rule_type == TrexPlatformApi::IF_STAT_IPV4_ID) {
890                    m_api->del_rx_flow_stat_rule(port, p_user_id->get_l3_proto(), p_user_id->get_l4_proto()
891                                                 , p_user_id->get_ipv6_next_h(), hw_id);
892                }
893                m_api->get_flow_stats(port, &rx_cntr, (void *)&tx_cntr, hw_id, hw_id, true, rule_type);
894                // when stopping, always send counters for stopped stream one last time
895                p_user_id->set_rx_cntr(port, rx_cntr);
896                p_user_id->set_need_to_send_rx(port);
897                p_user_id->set_tx_cntr(port, tx_cntr);
898                p_user_id->set_need_to_send_tx(port);
899            }
900
901            if (rule_type == TrexPlatformApi::IF_STAT_IPV4_ID) {
902                m_hw_id_map.unmap(hw_id);
903            } else {
904                CFlowStatUserIdInfoPayload *p_user_id_p = (CFlowStatUserIdInfoPayload *)p_user_id;
905                Json::Value json;
906                m_api->get_rfc2544_info(&rfc2544_info, hw_id, hw_id, true);
907                p_user_id_p->set_jitter(rfc2544_info.get_jitter());
908                rfc2544_info.get_latency_json(json);
909                p_user_id_p->set_latency_json(json);
910                p_user_id_p->set_seq_err_cnt(rfc2544_info.get_seq_err_cnt());
911                p_user_id_p->set_ooo_cnt(rfc2544_info.get_ooo_cnt());
912                p_user_id_p->set_dup_cnt(rfc2544_info.get_dup_cnt());
913                p_user_id_p->set_seq_err_big_cnt(rfc2544_info.get_seq_err_ev_big());
914                p_user_id_p->set_seq_err_low_cnt(rfc2544_info.get_seq_err_ev_low());
915                m_hw_id_map_payload.unmap(hw_id);
916            }
917            m_user_id_map.unmap(stream->m_rx_check.m_pg_id);
918        }
919    }
920    m_num_started_streams--;
921    assert (m_num_started_streams >= 0);
922    if (m_num_started_streams == 0) {
923        send_start_stop_msg_to_rx(false); // No more transmittig streams. Rx core should get into idle loop.
924    }
925    return 0;
926}
927
928int CFlowStatRuleMgr::get_active_pgids(flow_stat_active_t &result) {
929    flow_stat_user_id_map_it_t it;
930
931    for (it = m_user_id_map.begin(); it != m_user_id_map.end(); it++) {
932        result.insert(it->first);
933    }
934
935    return 0;
936}
937
938int CFlowStatRuleMgr::set_mode(enum flow_stat_mode_e mode) {
939    if ( ! m_user_id_map.is_empty() )
940        return -1;
941
942    if (! m_api ) {
943        create();
944    }
945
946    switch (mode) {
947    case FLOW_STAT_MODE_PASS_ALL:
948        delete m_parser;
949        m_parser = new CPassAllParser;
950        break;
951    case FLOW_STAT_MODE_NORMAL:
952        delete m_parser;
953        m_parser = m_api->get_flow_stat_parser();
954        assert(m_parser);
955        break;
956    default:
957        return -1;
958
959    }
960
961    m_mode = mode;
962
963    return 0;
964}
965
966extern bool rx_should_stop;
967void CFlowStatRuleMgr::send_start_stop_msg_to_rx(bool is_start) {
968    TrexStatelessCpToRxMsgBase *msg;
969
970    if (is_start) {
971        msg = new TrexStatelessRxStartMsg();
972    } else {
973        msg = new TrexStatelessRxStopMsg();
974    }
975    m_ring_to_rx->Enqueue((CGenNode *)msg);
976}
977
978// return false if no counters changed since last run. true otherwise
979// s_json - flow statistics json
980// l_json - latency data json
981// baseline - If true, send flow statistics fields even if they were not changed since last run
982bool CFlowStatRuleMgr::dump_json(std::string & s_json, std::string & l_json, bool baseline) {
983    rx_per_flow_t rx_stats[MAX_FLOW_STATS];
984    rx_per_flow_t rx_stats_payload[MAX_FLOW_STATS];
985    tx_per_flow_t tx_stats[MAX_FLOW_STATS];
986    tx_per_flow_t tx_stats_payload[MAX_FLOW_STATS_PAYLOAD];
987    rfc2544_info_t rfc2544_info[MAX_FLOW_STATS_PAYLOAD];
988    CRxCoreErrCntrs rx_err_cntrs;
989    Json::FastWriter writer;
990    Json::Value s_root;
991    Json::Value l_root;
992
993    s_root["name"] = "flow_stats";
994    s_root["type"] = 0;
995    l_root["name"] = "latency_stats";
996    l_root["type"] = 0;
997
998    if (baseline) {
999        s_root["baseline"] = true;
1000        l_root["baseline"] = true;
1001    }
1002
1003    Json::Value &s_data_section = s_root["data"];
1004    Json::Value &l_data_section = l_root["data"];
1005    s_data_section["ts"]["value"] = Json::Value::UInt64(os_get_hr_tick_64());
1006    s_data_section["ts"]["freq"] = Json::Value::UInt64(os_get_hr_freq());
1007
1008    if (m_user_id_map.is_empty()) {
1009        s_json = writer.write(s_root);
1010        l_json = writer.write(l_root);
1011        return true;
1012    }
1013
1014    m_api->get_rfc2544_info(rfc2544_info, 0, m_max_hw_id_payload, false);
1015    m_api->get_rx_err_cntrs(&rx_err_cntrs);
1016
1017    // read hw counters, and update
1018    for (uint8_t port = 0; port < m_num_ports; port++) {
1019        m_api->get_flow_stats(port, rx_stats, (void *)tx_stats, 0, m_max_hw_id, false, TrexPlatformApi::IF_STAT_IPV4_ID);
1020        for (int i = 0; i <= m_max_hw_id; i++) {
1021            if (rx_stats[i].get_pkts() != 0) {
1022                rx_per_flow_t rx_pkts = rx_stats[i];
1023                CFlowStatUserIdInfo *p_user_id = m_user_id_map.find_user_id(m_hw_id_map.get_user_id(i));
1024                if (likely(p_user_id != NULL)) {
1025                    if (p_user_id->get_rx_cntr(port) != rx_pkts) {
1026                        p_user_id->set_rx_cntr(port, rx_pkts);
1027                        p_user_id->set_need_to_send_rx(port);
1028                    }
1029                } else {
1030                    m_rx_cant_count_err[port] += rx_pkts.get_pkts();
1031                    std::cerr <<  __METHOD_NAME__ << i << ":Could not count " << rx_pkts << " rx packets, on port "
1032                              << (uint16_t)port << ", because no mapping was found." << std::endl;
1033                }
1034            }
1035            if (tx_stats[i].get_pkts() != 0) {
1036                tx_per_flow_t tx_pkts = tx_stats[i];
1037                CFlowStatUserIdInfo *p_user_id = m_user_id_map.find_user_id(m_hw_id_map.get_user_id(i));
1038                if (likely(p_user_id != NULL)) {
1039                    if (p_user_id->get_tx_cntr(port) != tx_pkts) {
1040                        p_user_id->set_tx_cntr(port, tx_pkts);
1041                        p_user_id->set_need_to_send_tx(port);
1042                    }
1043                } else {
1044                    m_tx_cant_count_err[port] += tx_pkts.get_pkts();;
1045                    std::cerr <<  __METHOD_NAME__ << i << ":Could not count " << tx_pkts <<  " tx packets on port "
1046                              << (uint16_t)port << ", because no mapping was found." << std::endl;
1047                }
1048            }
1049        }
1050        // payload rules
1051        m_api->get_flow_stats(port, rx_stats_payload, (void *)tx_stats_payload, 0, m_max_hw_id_payload
1052                              , false, TrexPlatformApi::IF_STAT_PAYLOAD);
1053        for (int i = 0; i <= m_max_hw_id_payload; i++) {
1054            if (rx_stats_payload[i].get_pkts() != 0) {
1055                rx_per_flow_t rx_pkts = rx_stats_payload[i];
1056                CFlowStatUserIdInfo *p_user_id = m_user_id_map.find_user_id(m_hw_id_map_payload.get_user_id(i));
1057                if (likely(p_user_id != NULL)) {
1058                    if (p_user_id->get_rx_cntr(port) != rx_pkts) {
1059                        p_user_id->set_rx_cntr(port, rx_pkts);
1060                        p_user_id->set_need_to_send_rx(port);
1061                    }
1062                } else {
1063                    m_rx_cant_count_err[port] += rx_pkts.get_pkts();;
1064                    std::cerr <<  __METHOD_NAME__ << i << ":Could not count " << rx_pkts << " rx payload packets, on port "
1065                              << (uint16_t)port << ", because no mapping was found." << std::endl;
1066                }
1067            }
1068            if (tx_stats_payload[i].get_pkts() != 0) {
1069                tx_per_flow_t tx_pkts = tx_stats_payload[i];
1070                CFlowStatUserIdInfo *p_user_id = m_user_id_map.find_user_id(m_hw_id_map_payload.get_user_id(i));
1071                if (likely(p_user_id != NULL)) {
1072                    if (p_user_id->get_tx_cntr(port) != tx_pkts) {
1073                        p_user_id->set_tx_cntr(port, tx_pkts);
1074                        p_user_id->set_need_to_send_tx(port);
1075                    }
1076                } else {
1077                    m_tx_cant_count_err[port] += tx_pkts.get_pkts();;
1078                    std::cerr <<  __METHOD_NAME__ << i << ":Could not count " << tx_pkts <<  " tx packets on port "
1079                              << (uint16_t)port << ", because no mapping was found." << std::endl;
1080                }
1081            }
1082        }
1083    }
1084
1085    // build json report
1086    // general per port data
1087    for (uint8_t port = 0; port < m_num_ports; port++) {
1088            std::string str_port = static_cast<std::ostringstream*>( &(std::ostringstream() << int(port) ) )->str();
1089            if ((m_rx_cant_count_err[port] != 0) || baseline)
1090                s_data_section["global"]["rx_err"][str_port] = m_rx_cant_count_err[port];
1091            if ((m_tx_cant_count_err[port] != 0) || baseline)
1092                s_data_section["global"]["tx_err"][str_port] = m_tx_cant_count_err[port];
1093    }
1094
1095    // payload rules rx errors
1096    uint64_t tmp_cnt;
1097    tmp_cnt = rx_err_cntrs.get_bad_header();
1098    if (tmp_cnt || baseline) {
1099        l_data_section["global"]["bad_hdr"] = Json::Value::UInt64(tmp_cnt);
1100    }
1101    tmp_cnt = rx_err_cntrs.get_old_flow();
1102    if (tmp_cnt || baseline) {
1103        l_data_section["global"]["old_flow"] = Json::Value::UInt64(tmp_cnt);
1104    }
1105
1106    flow_stat_user_id_map_it_t it;
1107    for (it = m_user_id_map.begin(); it != m_user_id_map.end(); it++) {
1108        bool send_empty = true;
1109        CFlowStatUserIdInfo *user_id_info = it->second;
1110        uint32_t user_id = it->first;
1111        std::string str_user_id = static_cast<std::ostringstream*>( &(std::ostringstream() << user_id) )->str();
1112        if (! user_id_info->was_sent()) {
1113            s_data_section[str_user_id]["first_time"] = true;
1114            user_id_info->set_was_sent(true);
1115            send_empty = false;
1116        }
1117        // flow stat json
1118        for (uint8_t port = 0; port < m_num_ports; port++) {
1119            std::string str_port = static_cast<std::ostringstream*>( &(std::ostringstream() << int(port) ) )->str();
1120            if (user_id_info->need_to_send_rx(port) || baseline) {
1121                user_id_info->set_no_need_to_send_rx(port);
1122                s_data_section[str_user_id]["rx_pkts"][str_port] = Json::Value::UInt64(user_id_info->get_rx_cntr(port).get_pkts());
1123                if (m_cap & TrexPlatformApi::IF_STAT_RX_BYTES_COUNT)
1124                    s_data_section[str_user_id]["rx_bytes"][str_port] = Json::Value::UInt64(user_id_info->get_rx_cntr(port).get_bytes());
1125                send_empty = false;
1126            }
1127            if (user_id_info->need_to_send_tx(port) || baseline) {
1128                user_id_info->set_no_need_to_send_tx(port);
1129                s_data_section[str_user_id]["tx_pkts"][str_port] = Json::Value::UInt64(user_id_info->get_tx_cntr(port).get_pkts());
1130                s_data_section[str_user_id]["tx_bytes"][str_port] = Json::Value::UInt64(user_id_info->get_tx_cntr(port).get_bytes());
1131                send_empty = false;
1132            }
1133        }
1134        if (send_empty) {
1135            s_data_section[str_user_id] = Json::objectValue;
1136        }
1137
1138        // latency info json
1139        if (user_id_info->rfc2544_support()) {
1140            CFlowStatUserIdInfoPayload *user_id_info_p = (CFlowStatUserIdInfoPayload *)user_id_info;
1141            // payload object. Send also latency, jitter...
1142            Json::Value lat_hist = Json::arrayValue;
1143            if (user_id_info->is_hw_id()) {
1144                // if mapped to hw_id, take info from what we just got from rx core
1145                uint16_t hw_id = user_id_info->get_hw_id();
1146                rfc2544_info[hw_id].get_latency_json(lat_hist);
1147                user_id_info_p->set_seq_err_cnt(rfc2544_info[hw_id].get_seq_err_cnt());
1148                user_id_info_p->set_ooo_cnt(rfc2544_info[hw_id].get_ooo_cnt());
1149                user_id_info_p->set_dup_cnt(rfc2544_info[hw_id].get_dup_cnt());
1150                user_id_info_p->set_seq_err_big_cnt(rfc2544_info[hw_id].get_seq_err_ev_big());
1151                user_id_info_p->set_seq_err_low_cnt(rfc2544_info[hw_id].get_seq_err_ev_low());
1152                l_data_section[str_user_id]["latency"] = lat_hist;
1153                l_data_section[str_user_id]["latency"]["jitter"] = rfc2544_info[hw_id].get_jitter_usec();
1154            } else {
1155                // Not mapped to hw_id. Get saved info.
1156                user_id_info_p->get_latency_json(lat_hist);
1157                if (lat_hist != Json::nullValue) {
1158                    l_data_section[str_user_id]["latency"] = lat_hist;
1159                    l_data_section[str_user_id]["latency"]["jitter"] = user_id_info_p->get_jitter_usec();
1160                }
1161            }
1162            l_data_section[str_user_id]["err_cntrs"]["dropped"]
1163                = Json::Value::UInt64(user_id_info_p->get_seq_err_cnt());
1164            l_data_section[str_user_id]["err_cntrs"]["out_of_order"]
1165                = Json::Value::UInt64(user_id_info_p->get_ooo_cnt());
1166            l_data_section[str_user_id]["err_cntrs"]["dup"]
1167                = Json::Value::UInt64(user_id_info_p->get_dup_cnt());
1168            l_data_section[str_user_id]["err_cntrs"]["seq_too_high"]
1169                = Json::Value::UInt64(user_id_info_p->get_seq_err_big_cnt());
1170            l_data_section[str_user_id]["err_cntrs"]["seq_too_low"]
1171                = Json::Value::UInt64(user_id_info_p->get_seq_err_low_cnt());
1172        }
1173    }
1174
1175    s_json = writer.write(s_root);
1176    l_json = writer.write(l_root);
1177    // We always want to publish, even only the timestamp.
1178    return true;
1179}
1180