trex_stateless_dp_core.cpp revision 6294136d
1/*
2 Itay Marom
3 Cisco Systems, Inc.
4*/
5
6/*
7Copyright (c) 2015-2015 Cisco Systems, Inc.
8
9Licensed under the Apache License, Version 2.0 (the "License");
10you may not use this file except in compliance with the License.
11You may obtain a copy of the License at
12
13    http://www.apache.org/licenses/LICENSE-2.0
14
15Unless required by applicable law or agreed to in writing, software
16distributed under the License is distributed on an "AS IS" BASIS,
17WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18See the License for the specific language governing permissions and
19limitations under the License.
20*/
21#include <trex_stateless_dp_core.h>
22#include <trex_stateless_messaging.h>
23#include <trex_streams_compiler.h>
24#include <trex_stream_node.h>
25
26#include <bp_sim.h>
27
28static inline double
29usec_to_sec(double usec) {
30    return (usec / (1000 * 1000));
31}
32
33
34
35void CGenNodeStateless::free_stl_node(){
36    /* if we have cache mbuf free it */
37    rte_mbuf_t * m=get_cache_mbuf();
38    if (m) {
39        rte_pktmbuf_free(m);
40        m_cache_mbuf=0;
41    }
42}
43
44
45
46void
47TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) {
48    m_thread_id = thread_id;
49    m_core = core;
50
51    CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();
52
53    m_ring_from_cp = cp_dp->getRingCpToDp(thread_id);
54    m_ring_to_cp   = cp_dp->getRingDpToCp(thread_id);
55
56    m_state = STATE_IDLE;
57}
58
59/**
60 * in idle state loop, the processor most of the time sleeps
61 * and periodically checks for messages
62 *
63 * @author imarom (01-Nov-15)
64 */
65void
66TrexStatelessDpCore::idle_state_loop() {
67
68    while (m_state == STATE_IDLE) {
69        periodic_check_for_cp_messages();
70        delay(200);
71    }
72}
73
74/**
75 * scehduler runs when traffic exists
76 * it will return when no more transmitting is done on this
77 * core
78 *
79 * @author imarom (01-Nov-15)
80 */
81void
82TrexStatelessDpCore::start_scheduler() {
83    /* creates a maintenace job using the scheduler */
84    CGenNode * node_sync = m_core->create_node() ;
85    node_sync->m_type = CGenNode::FLOW_SYNC;
86    node_sync->m_time = m_core->m_cur_time_sec + SYNC_TIME_OUT;
87    m_core->m_node_gen.add_node(node_sync);
88
89    double old_offset = 0.0;
90    m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset);
91    m_core->m_node_gen.close_file(m_core);
92}
93
94
95void
96TrexStatelessDpCore::run_once(){
97
98    idle_state_loop();
99    start_scheduler();
100}
101
102
103void
104TrexStatelessDpCore::start() {
105
106    while (true) {
107        run_once();
108    }
109}
110
111void
112TrexStatelessDpCore::add_duration(double duration){
113    if (duration > 0.0) {
114        CGenNode *node = m_core->create_node() ;
115
116        node->m_type = CGenNode::EXIT_SCHED;
117
118        /* make sure it will be scheduled after the current node */
119        node->m_time = m_core->m_cur_time_sec + duration ;
120
121        m_core->m_node_gen.add_node(node);
122    }
123}
124
125
126void
127TrexStatelessDpCore::add_cont_stream(uint8_t port_id,
128                                     double isg_usec,
129                                     double pps,
130                                     const uint8_t *pkt,
131                                     uint16_t pkt_len) {
132
133    CGenNodeStateless *node = m_core->create_node_sl();
134
135    /* add periodic */
136    node->m_type = CGenNode::STATELESS_PKT;
137
138    node->m_time = m_core->m_cur_time_sec + usec_to_sec(isg_usec);
139
140    pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(port_id);
141    node->m_flags = 0;
142
143    /* set socket id */
144    node->set_socket_id(m_core->m_node_gen.m_socket_id);
145
146    /* build a mbuf from a packet */
147    uint16_t pkt_size = pkt_len;
148    const uint8_t *stream_pkt = pkt;
149
150    /* stateless specific fields */
151    node->m_next_time_offset = 1.0 / pps;
152    node->m_is_stream_active = 1;
153    node->m_port_id = port_id;
154
155    /* allocate const mbuf */
156    rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size);
157    assert(m);
158
159    char *p = rte_pktmbuf_append(m, pkt_size);
160    assert(p);
161    /* copy the packet */
162    memcpy(p,stream_pkt,pkt_size);
163
164    /* set dir 0 or 1 client or server */
165    node->set_mbuf_cache_dir(dir);
166
167    /* TBD repace the mac if req we should add flag  */
168    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, m);
169
170    /* set the packet as a readonly */
171    node->set_cache_mbuf(m);
172
173    /* keep track */
174    m_active_nodes.push_back(node);
175
176    /* schedule */
177    m_core->m_node_gen.add_node((CGenNode *)node);
178
179    m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
180
181}
182
183void
184TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj) {
185    for (auto single_stream : obj->get_objects()) {
186        add_cont_stream(single_stream.m_port_id,
187                        single_stream.m_isg_usec,
188                        single_stream.m_pps,
189                        single_stream.m_pkt,
190                        single_stream.m_pkt_len);
191    }
192
193    double duration=obj->get_simulation_duration();
194    printf("duration %f \n",duration);
195
196    if ( duration >0.0){
197        add_duration( duration );
198    }
199}
200
201void
202TrexStatelessDpCore::stop_traffic(uint8_t port_id) {
203    /* we cannot remove nodes not from the top of the queue so
204       for every active node - make sure next time
205       the scheduler invokes it, it will be free */
206    for (auto node : m_active_nodes) {
207        if (node->m_port_id == port_id) {
208            node->m_is_stream_active = 0;
209        }
210    }
211
212    /* remove all the non active nodes */
213    auto pred = std::remove_if(m_active_nodes.begin(),
214                               m_active_nodes.end(),
215                               [](CGenNodeStateless *node) { return (!node->m_is_stream_active); });
216
217    m_active_nodes.erase(pred, m_active_nodes.end());
218
219    if (m_active_nodes.size() == 0) {
220        m_state = STATE_IDLE;
221        /* stop the scheduler */
222
223        CGenNode *node = m_core->create_node() ;
224
225        node->m_type = CGenNode::EXIT_SCHED;
226
227        /* make sure it will be scheduled after the current node */
228        node->m_time = m_core->m_cur_time_sec + 0.0001;
229
230        m_core->m_node_gen.add_node(node);
231    }
232
233}
234
235/**
236 * handle a message from CP to DP
237 *
238 */
239void
240TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) {
241    msg->handle(this);
242    delete msg;
243}
244
245