1/*
2 Itay Marom
3 Cisco Systems, Inc.
4*/
5
6/*
7Copyright (c) 2015-2015 Cisco Systems, Inc.
8
9Licensed under the Apache License, Version 2.0 (the "License");
10you may not use this file except in compliance with the License.
11You may obtain a copy of the License at
12
13    http://www.apache.org/licenses/LICENSE-2.0
14
15Unless required by applicable law or agreed to in writing, software
16distributed under the License is distributed on an "AS IS" BASIS,
17WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18See the License for the specific language governing permissions and
19limitations under the License.
20*/
21
22#include <string>
23#include <sstream>
24#include <assert.h>
25#include <iostream>
26
27#include <trex_streams_compiler.h>
28#include <trex_stateless.h>
29#include <trex_vm_splitter.h>
30#include <trex_stream.h>
31
32
33
34/**
35 * describes a graph node in the pre compile check
36 *
37 * @author imarom (16-Nov-15)
38 */
39class GraphNode {
40public:
41    GraphNode(const TrexStream *stream, GraphNode *next) : m_stream(stream), m_next(next) {
42        m_marked   = false;
43        m_compressed_stream_id=-1;
44
45    }
46
47    uint32_t get_stream_id() const {
48        return m_stream->m_stream_id;
49    }
50
51    uint32_t get_next_stream_id() const {
52        return m_stream->m_next_stream_id;
53
54    }
55
56    const TrexStream *m_stream;
57    GraphNode *m_next;
58    std::vector<const GraphNode *> m_parents;
59    bool m_marked;
60    int m_compressed_stream_id;
61};
62
63/**
64 * node map
65 *
66 */
67class GraphNodeMap {
68public:
69
70    GraphNodeMap() : m_dead_end(NULL, NULL) {
71
72    }
73
74    bool add(GraphNode *node) {
75        if (has(node->get_stream_id())) {
76            return false;
77        }
78
79        m_nodes[node->get_stream_id()] = node;
80
81        if (node->m_stream->m_self_start) {
82            m_roots.push_back(node);
83        }
84
85        return true;
86    }
87
88    bool has(uint32_t stream_id) {
89
90        return (get(stream_id) != NULL);
91    }
92
93    GraphNode * get(uint32_t stream_id) {
94
95        if (stream_id == -1) {
96            return &m_dead_end;
97        }
98
99        auto search = m_nodes.find(stream_id);
100
101        if (search != m_nodes.end()) {
102            return search->second;
103        } else {
104            return NULL;
105        }
106    }
107
108    void clear_marks() {
109        for (auto node : m_nodes) {
110            node.second->m_marked = false;
111        }
112    }
113
114    void get_unmarked(std::vector <GraphNode *> &unmarked) {
115        for (auto node : m_nodes) {
116            if (!node.second->m_marked) {
117                unmarked.push_back(node.second);
118            }
119        }
120    }
121
122
123    ~GraphNodeMap() {
124        for (auto node : m_nodes) {
125            delete node.second;
126        }
127        m_nodes.clear();
128    }
129
130    std::vector <GraphNode *> & get_roots() {
131        return m_roots;
132    }
133
134
135    std::unordered_map<uint32_t, GraphNode *> get_nodes() {
136        return m_nodes;
137    }
138
139private:
140    std::unordered_map<uint32_t, GraphNode *> m_nodes;
141    std::vector <GraphNode *> m_roots;
142    GraphNode m_dead_end;
143};
144
145
146/**************************************
147 * stream compiled object
148 *************************************/
149TrexStreamsCompiledObj::TrexStreamsCompiledObj(uint8_t port_id) {
150    m_port_id = port_id;
151    m_all_continues = false;
152}
153
154TrexStreamsCompiledObj::~TrexStreamsCompiledObj() {
155    for (auto obj : m_objs) {
156        delete obj.m_stream;
157    }
158    m_objs.clear();
159}
160
161
162void
163TrexStreamsCompiledObj::add_compiled_stream(TrexStream *stream){
164
165    obj_st obj;
166
167    obj.m_stream = stream;
168
169    m_objs.push_back(obj);
170}
171
172
173TrexStreamsCompiledObj *
174TrexStreamsCompiledObj::clone() {
175
176    TrexStreamsCompiledObj *new_compiled_obj = new TrexStreamsCompiledObj(m_port_id);
177
178    /**
179     * clone each element
180     */
181    for (auto obj : m_objs) {
182        TrexStream *new_stream = obj.m_stream->clone();
183        new_compiled_obj->add_compiled_stream(new_stream);
184    }
185
186    return new_compiled_obj;
187}
188
189void TrexStreamsCompiledObj::Dump(FILE *fd){
190    for (auto obj : m_objs) {
191        obj.m_stream->Dump(fd);
192    }
193}
194
195
196void
197TrexStreamsCompiler::add_warning(const std::string &warning) {
198    m_warnings.push_back("*** warning: " + warning);
199}
200
201void
202TrexStreamsCompiler::err(const std::string &err) {
203    throw TrexException("*** error: " + err);
204}
205
206void
207TrexStreamsCompiler::check_stream(const TrexStream *stream) {
208    std::stringstream ss;
209
210    /* cont. stream can point only on itself */
211    if (stream->get_type() == TrexStream::stCONTINUOUS) {
212        if (stream->m_next_stream_id != -1) {
213            ss << "continous stream '" << stream->m_stream_id << "' cannot point to another stream";
214            err(ss.str());
215        }
216    }
217}
218
219void
220TrexStreamsCompiler::allocate_pass(const std::vector<TrexStream *> &streams,
221                                   GraphNodeMap *nodes) {
222    std::stringstream ss;
223    uint32_t compressed_stream_id=0;
224
225
226    /* first pass - allocate all nodes and check for duplicates */
227    for (auto stream : streams) {
228
229        /* skip non enabled streams */
230        if (!stream->m_enabled) {
231            continue;
232        }
233
234        /* sanity check on the stream itself */
235        check_stream(stream);
236
237        /* duplicate stream id ? */
238        if (nodes->has(stream->m_stream_id)) {
239            ss << "duplicate instance of stream id " << stream->m_stream_id;
240            err(ss.str());
241        }
242
243        GraphNode *node = new GraphNode(stream, NULL);
244        /* allocate new compressed id */
245        node->m_compressed_stream_id = compressed_stream_id;
246
247        compressed_stream_id++;
248
249        /* add to the map */
250        assert(nodes->add(node));
251    }
252
253}
254
255/**
256 * on this pass we direct the graph to point to the right nodes
257 *
258 */
259void
260TrexStreamsCompiler::direct_pass(GraphNodeMap *nodes) {
261
262    /* second pass - direct the graph */
263    for (auto p : nodes->get_nodes()) {
264
265        GraphNode *node = p.second;
266        const TrexStream *stream = node->m_stream;
267
268        /* check the stream points on an existing stream */
269        GraphNode *next_node = nodes->get(stream->m_next_stream_id);
270        if (!next_node) {
271            std::stringstream ss;
272            ss << "stream " << node->get_stream_id() << " is pointing on non existent stream " << stream->m_next_stream_id;
273            err(ss.str());
274        }
275
276        node->m_next = next_node;
277
278        /* do we have more than one parent ? */
279        next_node->m_parents.push_back(node);
280    }
281
282
283    /* check for multiple parents */
284    for (auto p : nodes->get_nodes()) {
285        GraphNode *node = p.second;
286
287        if (node->m_parents.size() > 0 ) {
288            std::stringstream ss;
289
290            ss << "stream " << node->get_stream_id() << " is triggered by multiple streams: ";
291            for (auto x : node->m_parents) {
292                ss << x->get_stream_id() << " ";
293            }
294
295            add_warning(ss.str());
296        }
297    }
298}
299
300/**
301 * mark sure all the streams are reachable
302 *
303 */
304void
305TrexStreamsCompiler::check_for_unreachable_streams(GraphNodeMap *nodes) {
306    /* start with the roots */
307    std::vector <GraphNode *> next_nodes = nodes->get_roots();
308
309
310    nodes->clear_marks();
311
312    /* run BFS from all the roots */
313    while (!next_nodes.empty()) {
314
315        /* pull one */
316        GraphNode *node = next_nodes.back();
317        next_nodes.pop_back();
318        if (node->m_marked) {
319            continue;
320        }
321
322        node->m_marked = true;
323
324        if (node->m_next != NULL) {
325            next_nodes.push_back(node->m_next);
326        }
327
328    }
329
330    std::vector <GraphNode *> unmarked;
331    nodes->get_unmarked(unmarked);
332
333    if (!unmarked.empty()) {
334        std::stringstream ss;
335        for (auto node : unmarked) {
336            ss << "stream " << node->get_stream_id() << " is unreachable from any other stream\n";
337        }
338        err(ss.str());
339    }
340
341
342}
343
344/**
345 * check validation of streams for compile
346 *
347 * @author imarom (16-Nov-15)
348 *
349 * @param streams
350 * @param fail_msg
351 *
352 * @return bool
353 */
354void
355TrexStreamsCompiler::pre_compile_check(const std::vector<TrexStream *> &streams,
356                                       GraphNodeMap & nodes) {
357
358    m_warnings.clear();
359
360    /* allocate nodes */
361    allocate_pass(streams, &nodes);
362
363    /* direct the graph */
364    direct_pass(&nodes);
365
366    /* check for non reachable streams inside the graph */
367    check_for_unreachable_streams(&nodes);
368
369}
370
371/**************************************
372 * stream compiler
373 *************************************/
374bool
375TrexStreamsCompiler::compile(uint8_t                                port_id,
376                             const std::vector<TrexStream *>        &streams,
377                             std::vector<TrexStreamsCompiledObj *>  &objs,
378                             const TrexDPCoreMask                   &core_mask,
379                             double                                 factor,
380                             std::string                            *fail_msg) {
381
382    assert(core_mask.get_active_count() > 0);
383
384    /* create an indirect list which contains all
385       the active cores according to the mask
386     */
387    uint8_t indirect_core_count = core_mask.get_active_count();
388    std::vector<TrexStreamsCompiledObj *> indirect_objs(indirect_core_count);
389
390    /* zero all */
391    for (int i = 0; i < indirect_core_count; i++) {
392        indirect_objs[i] = NULL;
393    }
394
395    try {
396        bool rc = compile_internal(port_id,
397                                   streams,
398                                   indirect_objs,
399                                   indirect_core_count,
400                                   factor,
401                                   fail_msg);
402        if (!rc) {
403            return rc;
404        }
405
406    } catch (const TrexException &ex) {
407        if (fail_msg) {
408            *fail_msg = ex.what();
409        } else {
410            std::cout << ex.what();
411        }
412        return false;
413    }
414
415    /* prepare the result */
416    objs.resize(core_mask.get_total_count());
417    for (int i = 0; i < core_mask.get_total_count(); i++) {
418        objs.push_back(NULL);
419    }
420
421    uint8_t index = 0;
422    for (uint8_t active_core_id : core_mask.get_active_cores()) {
423
424        /* the indirect compile might have decided to compile on one core only
425           (or any other number which is lower than all the actives)
426         */
427        if (indirect_objs[index] == NULL) {
428            break;
429        }
430        objs[active_core_id] = indirect_objs[index++];
431    }
432
433    return true;
434}
435
436bool
437TrexStreamsCompiler::compile_internal(uint8_t                                port_id,
438                                      const std::vector<TrexStream *>        &streams,
439                                      std::vector<TrexStreamsCompiledObj *>  &objs,
440                                      uint8_t                                dp_core_count,
441                                      double                                 factor,
442                                      std::string                            *fail_msg) {
443
444#if 0
445    for (auto stream : streams) {
446        stream->Dump(stdout);
447    }
448    fprintf(stdout,"------------pre compile \n");
449#endif
450
451    GraphNodeMap nodes;
452
453    /* compile checks */
454    pre_compile_check(streams, nodes);
455
456    /* check if all are cont. streams */
457    bool all_continues = true;
458    int  non_splitable_count = 0;
459    for (const auto stream : streams) {
460        if (stream->get_type() != TrexStream::stCONTINUOUS) {
461            all_continues = false;
462        }
463        if (!stream->is_splitable(dp_core_count)) {
464            non_splitable_count++;
465        }
466    }
467
468    /* if all streams are not splitable - all should go to core 0 */
469    if (non_splitable_count == streams.size()) {
470        compile_on_single_core(port_id,
471                               streams,
472                               objs,
473                               dp_core_count,
474                               factor,
475                               nodes,
476                               all_continues);
477    } else {
478        compile_on_all_cores(port_id,
479                             streams,
480                             objs,
481                             dp_core_count,
482                             factor,
483                             nodes,
484                             all_continues);
485    }
486
487    return true;
488
489}
490
491/**
492 * compile a list of streams on a single core (pinned to core 0)
493 *
494 */
495void
496TrexStreamsCompiler::compile_on_single_core(uint8_t                                port_id,
497                                            const std::vector<TrexStream *>        &streams,
498                                            std::vector<TrexStreamsCompiledObj *>  &objs,
499                                            uint8_t                                dp_core_count,
500                                            double                                 factor,
501                                            GraphNodeMap                           &nodes,
502                                            bool                                   all_continues) {
503
504    /* allocate object only for core 0 */
505    TrexStreamsCompiledObj *obj = new TrexStreamsCompiledObj(port_id);
506    obj->m_all_continues = all_continues;
507    objs[0] = obj;
508
509     /* compile all the streams */
510    for (auto const stream : streams) {
511
512        /* skip non-enabled streams */
513        if (!stream->m_enabled) {
514            continue;
515        }
516
517        /* compile the stream for only one core */
518        compile_stream(stream, factor, 1, objs, nodes);
519    }
520}
521
522/**
523 * compile a list of streams on all DP cores
524 *
525 */
526void
527TrexStreamsCompiler::compile_on_all_cores(uint8_t                                port_id,
528                                          const std::vector<TrexStream *>        &streams,
529                                          std::vector<TrexStreamsCompiledObj *>  &objs,
530                                          uint8_t                                dp_core_count,
531                                          double                                 factor,
532                                          GraphNodeMap                           &nodes,
533                                          bool                                   all_continues) {
534
535    /* allocate objects for all DP cores */
536    for (uint8_t i = 0; i < dp_core_count; i++) {
537        TrexStreamsCompiledObj *obj = new TrexStreamsCompiledObj(port_id);
538        obj->m_all_continues = all_continues;
539        objs[i] = obj;
540    }
541
542    /* compile all the streams */
543    for (auto const stream : streams) {
544
545        /* skip non-enabled streams */
546        if (!stream->m_enabled) {
547            continue;
548        }
549
550        /* compile a single stream to all cores */
551        compile_stream(stream, factor, dp_core_count, objs, nodes);
552    }
553
554}
555
556/**
557 * compiles a single stream to DP objects
558 *
559 * @author imarom (03-Dec-15)
560 *
561 */
562void
563TrexStreamsCompiler::compile_stream(const TrexStream *stream,
564                                    double factor,
565                                    uint8_t dp_core_count,
566                                    std::vector<TrexStreamsCompiledObj *> &objs,
567                                    GraphNodeMap &nodes) {
568
569
570    /* fix the stream ids */
571    int new_id = nodes.get(stream->m_stream_id)->m_compressed_stream_id;
572    assert(new_id >= 0);
573
574    int new_next_id = -1;
575    if (stream->m_next_stream_id >= 0) {
576        new_next_id = nodes.get(stream->m_next_stream_id)->m_compressed_stream_id;
577    }
578
579    /* we clone because we alter the stream now */
580    std::unique_ptr<TrexStream> tmp_stream(stream->clone(true));
581    tmp_stream->update_rate_factor(factor);
582
583    /* can this stream be split to many cores ? */
584    if ( (dp_core_count == 1) || (!stream->is_splitable(dp_core_count)) ) {
585        compile_stream_on_single_core(tmp_stream.get(),
586                                      dp_core_count,
587                                      objs,
588                                      new_id,
589                                      new_next_id);
590    } else {
591        compile_stream_on_all_cores(tmp_stream.get(),
592                                    dp_core_count,
593                                    objs,
594                                    new_id,
595                                    new_next_id);
596    }
597
598}
599
600/**
601 * compile the stream on all the cores available
602 *
603 */
604void
605TrexStreamsCompiler::compile_stream_on_all_cores(TrexStream *stream,
606                                                 uint8_t dp_core_count,
607                                                 std::vector<TrexStreamsCompiledObj *> &objs,
608                                                 int new_id,
609                                                 int new_next_id) {
610
611    std::vector<TrexStream *> core_streams(dp_core_count);
612
613    int per_core_burst_total_pkts = (stream->m_burst_total_pkts / dp_core_count);
614    const int burst_remainder     = (stream->m_burst_total_pkts % dp_core_count);
615    int remainder_left            = burst_remainder;
616
617    /* this is the stream base IPG (pre split) */
618    double base_ipg_sec     = stream->get_ipg_sec();
619
620
621    /* for each core - creates its own version of the stream */
622    for (uint8_t i = 0; i < dp_core_count; i++) {
623        TrexStream *dp_stream = stream->clone();
624
625        /* fix stream ID */
626        dp_stream->fix_dp_stream_id(new_id, new_next_id);
627
628        /* some phase is added to avoid all the cores TXing at once */
629        dp_stream->m_mc_phase_pre_sec = base_ipg_sec * i;
630
631
632        /* each core gets a share of the packets */
633        dp_stream->m_burst_total_pkts  = per_core_burst_total_pkts;
634
635        /* rate is slower * dp_core_count */
636        dp_stream->update_rate_factor(1.0 / dp_core_count);
637
638
639        if (remainder_left > 0) {
640            dp_stream->m_burst_total_pkts++;
641            remainder_left--;
642            /* this core needs to wait to the rest of the cores that will participate in the last round */
643            dp_stream->m_mc_phase_post_sec = base_ipg_sec * remainder_left;
644        } else {
645            /* this core did not participate in the last round so it will wait its current round's left + burst_remainder */
646            dp_stream->m_mc_phase_post_sec = base_ipg_sec * (dp_core_count - 1 - i + burst_remainder);
647        }
648
649
650        core_streams[i] = dp_stream;
651    }
652
653    /* handle VM (split if needed) */
654    TrexVmSplitter vm_splitter;
655    vm_splitter.split( (TrexStream *)stream, core_streams);
656
657    /* attach the compiled stream of every core to its object */
658    for (uint8_t i = 0; i < dp_core_count; i++) {
659        objs[i]->add_compiled_stream(core_streams[i]);
660    }
661
662}
663
664/**
665 * compile the stream on core 0
666 *
667 */
668void
669TrexStreamsCompiler::compile_stream_on_single_core(TrexStream *stream,
670                                                   uint8_t dp_core_count,
671                                                   std::vector<TrexStreamsCompiledObj *> &objs,
672                                                   int new_id,
673                                                   int new_next_id) {
674
675    TrexStream *dp_stream = stream->clone();
676
677    /* fix stream ID */
678    dp_stream->fix_dp_stream_id(new_id, new_next_id);
679
680    /* compile the VM if exists */
681    if (!stream->m_vm.is_vm_empty()) {
682        stream->vm_compile();
683        dp_stream->m_vm_dp = stream->m_vm_dp->clone();
684    }
685
686    /* update core 0 with the real stream */
687    objs[0]->add_compiled_stream(dp_stream);
688
689
690    /* create dummy streams for the other cores */
691    for (uint8_t i = 1; i < dp_core_count; i++) {
692        TrexStream *null_dp_stream = stream->clone();
693
694        /* fix stream ID */
695        null_dp_stream->fix_dp_stream_id(new_id, new_next_id);
696
697        /* mark as null stream and add */
698        null_dp_stream->set_null_stream(true);
699        objs[i]->add_compiled_stream(null_dp_stream);
700    }
701}
702
703/**************************************
704 * streams graph
705 *************************************/
706
707/**
708 * for each stream we create the right rate events (up/down)
709 *
710 * @author imarom (24-Nov-15)
711 *
712 * @param offset_usec
713 * @param stream
714 */
715void
716TrexStreamsGraph::add_rate_events_for_stream(double &offset_usec, TrexStream *stream) {
717
718    /* for fixed rate streams such as latency - simply add a fix rate addition */
719    if (stream->is_fixed_rate_stream()) {
720        BW bw(stream->get_pps(), stream->get_bps_L2(), stream->get_bps_L1());
721        m_graph_obj->add_fixed_rate(bw);
722        return;
723    }
724
725    switch (stream->get_type()) {
726
727    case TrexStream::stCONTINUOUS:
728        add_rate_events_for_stream_cont(offset_usec, stream);
729        return;
730
731    case TrexStream::stSINGLE_BURST:
732        add_rate_events_for_stream_single_burst(offset_usec, stream);
733        return;
734
735    case TrexStream::stMULTI_BURST:
736        add_rate_events_for_stream_multi_burst(offset_usec, stream);
737        return;
738    }
739}
740
741/**
742 * continous stream
743 *
744 */
745void
746TrexStreamsGraph::add_rate_events_for_stream_cont(double &offset_usec, TrexStream *stream) {
747
748    TrexStreamsGraphObj::rate_event_st start_event;
749
750    /* for debug purposes */
751    start_event.stream_id = stream->m_stream_id;
752
753    start_event.time         = offset_usec + stream->m_isg_usec;
754    start_event.diff_pps     = stream->get_pps();
755    start_event.diff_bps_l2  = stream->get_bps_L2();
756    start_event.diff_bps_l1  = stream->get_bps_L1();
757    m_graph_obj->add_rate_event(start_event);
758
759    /* no more events after this stream */
760    offset_usec = -1;
761
762    /* also mark we have an inifite time */
763    m_graph_obj->m_expected_duration = -1;
764}
765
766/**
767 * single burst stream
768 *
769 */
770void
771TrexStreamsGraph::add_rate_events_for_stream_single_burst(double &offset_usec, TrexStream *stream) {
772    TrexStreamsGraphObj::rate_event_st start_event;
773    TrexStreamsGraphObj::rate_event_st stop_event;
774
775
776    /* for debug purposes */
777    start_event.stream_id  = stream->m_stream_id;
778    stop_event.stream_id   = stream->m_stream_id;
779
780     /* start event */
781    start_event.time = offset_usec + stream->m_isg_usec;
782    start_event.diff_pps    = stream->get_pps();
783    start_event.diff_bps_l2 = stream->get_bps_L2();
784    start_event.diff_bps_l1 = stream->get_bps_L1();
785    m_graph_obj->add_rate_event(start_event);
786
787    /* stop event */
788    stop_event.time = start_event.time + stream->get_burst_length_usec();
789    stop_event.diff_pps = -(start_event.diff_pps);
790    stop_event.diff_bps_l2 = -(start_event.diff_bps_l2);
791    stop_event.diff_bps_l1 = -(start_event.diff_bps_l1);
792    m_graph_obj->add_rate_event(stop_event);
793
794    /* next stream starts from here */
795    offset_usec = stop_event.time;
796
797}
798
799/**
800 * multi burst stream
801 *
802 */
803void
804TrexStreamsGraph::add_rate_events_for_stream_multi_burst(double &offset_usec, TrexStream *stream) {
805    TrexStreamsGraphObj::rate_event_st start_event;
806    TrexStreamsGraphObj::rate_event_st stop_event;
807
808    /* first the delay is the inter stream gap */
809    double delay = stream->m_isg_usec;
810
811    /* for debug purposes */
812
813    start_event.diff_pps     = stream->get_pps();
814    start_event.diff_bps_l2  = stream->get_bps_L2();
815    start_event.diff_bps_l1  = stream->get_bps_L1();
816    start_event.stream_id    = stream->m_stream_id;
817
818    stop_event.diff_pps      = -(start_event.diff_pps);
819    stop_event.diff_bps_l2   = -(start_event.diff_bps_l2);
820    stop_event.diff_bps_l1   = -(start_event.diff_bps_l1);
821    stop_event.stream_id     = stream->m_stream_id;
822
823    /* for each burst create up/down events */
824    for (int i = 0; i < stream->m_num_bursts; i++) {
825
826        start_event.time = offset_usec + delay;
827        m_graph_obj->add_rate_event(start_event);
828
829        stop_event.time = start_event.time + stream->get_burst_length_usec();
830        m_graph_obj->add_rate_event(stop_event);
831
832        /* after the first burst, the delay is inter burst gap */
833        delay = stream->m_ibg_usec;
834
835        offset_usec = stop_event.time;
836    }
837}
838
839/**
840 * for a single root we can until done or a loop detected
841 *
842 * @author imarom (24-Nov-15)
843 *
844 * @param root_stream_id
845 */
846void
847TrexStreamsGraph::generate_graph_for_one_root(uint32_t root_stream_id) {
848
849    std::unordered_map<uint32_t, bool> loop_hash;
850    std::stringstream ss;
851
852    uint32_t stream_id = root_stream_id;
853    double offset = 0;
854
855    while (true) {
856        TrexStream *stream;
857
858        /* fetch the stream from the hash - if it is not present, report an error */
859        try {
860            stream = m_streams_hash.at(stream_id);
861        } catch (const std::out_of_range &e) {
862            ss << "stream id " << stream_id << " does not exists";
863            throw TrexException(ss.str());
864        }
865
866        /* add the node to the hash for loop detection */
867        loop_hash[stream_id] = true;
868
869        /* create the right rate events for the stream */
870        add_rate_events_for_stream(offset, stream);
871
872        /* do we have a next stream ? */
873        if (stream->m_next_stream_id == -1) {
874            break;
875        }
876
877        /* loop detection */
878        auto search = loop_hash.find(stream->m_next_stream_id);
879        if (search != loop_hash.end()) {
880            m_graph_obj->on_loop_detection();
881            break;
882        }
883
884        /* handle the next one */
885        stream_id = stream->m_next_stream_id;
886    }
887}
888
889/**
890 * for a vector of streams generate a graph of BW
891 * see graph object for more details
892 *
893 */
894const TrexStreamsGraphObj *
895TrexStreamsGraph::generate(const std::vector<TrexStream *> &streams) {
896
897    /* main object to hold the graph - returned to the user */
898    m_graph_obj = new TrexStreamsGraphObj();
899
900    std::vector <uint32_t> root_streams;
901
902    /* before anything we create a hash streams ID
903       and grab the root nodes
904     */
905    for (TrexStream *stream : streams) {
906
907        /* skip non enabled streams */
908        if (!stream->m_enabled) {
909            continue;
910        }
911
912        /* for fast search we populate all the streams in a hash */
913        m_streams_hash[stream->m_stream_id] = stream;
914
915        /* hold all the self start nodes in a vector */
916        if (stream->m_self_start) {
917            root_streams.push_back(stream->m_stream_id);
918        }
919    }
920
921    /* for each node - scan until done or loop */
922    for (uint32_t root_id : root_streams) {
923        generate_graph_for_one_root(root_id);
924    }
925
926
927    m_graph_obj->generate();
928
929    return m_graph_obj;
930}
931
932/**************************************
933 * streams graph object
934 *************************************/
935void
936TrexStreamsGraphObj::find_max_rate() {
937
938    BW max_bw;
939    BW current_bw;
940
941    /* now we simply walk the list and hold the max */
942    for (auto &ev : m_rate_events) {
943
944        current_bw.m_pps     += ev.diff_pps;
945        current_bw.m_bps_l2  += ev.diff_bps_l2;
946        current_bw.m_bps_l1  += ev.diff_bps_l1;
947
948        max_bw.m_pps    = std::max(max_bw.m_pps,      current_bw.m_pps);
949        max_bw.m_bps_l2 = std::max(max_bw.m_bps_l2,   current_bw.m_bps_l2);
950        max_bw.m_bps_l1 = std::max(max_bw.m_bps_l1,   current_bw.m_bps_l1);
951
952    }
953
954    /* if not mark as inifite - get the last event time */
955    if ( (m_expected_duration != -1) && (m_rate_events.size() > 0) ) {
956        m_expected_duration = m_rate_events.back().time;
957    }
958
959    /* save it to the class member */
960    m_var = max_bw;
961
962    /* for total - max and fixed */
963    m_total = m_var + m_fixed;
964}
965
966static
967bool event_compare (const TrexStreamsGraphObj::rate_event_st &first, const TrexStreamsGraphObj::rate_event_st &second) {
968    return (first.time < second.time);
969}
970
971void
972TrexStreamsGraphObj::generate() {
973    m_rate_events.sort(event_compare);
974    find_max_rate();
975}
976
977