udp_stream.c revision aa97dd1c
/*
 * Copyright (c) 2016  Intel Corporation.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15
16#include <string.h>
17#include <rte_malloc.h>
18#include <rte_errno.h>
19#include <rte_ethdev.h>
20#include <rte_ip.h>
21#include <rte_udp.h>
22
23#include "udp_stream.h"
24#include "misc.h"
25
26static void
27unuse_stream(struct tle_udp_stream *s)
28{
29	s->s.type = TLE_VNUM;
30	rte_atomic32_set(&s->rx.use, INT32_MIN);
31	rte_atomic32_set(&s->tx.use, INT32_MIN);
32}
33
34static void
35fini_stream(struct tle_udp_stream *s)
36{
37	if (s != NULL) {
38		rte_free(s->rx.q);
39		rte_free(s->tx.drb.r);
40	}
41}
42
43static void
44udp_fini_streams(struct tle_ctx *ctx)
45{
46	uint32_t i;
47	struct tle_udp_stream *s;
48
49	s = ctx->streams.buf;
50	if (s != NULL) {
51		for (i = 0; i != ctx->prm.max_streams; i++)
52			fini_stream(s + i);
53	}
54
55	rte_free(s);
56	ctx->streams.buf = NULL;
57	STAILQ_INIT(&ctx->streams.free);
58}
59
60static int
61init_stream(struct tle_ctx *ctx, struct tle_udp_stream *s)
62{
63	size_t bsz, rsz, sz;
64	uint32_t i, k, n, nb;
65	struct tle_drb *drb;
66	char name[RTE_RING_NAMESIZE];
67
68	/* init RX part. */
69
70	n = RTE_MAX(ctx->prm.max_stream_rbufs, 1U);
71	n = rte_align32pow2(n);
72	sz = sizeof(*s->rx.q) + n * sizeof(s->rx.q->ring[0]);
73
74	s->rx.q = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
75		ctx->prm.socket_id);
76	if (s->rx.q == NULL) {
77		UDP_LOG(ERR, "%s(%p): allocation of %zu bytes on socket %d "
78			"failed with error code: %d\n",
79			__func__, s, sz, ctx->prm.socket_id, rte_errno);
80		return -ENOMEM;
81	}
82
83	snprintf(name, sizeof(name), "%p@%zu", s, sz);
84	rte_ring_init(s->rx.q, name, n, RING_F_SP_ENQ);
85
86	/* init TX part. */
87
88	nb = drb_nb_elem(ctx);
89	k = calc_stream_drb_num(ctx, nb);
90	n = rte_align32pow2(k);
91
92	/* size of the drbs ring */
93	rsz = sizeof(*s->tx.drb.r) + n * sizeof(s->tx.drb.r->ring[0]);
94	rsz = RTE_ALIGN_CEIL(rsz, RTE_CACHE_LINE_SIZE);
95
96	/* size of the drb. */
97	bsz = tle_drb_calc_size(nb);
98
99	/* total stream drbs size. */
100	sz = rsz + bsz * k;
101
102	s->tx.drb.r = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
103		ctx->prm.socket_id);
104	if (s->tx.drb.r == NULL) {
105		UDP_LOG(ERR, "%s(%p): allocation of %zu bytes on socket %d "
106			"failed with error code: %d\n",
107			__func__, s, sz, ctx->prm.socket_id, rte_errno);
108		return -ENOMEM;
109	}
110
111	snprintf(name, sizeof(name), "%p@%zu", s, sz);
112	rte_ring_init(s->tx.drb.r, name, n, 0);
113
114	for (i = 0; i != k; i++) {
115		drb = (struct tle_drb *)((uintptr_t)s->tx.drb.r +
116			rsz + bsz * i);
117		drb->udata = s;
118		drb->size = nb;
119		rte_ring_enqueue(s->tx.drb.r, drb);
120	}
121
122	s->tx.drb.nb_elem = nb;
123	s->tx.drb.nb_max = k;
124
125	/* mark stream as avaialble to use. */
126
127	s->s.ctx = ctx;
128	unuse_stream(s);
129	STAILQ_INSERT_TAIL(&ctx->streams.free, &s->s, link);
130
131	return 0;
132}
133
134static void
135udp_free_drbs(struct tle_stream *s, struct tle_drb *drb[], uint32_t nb_drb)
136{
137	struct tle_udp_stream *us;
138
139	us = (struct tle_udp_stream *)s;
140	rte_ring_enqueue_burst(us->tx.drb.r, (void **)drb, nb_drb);
141}
142
143static int
144udp_init_streams(struct tle_ctx *ctx)
145{
146	size_t sz;
147	uint32_t i;
148	int32_t rc;
149	struct tle_udp_stream *s;
150
151	sz = sizeof(*s) * ctx->prm.max_streams;
152	s = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
153		ctx->prm.socket_id);
154	if (s == NULL) {
155		UDP_LOG(ERR, "allocation of %zu bytes on socket %d "
156			"for %u udp_streams failed\n",
157			sz, ctx->prm.socket_id, ctx->prm.max_streams);
158		return -ENOMEM;
159	}
160
161	ctx->streams.buf = s;
162	STAILQ_INIT(&ctx->streams.free);
163
164	for (i = 0; i != ctx->prm.max_streams; i++) {
165		rc = init_stream(ctx, s + i);
166		if (rc != 0) {
167			UDP_LOG(ERR, "initalisation of %u-th stream failed", i);
168			udp_fini_streams(ctx);
169			return rc;
170		}
171	}
172
173	return 0;
174}
175
176static void __attribute__((constructor))
177udp_stream_setup(void)
178{
179	static const struct stream_ops udp_ops = {
180		.init_streams = udp_init_streams,
181		.fini_streams = udp_fini_streams,
182		.free_drbs = udp_free_drbs,
183	};
184
185	tle_stream_ops[TLE_PROTO_UDP] = udp_ops;
186}
187
188static inline void
189stream_down(struct tle_udp_stream *s)
190{
191	rwl_down(&s->rx.use);
192	rwl_down(&s->tx.use);
193}
194
195static inline void
196stream_up(struct tle_udp_stream *s)
197{
198	rwl_up(&s->rx.use);
199	rwl_up(&s->tx.use);
200}
201
202static int
203check_stream_prm(const struct tle_ctx *ctx,
204	const struct tle_udp_stream_param *prm)
205{
206	if ((prm->local_addr.ss_family != AF_INET &&
207			prm->local_addr.ss_family != AF_INET6) ||
208			prm->local_addr.ss_family != prm->remote_addr.ss_family)
209		return -EINVAL;
210
211	/* callback and event notifications mechanisms are mutually exclusive */
212	if ((prm->recv_ev != NULL && prm->recv_cb.func != NULL) ||
213			(prm->send_ev != NULL && prm->send_cb.func != NULL))
214		return -EINVAL;
215
216	/* check does context support desired address family. */
217	if ((prm->local_addr.ss_family == AF_INET &&
218			ctx->prm.lookup4 == NULL) ||
219			(prm->local_addr.ss_family == AF_INET6 &&
220			ctx->prm.lookup6 == NULL))
221		return -EINVAL;
222
223	return 0;
224}
225
226struct tle_stream *
227tle_udp_stream_open(struct tle_ctx *ctx,
228	const struct tle_udp_stream_param *prm)
229{
230	struct tle_udp_stream *s;
231	int32_t rc;
232
233	if (ctx == NULL || prm == NULL || check_stream_prm(ctx, prm) != 0) {
234		rte_errno = EINVAL;
235		return NULL;
236	}
237
238	s = (struct tle_udp_stream *)get_stream(ctx);
239	if (s == NULL)	{
240		rte_errno = ENFILE;
241		return NULL;
242
243	/* some TX still pending for that stream. */
244	} else if (UDP_STREAM_TX_PENDING(s)) {
245		put_stream(ctx, &s->s, 0);
246		rte_errno = EAGAIN;
247		return NULL;
248	}
249
250	/* copy input parameters. */
251	s->prm = *prm;
252
253	/* setup L4 ports and L3 addresses fields. */
254	rc = stream_fill_ctx(ctx, &s->s,
255		(const struct sockaddr *)&prm->local_addr,
256		(const struct sockaddr *)&prm->remote_addr);
257
258	if (rc != 0) {
259		put_stream(ctx, &s->s, 1);
260		s = NULL;
261		rte_errno = rc;
262	} else {
263		/* setup stream notification menchanism */
264		s->rx.ev = prm->recv_ev;
265		s->rx.cb = prm->recv_cb;
266		s->tx.ev = prm->send_ev;
267		s->tx.cb = prm->send_cb;
268
269		/* mark stream as avaialbe for RX/TX */
270		if (s->tx.ev != NULL)
271			tle_event_raise(s->tx.ev);
272		stream_up(s);
273	}
274
275	return &s->s;
276}
277
278int
279tle_udp_stream_close(struct tle_stream *us)
280{
281	int32_t rc;
282	struct tle_ctx *ctx;
283	struct tle_udp_stream *s;
284
285	static const struct tle_stream_cb zcb;
286
287	s = UDP_STREAM(us);
288	if (us == NULL || s->s.type >= TLE_VNUM)
289		return -EINVAL;
290
291	ctx = s->s.ctx;
292
293	/* mark stream as unavaialbe for RX/TX. */
294	stream_down(s);
295
296	/* reset stream events if any. */
297	if (s->rx.ev != NULL) {
298		tle_event_idle(s->rx.ev);
299		s->rx.ev = NULL;
300	}
301	if (s->tx.ev != NULL) {
302		tle_event_idle(s->tx.ev);
303		s->tx.ev = NULL;
304	}
305
306	s->rx.cb = zcb;
307	s->tx.cb = zcb;
308
309	/* free stream's destination port */
310	rc = stream_clear_ctx(ctx, &s->s);
311
312	/* empty stream's RX queue */
313	empty_mbuf_ring(s->rx.q);
314
315	/*
316	 * mark the stream as free again.
317	 * if there still are pkts queued for TX,
318	 * then put this stream to the tail of free list.
319	 */
320	put_stream(ctx, &s->s, UDP_STREAM_TX_FINISHED(s));
321	return rc;
322}
323
324int
325tle_udp_stream_get_param(const struct tle_stream *us,
326	struct tle_udp_stream_param *prm)
327{
328	struct sockaddr_in *lin4;
329	struct sockaddr_in6 *lin6;
330	const struct tle_udp_stream *s;
331
332	s = UDP_STREAM(us);
333	if (prm == NULL || us == NULL || s->s.type >= TLE_VNUM)
334		return -EINVAL;
335
336	prm[0] = s->prm;
337	if (prm->local_addr.ss_family == AF_INET) {
338		lin4 = (struct sockaddr_in *)&prm->local_addr;
339		lin4->sin_port = s->s.port.dst;
340	} else if (s->prm.local_addr.ss_family == AF_INET6) {
341		lin6 = (struct sockaddr_in6 *)&prm->local_addr;
342		lin6->sin6_port = s->s.port.dst;
343	}
344
345	return 0;
346}
347