trex_stateless_dp_core.cpp revision 3978adce
1/*
2 Itay Marom
3 Cisco Systems, Inc.
4*/
5
6/*
7Copyright (c) 2015-2015 Cisco Systems, Inc.
8
9Licensed under the Apache License, Version 2.0 (the "License");
10you may not use this file except in compliance with the License.
11You may obtain a copy of the License at
12
13    http://www.apache.org/licenses/LICENSE-2.0
14
15Unless required by applicable law or agreed to in writing, software
16distributed under the License is distributed on an "AS IS" BASIS,
17WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18See the License for the specific language governing permissions and
19limitations under the License.
20*/
21#include <trex_stateless_dp_core.h>
22#include <trex_stateless_messaging.h>
23#include <trex_streams_compiler.h>
24
25#include <bp_sim.h>
26
/**
 * Per-node bookkeeping for a stateless stream.
 *
 * An instance is never allocated on its own in this file: the code casts the
 * pointer returned by CGenNodeStateless::get_opaque_storage() to this type
 * and reads/writes the fields in place.
 *
 * TODO:
 * static_assert(sizeof(dp_node_extended_info_st) <= sizeof(CGenNodeStateless::m_pad_end), "hello");
 */
struct dp_node_extended_info_ {
    /* added to the node's m_time after every transmission (set to 1.0 / pps) */
    double   next_time_offset;

    /* non-zero while the stream should keep sending; cleared by stop_traffic() */
    uint8_t  is_stream_active;
};

typedef struct dp_node_extended_info_ dp_node_extended_info_st;
37
38TrexStatelessDpCore::TrexStatelessDpCore(uint8_t thread_id, CFlowGenListPerThread *core) {
39    m_thread_id = thread_id;
40    m_core = core;
41
42    CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();
43
44    m_ring_from_cp = cp_dp->getRingCpToDp(thread_id);
45    m_ring_to_cp   = cp_dp->getRingDpToCp(thread_id);
46
47    m_state = STATE_IDLE;
48}
49
50void
51TrexStatelessDpCore::start() {
52
53    /* creates a maintenace job using the scheduler */
54    CGenNode * node_sync = m_core->create_node() ;
55    node_sync->m_type = CGenNode::FLOW_SYNC;
56    node_sync->m_time = m_core->m_cur_time_sec + SYNC_TIME_OUT;
57    m_core->m_node_gen.add_node(node_sync);
58
59    double old_offset = 0.0;
60    m_core->m_node_gen.flush_file(100000000, 0.0, false, m_core, old_offset);
61
62}
63
64void
65TrexStatelessDpCore::handle_pkt_event(CGenNode *node) {
66
67    //TODO: optimize the fast path here...
68
69    CGenNodeStateless *node_sl = (CGenNodeStateless *)node;
70    dp_node_extended_info_st *opaque = (dp_node_extended_info_st *)node_sl->get_opaque_storage();
71
72    /* is this stream active ? */
73    if (!opaque->is_stream_active) {
74        m_core->free_node(node);
75        return;
76    }
77
78    m_core->m_node_gen.m_v_if->send_node(node);
79
80    /* in case of continues */
81    node->m_time += opaque->next_time_offset;
82
83    /* insert a new event */
84    m_core->m_node_gen.m_p_queue.push(node);
85}
86
87void
88TrexStatelessDpCore::add_cont_stream(double pps, const uint8_t *pkt, uint16_t pkt_len) {
89    CGenNodeStateless *node = m_core->create_node_sl();
90
91    /* add periodic */
92    node->m_type = CGenNode::STATELESS_PKT;
93    node->m_time = m_core->m_cur_time_sec + 0.0 /* STREAM ISG */;
94    node->m_flags = 0;
95
96    /* set socket id */
97    node->set_socket_id(m_core->m_node_gen.m_socket_id);
98
99    /* build a mbuf from a packet */
100    uint16_t pkt_size = pkt_len;
101    const uint8_t *stream_pkt = pkt;
102
103    dp_node_extended_info_st *opaque = (dp_node_extended_info_st *)node->get_opaque_storage();
104    opaque->next_time_offset = 1.0 / pps;
105    opaque->is_stream_active = 1;
106
107    /* allocate const mbuf */
108    rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size);
109    assert(m);
110
111    char *p = rte_pktmbuf_append(m, pkt_size);
112    assert(p);
113    /* copy the packet */
114    memcpy(p,stream_pkt,pkt_size);
115
116    /* set dir 0 or 1 client or server */
117    pkt_dir_t dir = 0;
118    node->set_mbuf_cache_dir(dir);
119
120    /* TBD repace the mac if req we should add flag  */
121    m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, m);
122
123    /* set the packet as a readonly */
124    node->set_cache_mbuf(m);
125
126    m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
127
128    /* keep track */
129    m_active_nodes.push_back(node);
130
131    /* schedule */
132    m_core->m_node_gen.add_node((CGenNode *)node);
133}
134
135void
136TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj) {
137    for (auto single_stream : obj->get_objects()) {
138        add_cont_stream(single_stream.m_pps, single_stream.m_pkt, single_stream.m_pkt_len);
139    }
140}
141
142void
143TrexStatelessDpCore::stop_traffic() {
144    /* we cannot remove nodes not from the top of the queue so
145       for every active node - make sure next time
146       the scheduler invokes it, it will be free */
147    for (auto node : m_active_nodes) {
148        dp_node_extended_info_st *opaque = (dp_node_extended_info_st *)node->get_opaque_storage();
149        opaque->is_stream_active = 0;
150    }
151    m_active_nodes.clear();
152
153    m_state = STATE_IDLE;
154}
155
156/**
157 * handle a message from CP to DP
158 *
159 */
160void
161TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) {
162    msg->handle(this);
163    delete msg;
164}
165
166