1/*
2 * Copyright (c) 2016  Intel Corporation.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#include <string.h>
17#include <rte_malloc.h>
18#include <rte_errno.h>
19#include <rte_ethdev.h>
20#include <rte_ip.h>
21#include <rte_udp.h>
22
23#include "udp_stream.h"
24#include "misc.h"
25
26static void
27unuse_stream(struct tle_udp_stream *s)
28{
29	s->s.type = TLE_VNUM;
30	rte_atomic32_set(&s->rx.use, INT32_MIN);
31	rte_atomic32_set(&s->tx.use, INT32_MIN);
32}
33
34static void
35fini_stream(struct tle_udp_stream *s)
36{
37	if (s != NULL) {
38		rte_free(s->rx.q);
39		rte_free(s->tx.drb.r);
40	}
41}
42
43static void
44udp_fini_streams(struct tle_ctx *ctx)
45{
46	uint32_t i;
47	struct tle_udp_stream *s;
48
49	s = ctx->streams.buf;
50	if (s != NULL) {
51		for (i = 0; i != ctx->prm.max_streams; i++)
52			fini_stream(s + i);
53	}
54
55	rte_free(s);
56	ctx->streams.buf = NULL;
57	STAILQ_INIT(&ctx->streams.free);
58}
59
60static int
61init_stream(struct tle_ctx *ctx, struct tle_udp_stream *s)
62{
63	size_t bsz, rsz, sz;
64	uint32_t i, k, n, nb;
65	struct tle_drb *drb;
66	char name[RTE_RING_NAMESIZE];
67
68	/* init RX part. */
69
70	n = RTE_MAX(ctx->prm.max_stream_rbufs, 1U);
71	n = rte_align32pow2(n);
72	sz = rte_ring_get_memsize(n);
73
74	s->rx.q = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
75		ctx->prm.socket_id);
76	if (s->rx.q == NULL) {
77		UDP_LOG(ERR, "%s(%p): allocation of %zu bytes on socket %d "
78			"failed with error code: %d\n",
79			__func__, s, sz, ctx->prm.socket_id, rte_errno);
80		return -ENOMEM;
81	}
82
83	snprintf(name, sizeof(name), "%p@%zu", s, sz);
84	rte_ring_init(s->rx.q, name, n, RING_F_SP_ENQ);
85
86	/* init TX part. */
87
88	nb = drb_nb_elem(ctx);
89	k = calc_stream_drb_num(ctx, nb);
90	n = rte_align32pow2(k);
91
92	/* size of the drbs ring */
93	rsz = rte_ring_get_memsize(n);
94	rsz = RTE_ALIGN_CEIL(rsz, RTE_CACHE_LINE_SIZE);
95
96	/* size of the drb. */
97	bsz = tle_drb_calc_size(nb);
98
99	/* total stream drbs size. */
100	sz = rsz + bsz * k;
101
102	s->tx.drb.r = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
103		ctx->prm.socket_id);
104	if (s->tx.drb.r == NULL) {
105		UDP_LOG(ERR, "%s(%p): allocation of %zu bytes on socket %d "
106			"failed with error code: %d\n",
107			__func__, s, sz, ctx->prm.socket_id, rte_errno);
108		return -ENOMEM;
109	}
110
111	snprintf(name, sizeof(name), "%p@%zu", s, sz);
112	rte_ring_init(s->tx.drb.r, name, n, 0);
113
114	for (i = 0; i != k; i++) {
115		drb = (struct tle_drb *)((uintptr_t)s->tx.drb.r +
116			rsz + bsz * i);
117		drb->udata = s;
118		drb->size = nb;
119		rte_ring_enqueue(s->tx.drb.r, drb);
120	}
121
122	s->tx.drb.nb_elem = nb;
123	s->tx.drb.nb_max = k;
124
125	/* mark stream as avaialble to use. */
126
127	s->s.ctx = ctx;
128	unuse_stream(s);
129	STAILQ_INSERT_TAIL(&ctx->streams.free, &s->s, link);
130
131	return 0;
132}
133
134static void
135udp_free_drbs(struct tle_stream *s, struct tle_drb *drb[], uint32_t nb_drb)
136{
137	struct tle_udp_stream *us;
138
139	us = (struct tle_udp_stream *)s;
140	_rte_ring_enqueue_burst(us->tx.drb.r, (void **)drb, nb_drb);
141}
142
143static int
144udp_init_streams(struct tle_ctx *ctx)
145{
146	size_t sz;
147	uint32_t i;
148	int32_t rc;
149	struct tle_udp_stream *s;
150
151	sz = sizeof(*s) * ctx->prm.max_streams;
152	s = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
153		ctx->prm.socket_id);
154	if (s == NULL) {
155		UDP_LOG(ERR, "allocation of %zu bytes on socket %d "
156			"for %u udp_streams failed\n",
157			sz, ctx->prm.socket_id, ctx->prm.max_streams);
158		return -ENOMEM;
159	}
160
161	ctx->streams.buf = s;
162	STAILQ_INIT(&ctx->streams.free);
163
164	for (i = 0; i != ctx->prm.max_streams; i++) {
165		rc = init_stream(ctx, s + i);
166		if (rc != 0) {
167			UDP_LOG(ERR, "initalisation of %u-th stream failed", i);
168			udp_fini_streams(ctx);
169			return rc;
170		}
171	}
172
173	ctx->streams.nb_free = ctx->prm.max_streams;
174	return 0;
175}
176
177static void __attribute__((constructor))
178udp_stream_setup(void)
179{
180	static const struct stream_ops udp_ops = {
181		.init_streams = udp_init_streams,
182		.fini_streams = udp_fini_streams,
183		.free_drbs = udp_free_drbs,
184	};
185
186	tle_stream_ops[TLE_PROTO_UDP] = udp_ops;
187}
188
189static inline void
190stream_down(struct tle_udp_stream *s)
191{
192	rwl_down(&s->rx.use);
193	rwl_down(&s->tx.use);
194}
195
196static inline void
197stream_up(struct tle_udp_stream *s)
198{
199	rwl_up(&s->rx.use);
200	rwl_up(&s->tx.use);
201}
202
203static int
204check_stream_prm(const struct tle_ctx *ctx,
205	const struct tle_udp_stream_param *prm)
206{
207	if ((prm->local_addr.ss_family != AF_INET &&
208			prm->local_addr.ss_family != AF_INET6) ||
209			prm->local_addr.ss_family != prm->remote_addr.ss_family)
210		return -EINVAL;
211
212	/* callback and event notifications mechanisms are mutually exclusive */
213	if ((prm->recv_ev != NULL && prm->recv_cb.func != NULL) ||
214			(prm->send_ev != NULL && prm->send_cb.func != NULL))
215		return -EINVAL;
216
217	/* check does context support desired address family. */
218	if ((prm->local_addr.ss_family == AF_INET &&
219			ctx->prm.lookup4 == NULL) ||
220			(prm->local_addr.ss_family == AF_INET6 &&
221			ctx->prm.lookup6 == NULL))
222		return -EINVAL;
223
224	return 0;
225}
226
227struct tle_stream *
228tle_udp_stream_open(struct tle_ctx *ctx,
229	const struct tle_udp_stream_param *prm)
230{
231	struct tle_udp_stream *s;
232	int32_t rc;
233
234	if (ctx == NULL || prm == NULL || check_stream_prm(ctx, prm) != 0) {
235		rte_errno = EINVAL;
236		return NULL;
237	}
238
239	s = (struct tle_udp_stream *)get_stream(ctx);
240	if (s == NULL)	{
241		rte_errno = ENFILE;
242		return NULL;
243
244	/* some TX still pending for that stream. */
245	} else if (UDP_STREAM_TX_PENDING(s)) {
246		put_stream(ctx, &s->s, 0);
247		rte_errno = EAGAIN;
248		return NULL;
249	}
250
251	/* copy input parameters. */
252	s->prm = *prm;
253
254	/* setup L4 ports and L3 addresses fields. */
255	rc = stream_fill_ctx(ctx, &s->s,
256		(const struct sockaddr *)&prm->local_addr,
257		(const struct sockaddr *)&prm->remote_addr);
258
259	if (rc != 0) {
260		put_stream(ctx, &s->s, 1);
261		s = NULL;
262		rte_errno = rc;
263	} else {
264		/* setup stream notification menchanism */
265		s->rx.ev = prm->recv_ev;
266		s->rx.cb = prm->recv_cb;
267		s->tx.ev = prm->send_ev;
268		s->tx.cb = prm->send_cb;
269
270		/* mark stream as avaialbe for RX/TX */
271		if (s->tx.ev != NULL)
272			tle_event_raise(s->tx.ev);
273		stream_up(s);
274	}
275
276	return &s->s;
277}
278
279int
280tle_udp_stream_close(struct tle_stream *us)
281{
282	int32_t rc;
283	struct tle_ctx *ctx;
284	struct tle_udp_stream *s;
285
286	static const struct tle_stream_cb zcb;
287
288	s = UDP_STREAM(us);
289	if (us == NULL || s->s.type >= TLE_VNUM)
290		return -EINVAL;
291
292	ctx = s->s.ctx;
293
294	/* mark stream as unavaialbe for RX/TX. */
295	stream_down(s);
296
297	/* reset stream events if any. */
298	if (s->rx.ev != NULL) {
299		tle_event_idle(s->rx.ev);
300		s->rx.ev = NULL;
301	}
302	if (s->tx.ev != NULL) {
303		tle_event_idle(s->tx.ev);
304		s->tx.ev = NULL;
305	}
306
307	s->rx.cb = zcb;
308	s->tx.cb = zcb;
309
310	/* free stream's destination port */
311	rc = stream_clear_ctx(ctx, &s->s);
312
313	/* empty stream's RX queue */
314	empty_mbuf_ring(s->rx.q);
315
316	/*
317	 * mark the stream as free again.
318	 * if there still are pkts queued for TX,
319	 * then put this stream to the tail of free list.
320	 */
321	put_stream(ctx, &s->s, UDP_STREAM_TX_FINISHED(s));
322	return rc;
323}
324
325int
326tle_udp_stream_get_param(const struct tle_stream *us,
327	struct tle_udp_stream_param *prm)
328{
329	struct sockaddr_in *lin4;
330	struct sockaddr_in6 *lin6;
331	const struct tle_udp_stream *s;
332
333	s = UDP_STREAM(us);
334	if (prm == NULL || us == NULL || s->s.type >= TLE_VNUM)
335		return -EINVAL;
336
337	prm[0] = s->prm;
338	if (prm->local_addr.ss_family == AF_INET) {
339		lin4 = (struct sockaddr_in *)&prm->local_addr;
340		lin4->sin_port = s->s.port.dst;
341	} else if (s->prm.local_addr.ss_family == AF_INET6) {
342		lin6 = (struct sockaddr_in6 *)&prm->local_addr;
343		lin6->sin6_port = s->s.port.dst;
344	}
345
346	return 0;
347}
348