trex_stateless_dp_core.cpp revision bc7d9ee8
1/*
2 Itay Marom
3 Cisco Systems, Inc.
4*/
5
6/*
7Copyright (c) 2015-2015 Cisco Systems, Inc.
8
9Licensed under the Apache License, Version 2.0 (the "License");
10you may not use this file except in compliance with the License.
11You may obtain a copy of the License at
12
13    http://www.apache.org/licenses/LICENSE-2.0
14
15Unless required by applicable law or agreed to in writing, software
16distributed under the License is distributed on an "AS IS" BASIS,
17WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18See the License for the specific language governing permissions and
19limitations under the License.
20*/
21#include <trex_stateless_dp_core.h>
22#include <trex_stateless_messaging.h>
23#include <trex_streams_compiler.h>
24#include <trex_stream_node.h>
25
26#include <bp_sim.h>
27
/* converts a duration given in microseconds to seconds */
static inline double
usec_to_sec(double usec) {
    const double usec_per_sec = 1000.0 * 1000.0;
    return usec / usec_per_sec;
}
32
33
34void
35TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) {
36    m_thread_id = thread_id;
37    m_core = core;
38
39    CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();
40
41    m_ring_from_cp = cp_dp->getRingCpToDp(thread_id);
42    m_ring_to_cp   = cp_dp->getRingDpToCp(thread_id);
43
44    m_state = STATE_IDLE;
45}
46
47/**
48 * in idle state loop, the processor most of the time sleeps
49 * and periodically checks for messages
50 *
51 * @author imarom (01-Nov-15)
52 */
53void
54TrexStatelessDpCore::idle_state_loop() {
55
56    while (m_state == STATE_IDLE) {
57        periodic_check_for_cp_messages();
58        delay(200);
59    }
60}
61
62/**
63 * scehduler runs when traffic exists
64 * it will return when no more transmitting is done on this
65 * core
66 *
67 * @author imarom (01-Nov-15)
68 */
69void
70TrexStatelessDpCore::start_scheduler() {
71    /* creates a maintenace job using the scheduler */
72    CGenNode * node_sync = m_core->create_node() ;
73    node_sync->m_type = CGenNode::FLOW_SYNC;
74    node_sync->m_time = m_core->m_cur_time_sec + SYNC_TIME_OUT;
75    m_core->m_node_gen.add_node(node_sync);
76
77    double old_offset = 0.0;
78    m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset);
79}
80
81void
82TrexStatelessDpCore::start() {
83
84    while (true) {
85        idle_state_loop();
86
87        start_scheduler();
88    }
89}
90
91void
92TrexStatelessDpCore::add_cont_stream(uint8_t port_id,
93                                     double isg_usec,
94                                     double pps,
95                                     const uint8_t *pkt,
96                                     uint16_t pkt_len) {
97
98    CGenNodeStateless *node = m_core->create_node_sl();
99
100    /* add periodic */
101    node->m_type = CGenNode::STATELESS_PKT;
102
103    node->m_time = m_core->m_cur_time_sec + usec_to_sec(isg_usec);
104
105    pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(port_id);
106    node->m_flags = 0;
107
108    /* set socket id */
109    node->set_socket_id(m_core->m_node_gen.m_socket_id);
110
111    /* build a mbuf from a packet */
112    uint16_t pkt_size = pkt_len;
113    const uint8_t *stream_pkt = pkt;
114
115    /* stateless specific fields */
116    node->m_next_time_offset = 1.0 / pps;
117    node->m_is_stream_active = 1;
118    node->m_port_id = port_id;
119
120    /* allocate const mbuf */
121    rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size);
122    assert(m);
123
124    char *p = rte_pktmbuf_append(m, pkt_size);
125    assert(p);
126    /* copy the packet */
127    memcpy(p,stream_pkt,pkt_size);
128
129    /* set dir 0 or 1 client or server */
130    node->set_mbuf_cache_dir(dir);
131
132    /* TBD repace the mac if req we should add flag  */
133    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, m);
134
135    /* set the packet as a readonly */
136    node->set_cache_mbuf(m);
137
138    /* keep track */
139    m_active_nodes.push_back(node);
140
141    /* schedule */
142    m_core->m_node_gen.add_node((CGenNode *)node);
143
144    m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
145
146}
147
148void
149TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj) {
150    for (auto single_stream : obj->get_objects()) {
151        add_cont_stream(single_stream.m_port_id,
152                        single_stream.m_isg_usec,
153                        single_stream.m_pps,
154                        single_stream.m_pkt,
155                        single_stream.m_pkt_len);
156    }
157}
158
159void
160TrexStatelessDpCore::stop_traffic(uint8_t port_id) {
161    /* we cannot remove nodes not from the top of the queue so
162       for every active node - make sure next time
163       the scheduler invokes it, it will be free */
164    for (auto node : m_active_nodes) {
165        if (node->m_port_id == port_id) {
166            node->m_is_stream_active = 0;
167        }
168    }
169
170    /* remove all the non active nodes */
171    auto pred = std::remove_if(m_active_nodes.begin(),
172                               m_active_nodes.end(),
173                               [](CGenNodeStateless *node) { return (!node->m_is_stream_active); });
174
175    m_active_nodes.erase(pred, m_active_nodes.end());
176
177    if (m_active_nodes.size() == 0) {
178        m_state = STATE_IDLE;
179        /* stop the scheduler */
180
181        CGenNode *node = m_core->create_node() ;
182
183        node->m_type = CGenNode::EXIT_SCHED;
184
185        /* make sure it will be scheduled after the current node */
186        node->m_time = m_core->m_cur_time_sec + 0.0001;
187
188        m_core->m_node_gen.add_node(node);
189    }
190
191}
192
193/**
194 * handle a message from CP to DP
195 *
196 */
197void
198TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) {
199    msg->handle(this);
200    delete msg;
201}
202
203