trex_stateless_dp_core.cpp revision cc352e04
1/*
2 Itay Marom
3 Cisco Systems, Inc.
4*/
5
6/*
7Copyright (c) 2015-2015 Cisco Systems, Inc.
8
9Licensed under the Apache License, Version 2.0 (the "License");
10you may not use this file except in compliance with the License.
11You may obtain a copy of the License at
12
13    http://www.apache.org/licenses/LICENSE-2.0
14
15Unless required by applicable law or agreed to in writing, software
16distributed under the License is distributed on an "AS IS" BASIS,
17WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18See the License for the specific language governing permissions and
19limitations under the License.
20*/
21#include <trex_stateless_dp_core.h>
22#include <trex_stateless_messaging.h>
23#include <trex_streams_compiler.h>
24#include <trex_stream_node.h>
25
26#include <bp_sim.h>
27
/**
 * convert a time value given in microseconds to seconds
 *
 * @param usec  time in microseconds
 * @return      the same time expressed in seconds
 */
static inline double
usec_to_sec(double usec) {
    const double usec_per_sec = 1000 * 1000;
    return usec / usec_per_sec;
}
32
33
34
35void CGenNodeStateless::free_stl_node(){
36    /* if we have cache mbuf free it */
37    rte_mbuf_t * m=get_cache_mbuf();
38    if (m) {
39        rte_pktmbuf_free(m);
40        m_cache_mbuf=0;
41    }
42}
43
44
45
46void
47TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) {
48    m_thread_id = thread_id;
49    m_core = core;
50
51    CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();
52
53    m_ring_from_cp = cp_dp->getRingCpToDp(thread_id);
54    m_ring_to_cp   = cp_dp->getRingDpToCp(thread_id);
55
56    m_state = STATE_IDLE;
57}
58
59/**
60 * in idle state loop, the processor most of the time sleeps
61 * and periodically checks for messages
62 *
63 * @author imarom (01-Nov-15)
64 */
65void
66TrexStatelessDpCore::idle_state_loop() {
67
68    while (m_state == STATE_IDLE) {
69        periodic_check_for_cp_messages();
70        delay(200);
71    }
72}
73
74/**
75 * scehduler runs when traffic exists
76 * it will return when no more transmitting is done on this
77 * core
78 *
79 * @author imarom (01-Nov-15)
80 */
81void
82TrexStatelessDpCore::start_scheduler() {
83    /* creates a maintenace job using the scheduler */
84    CGenNode * node_sync = m_core->create_node() ;
85    node_sync->m_type = CGenNode::FLOW_SYNC;
86    node_sync->m_time = m_core->m_cur_time_sec + SYNC_TIME_OUT;
87    m_core->m_node_gen.add_node(node_sync);
88
89    double old_offset = 0.0;
90    m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset);
91    m_core->m_node_gen.close_file(m_core);
92}
93
94
95void
96TrexStatelessDpCore::run_once(){
97
98    idle_state_loop();
99    start_scheduler();
100}
101
102
103void
104TrexStatelessDpCore::start() {
105
106    while (true) {
107        run_once();
108    }
109}
110
111void
112TrexStatelessDpCore::add_duration(uint8_t port_id,
113                                  double duration){
114    if (duration > 0.0) {
115
116        CGenNode *node = m_core->create_node() ;
117
118        node->m_type = CGenNode::EXIT_SCHED;
119
120        /* make sure it will be scheduled after the current node */
121        node->m_time = m_core->m_cur_time_sec + duration ;
122
123        m_core->m_node_gen.add_node(node);
124
125    }
126}
127
128
129void
130TrexStatelessDpCore::add_cont_stream(uint8_t port_id,
131                                     double isg_usec,
132                                     double pps,
133                                     const uint8_t *pkt,
134                                     uint16_t pkt_len) {
135
136    CGenNodeStateless *node = m_core->create_node_sl();
137
138    /* add periodic */
139    node->m_type = CGenNode::STATELESS_PKT;
140
141    node->m_time = m_core->m_cur_time_sec + usec_to_sec(isg_usec);
142
143    pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(port_id);
144    node->m_flags = 0;
145
146    /* set socket id */
147    node->set_socket_id(m_core->m_node_gen.m_socket_id);
148
149    /* build a mbuf from a packet */
150    uint16_t pkt_size = pkt_len;
151    const uint8_t *stream_pkt = pkt;
152
153    /* stateless specific fields */
154    node->m_next_time_offset = 1.0 / pps;
155    node->m_is_stream_active = 1;
156    node->m_port_id = port_id;
157
158    /* allocate const mbuf */
159    rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size);
160    assert(m);
161
162    char *p = rte_pktmbuf_append(m, pkt_size);
163    assert(p);
164    /* copy the packet */
165    memcpy(p,stream_pkt,pkt_size);
166
167    /* set dir 0 or 1 client or server */
168    node->set_mbuf_cache_dir(dir);
169
170    /* TBD repace the mac if req we should add flag  */
171    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, m);
172
173    /* set the packet as a readonly */
174    node->set_cache_mbuf(m);
175
176    /* keep track */
177    m_active_nodes.push_back(node);
178
179    /* schedule */
180    m_core->m_node_gen.add_node((CGenNode *)node);
181
182    m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
183
184}
185
186void
187TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj) {
188    for (auto single_stream : obj->get_objects()) {
189        add_cont_stream(single_stream.m_port_id,
190                        single_stream.m_isg_usec,
191                        single_stream.m_pps,
192                        single_stream.m_pkt,
193                        single_stream.m_pkt_len);
194    }
195
196    /* TBD need to fix this */
197    add_duration(0,10.0);
198}
199
200void
201TrexStatelessDpCore::stop_traffic(uint8_t port_id) {
202    /* we cannot remove nodes not from the top of the queue so
203       for every active node - make sure next time
204       the scheduler invokes it, it will be free */
205    for (auto node : m_active_nodes) {
206        if (node->m_port_id == port_id) {
207            node->m_is_stream_active = 0;
208        }
209    }
210
211    /* remove all the non active nodes */
212    auto pred = std::remove_if(m_active_nodes.begin(),
213                               m_active_nodes.end(),
214                               [](CGenNodeStateless *node) { return (!node->m_is_stream_active); });
215
216    m_active_nodes.erase(pred, m_active_nodes.end());
217
218    if (m_active_nodes.size() == 0) {
219        m_state = STATE_IDLE;
220        /* stop the scheduler */
221
222        CGenNode *node = m_core->create_node() ;
223
224        node->m_type = CGenNode::EXIT_SCHED;
225
226        /* make sure it will be scheduled after the current node */
227        node->m_time = m_core->m_cur_time_sec + 0.0001;
228
229        m_core->m_node_gen.add_node(node);
230    }
231
232}
233
234/**
235 * handle a message from CP to DP
236 *
237 */
238void
239TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) {
240    msg->handle(this);
241    delete msg;
242}
243
244