trex_stateless_dp_core.cpp revision 3b8eb91e
1/*
2 Itay Marom
3 Cisco Systems, Inc.
4*/
5
6/*
7Copyright (c) 2015-2015 Cisco Systems, Inc.
8
9Licensed under the Apache License, Version 2.0 (the "License");
10you may not use this file except in compliance with the License.
11You may obtain a copy of the License at
12
13    http://www.apache.org/licenses/LICENSE-2.0
14
15Unless required by applicable law or agreed to in writing, software
16distributed under the License is distributed on an "AS IS" BASIS,
17WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18See the License for the specific language governing permissions and
19limitations under the License.
20*/
21#include <trex_stateless_dp_core.h>
22#include <trex_stateless_messaging.h>
23#include <trex_streams_compiler.h>
24#include <trex_stream_node.h>
25#include <trex_stream.h>
26
27#include <bp_sim.h>
28
/**
 * convert a time interval given in microseconds to seconds
 *
 * @param usec  interval in microseconds
 * @return      the same interval expressed in seconds
 */
static inline double
usec_to_sec(double usec) {
    const double usec_per_sec = 1000 * 1000;

    return usec / usec_per_sec;
}
33
34
35
36void CGenNodeStateless::free_stl_node(){
37    /* if we have cache mbuf free it */
38    rte_mbuf_t * m=get_cache_mbuf();
39    if (m) {
40        rte_pktmbuf_free(m);
41        m_cache_mbuf=0;
42    }
43}
44
45
46
47void
48TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) {
49    m_thread_id = thread_id;
50    m_core = core;
51
52    CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();
53
54    m_ring_from_cp = cp_dp->getRingCpToDp(thread_id);
55    m_ring_to_cp   = cp_dp->getRingDpToCp(thread_id);
56
57    m_state = STATE_IDLE;
58}
59
60/**
61 * in idle state loop, the processor most of the time sleeps
62 * and periodically checks for messages
63 *
64 * @author imarom (01-Nov-15)
65 */
66void
67TrexStatelessDpCore::idle_state_loop() {
68
69    while (m_state == STATE_IDLE) {
70        periodic_check_for_cp_messages();
71        delay(200);
72    }
73}
74
75
76
77void TrexStatelessDpCore::quit_main_loop(){
78    m_core->set_terminate_mode(true); /* mark it as terminated */
79    add_duration(0.0001); /* add message to terminate */
80}
81
82
83/**
84 * scehduler runs when traffic exists
85 * it will return when no more transmitting is done on this
86 * core
87 *
88 * @author imarom (01-Nov-15)
89 */
90void
91TrexStatelessDpCore::start_scheduler() {
92    /* creates a maintenace job using the scheduler */
93    CGenNode * node_sync = m_core->create_node() ;
94    node_sync->m_type = CGenNode::FLOW_SYNC;
95    node_sync->m_time = m_core->m_cur_time_sec + SYNC_TIME_OUT;
96    m_core->m_node_gen.add_node(node_sync);
97
98    double old_offset = 0.0;
99    m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset);
100    m_core->m_node_gen.close_file(m_core);
101}
102
103
104void
105TrexStatelessDpCore::run_once(){
106
107    idle_state_loop();
108    start_scheduler();
109}
110
111
112void
113TrexStatelessDpCore::start() {
114
115    while (true) {
116        run_once();
117
118        if ( m_core->is_terminated_by_master() ) {
119            break;
120        }
121    }
122}
123
124void
125TrexStatelessDpCore::add_duration(double duration){
126    if (duration > 0.0) {
127        CGenNode *node = m_core->create_node() ;
128
129        node->m_type = CGenNode::EXIT_SCHED;
130
131        /* make sure it will be scheduled after the current node */
132        node->m_time = m_core->m_cur_time_sec + duration ;
133
134        m_core->m_node_gen.add_node(node);
135    }
136}
137
138
139void
140TrexStatelessDpCore::add_cont_stream(TrexStream * stream,
141                                     TrexStreamsCompiledObj *comp) {
142
143    CGenNodeStateless *node = m_core->create_node_sl();
144
145    /* add periodic */
146    node->m_type = CGenNode::STATELESS_PKT;
147
148    node->m_time = m_core->m_cur_time_sec + usec_to_sec(stream->m_isg_usec);
149
150    pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(stream->m_port_id);
151    node->m_flags = 0;
152
153    /* set socket id */
154    node->set_socket_id(m_core->m_node_gen.m_socket_id);
155
156    /* build a mbuf from a packet */
157
158    uint16_t pkt_size = stream->m_pkt.len;
159    const uint8_t *stream_pkt = stream->m_pkt.binary;
160
161    node->m_stream_type = stream->m_type;
162    node->m_next_time_offset =  1.0 / (stream->get_pps() * comp->get_multiplier()) ;
163
164
165    /* stateless specific fields */
166    switch ( stream->m_type ) {
167
168    case TrexStream::stCONTINUOUS :
169        break;
170
171    case TrexStream::stSINGLE_BURST :
172        node->m_stream_type             = TrexStream::stMULTI_BURST;
173        node->m_single_burst            = stream->m_burst_total_pkts;
174        node->m_single_burst_refill     = stream->m_burst_total_pkts;
175        node->m_multi_bursts            = 1;  /* single burst in multi burst of 1 */
176        node->m_ibg_sec                 = 0.0;
177        break;
178
179    case TrexStream::stMULTI_BURST :
180        node->m_single_burst        = stream->m_burst_total_pkts;
181        node->m_single_burst_refill = stream->m_burst_total_pkts;
182        node->m_multi_bursts        = stream->m_num_bursts;
183        node->m_ibg_sec             = usec_to_sec( stream->m_ibg_usec );
184        break;
185    default:
186
187        assert(0);
188    };
189
190    node->m_is_stream_active = 1;
191    node->m_port_id = stream->m_port_id;
192
193    /* allocate const mbuf */
194    rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size);
195    assert(m);
196
197    char *p = rte_pktmbuf_append(m, pkt_size);
198    assert(p);
199    /* copy the packet */
200    memcpy(p,stream_pkt,pkt_size);
201
202    /* set dir 0 or 1 client or server */
203    node->set_mbuf_cache_dir(dir);
204
205    /* TBD repace the mac if req we should add flag  */
206    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, m);
207
208    /* set the packet as a readonly */
209    node->set_cache_mbuf(m);
210
211    /* keep track */
212    m_active_nodes.push_back(node);
213
214    /* schedule */
215    m_core->m_node_gen.add_node((CGenNode *)node);
216
217    m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
218
219}
220
221void
222TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj) {
223    for (auto single_stream : obj->get_objects()) {
224        add_cont_stream(single_stream.m_stream,obj);
225    }
226
227    double duration=obj->get_simulation_duration();
228
229    if ( duration >0.0){
230        add_duration( duration );
231    }
232}
233
234void
235TrexStatelessDpCore::stop_traffic(uint8_t port_id) {
236    /* we cannot remove nodes not from the top of the queue so
237       for every active node - make sure next time
238       the scheduler invokes it, it will be free */
239    for (auto node : m_active_nodes) {
240        if (node->m_port_id == port_id) {
241            node->m_is_stream_active = 0;
242        }
243    }
244
245    /* remove all the non active nodes */
246    auto pred = std::remove_if(m_active_nodes.begin(),
247                               m_active_nodes.end(),
248                               [](CGenNodeStateless *node) { return (!node->m_is_stream_active); });
249
250    m_active_nodes.erase(pred, m_active_nodes.end());
251
252    if (m_active_nodes.size() == 0) {
253        m_state = STATE_IDLE;
254        /* stop the scheduler */
255
256        CGenNode *node = m_core->create_node() ;
257
258        node->m_type = CGenNode::EXIT_SCHED;
259
260        /* make sure it will be scheduled after the current node */
261        node->m_time = m_core->m_cur_time_sec + 0.0001;
262
263        m_core->m_node_gen.add_node(node);
264    }
265
266}
267
268/**
269 * handle a message from CP to DP
270 *
271 */
272void
273TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) {
274    msg->handle(this);
275    delete msg;
276}
277
278