trex_stateless_dp_core.cpp revision 94b12389
1/*
2 Itay Marom
3 Cisco Systems, Inc.
4*/
5
6/*
7Copyright (c) 2015-2015 Cisco Systems, Inc.
8
9Licensed under the Apache License, Version 2.0 (the "License");
10you may not use this file except in compliance with the License.
11You may obtain a copy of the License at
12
13    http://www.apache.org/licenses/LICENSE-2.0
14
15Unless required by applicable law or agreed to in writing, software
16distributed under the License is distributed on an "AS IS" BASIS,
17WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18See the License for the specific language governing permissions and
19limitations under the License.
20*/
21#include <trex_stateless_dp_core.h>
22#include <trex_stateless_messaging.h>
23#include <trex_streams_compiler.h>
24#include <trex_stream_node.h>
25#include <trex_stream.h>
26
27#include <bp_sim.h>
28
29static inline double
30usec_to_sec(double usec) {
31    return (usec / (1000 * 1000));
32}
33
34
35
36void CGenNodeStateless::free_stl_node(){
37    /* if we have cache mbuf free it */
38    rte_mbuf_t * m=get_cache_mbuf();
39    if (m) {
40        rte_pktmbuf_free(m);
41        m_cache_mbuf=0;
42    }
43}
44
45
46
47void
48TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) {
49    m_thread_id = thread_id;
50    m_core = core;
51
52    CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();
53
54    m_ring_from_cp = cp_dp->getRingCpToDp(thread_id);
55    m_ring_to_cp   = cp_dp->getRingDpToCp(thread_id);
56
57    m_state = STATE_IDLE;
58}
59
60/**
61 * in idle state loop, the processor most of the time sleeps
62 * and periodically checks for messages
63 *
64 * @author imarom (01-Nov-15)
65 */
66void
67TrexStatelessDpCore::idle_state_loop() {
68
69    while (m_state == STATE_IDLE) {
70        periodic_check_for_cp_messages();
71        delay(200);
72    }
73}
74
75/**
76 * scehduler runs when traffic exists
77 * it will return when no more transmitting is done on this
78 * core
79 *
80 * @author imarom (01-Nov-15)
81 */
82void
83TrexStatelessDpCore::start_scheduler() {
84    /* creates a maintenace job using the scheduler */
85    CGenNode * node_sync = m_core->create_node() ;
86    node_sync->m_type = CGenNode::FLOW_SYNC;
87    node_sync->m_time = m_core->m_cur_time_sec + SYNC_TIME_OUT;
88    m_core->m_node_gen.add_node(node_sync);
89
90    double old_offset = 0.0;
91    m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset);
92    m_core->m_node_gen.close_file(m_core);
93}
94
95
96void
97TrexStatelessDpCore::run_once(){
98
99    idle_state_loop();
100    start_scheduler();
101}
102
103
104void
105TrexStatelessDpCore::start() {
106
107    while (true) {
108        run_once();
109    }
110}
111
112void
113TrexStatelessDpCore::add_duration(double duration){
114    if (duration > 0.0) {
115        CGenNode *node = m_core->create_node() ;
116
117        node->m_type = CGenNode::EXIT_SCHED;
118
119        /* make sure it will be scheduled after the current node */
120        node->m_time = m_core->m_cur_time_sec + duration ;
121
122        m_core->m_node_gen.add_node(node);
123    }
124}
125
126
127void
128TrexStatelessDpCore::add_cont_stream(TrexStream * stream,
129                                     TrexStreamsCompiledObj *comp) {
130
131    CGenNodeStateless *node = m_core->create_node_sl();
132
133    /* add periodic */
134    node->m_type = CGenNode::STATELESS_PKT;
135
136    node->m_time = m_core->m_cur_time_sec + usec_to_sec(stream->m_isg_usec);
137
138    pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(stream->m_port_id);
139    node->m_flags = 0;
140
141    /* set socket id */
142    node->set_socket_id(m_core->m_node_gen.m_socket_id);
143
144    /* build a mbuf from a packet */
145
146    uint16_t pkt_size = stream->m_pkt.len;
147    const uint8_t *stream_pkt = stream->m_pkt.binary;
148
149    node->m_stream_type = stream->m_type;
150    node->m_next_time_offset =  1.0 / (stream->get_pps() * comp->get_multiplier()) ;
151
152
153    /* stateless specific fields */
154    switch ( stream->m_type ) {
155
156    case TrexStream::stCONTINUOUS :
157        break;
158
159    case TrexStream::stSINGLE_BURST :
160        node->m_stream_type             = TrexStream::stMULTI_BURST;
161        node->m_single_burst            = stream->m_burst_total_pkts;
162        node->m_single_burst_refill     = stream->m_burst_total_pkts;
163        node->m_multi_bursts            = 1;  /* single burst in multi burst of 1 */
164        node->m_ibg_sec                 = 0.0;
165        break;
166
167    case TrexStream::stMULTI_BURST :
168        node->m_single_burst        = stream->m_burst_total_pkts;
169        node->m_single_burst_refill = stream->m_burst_total_pkts;
170        node->m_multi_bursts        = stream->m_num_bursts;
171        node->m_ibg_sec             = usec_to_sec( stream->m_ibg_usec );
172        break;
173    default:
174
175        assert(0);
176    };
177
178    node->m_is_stream_active = 1;
179    node->m_port_id = stream->m_port_id;
180
181    /* allocate const mbuf */
182    rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size);
183    assert(m);
184
185    char *p = rte_pktmbuf_append(m, pkt_size);
186    assert(p);
187    /* copy the packet */
188    memcpy(p,stream_pkt,pkt_size);
189
190    /* set dir 0 or 1 client or server */
191    node->set_mbuf_cache_dir(dir);
192
193    /* TBD repace the mac if req we should add flag  */
194    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, m);
195
196    /* set the packet as a readonly */
197    node->set_cache_mbuf(m);
198
199    /* keep track */
200    m_active_nodes.push_back(node);
201
202    /* schedule */
203    m_core->m_node_gen.add_node((CGenNode *)node);
204
205    m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
206
207}
208
209void
210TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj, double duration) {
211    for (auto single_stream : obj->get_objects()) {
212        add_cont_stream(single_stream.m_stream,obj);
213    }
214
215    if ( duration > 0.0 ){
216        add_duration( duration );
217    }
218}
219
220void
221TrexStatelessDpCore::stop_traffic(uint8_t port_id) {
222    /* we cannot remove nodes not from the top of the queue so
223       for every active node - make sure next time
224       the scheduler invokes it, it will be free */
225    for (auto node : m_active_nodes) {
226        if (node->m_port_id == port_id) {
227            node->m_is_stream_active = 0;
228        }
229    }
230
231    /* remove all the non active nodes */
232    auto pred = std::remove_if(m_active_nodes.begin(),
233                               m_active_nodes.end(),
234                               [](CGenNodeStateless *node) { return (!node->m_is_stream_active); });
235
236    m_active_nodes.erase(pred, m_active_nodes.end());
237
238    if (m_active_nodes.size() == 0) {
239        m_state = STATE_IDLE;
240        /* stop the scheduler */
241
242        CGenNode *node = m_core->create_node() ;
243
244        node->m_type = CGenNode::EXIT_SCHED;
245
246        /* make sure it will be scheduled after the current node */
247        node->m_time = m_core->m_cur_time_sec + 0.0001;
248
249        m_core->m_node_gen.add_node(node);
250    }
251
252}
253
254/**
255 * handle a message from CP to DP
256 *
257 */
258void
259TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) {
260    msg->handle(this);
261    delete msg;
262}
263
264