tcp_stream.c revision aa97dd1c
/*
 * Copyright (c) 2016  Intel Corporation.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <string.h>
#include <rte_malloc.h>
#include <rte_errno.h>
#include <rte_ethdev.h>
#include <rte_ip.h>
#include <rte_tcp.h>

#include "tcp_stream.h"
#include "tcp_timer.h"
#include "stream_table.h"
#include "misc.h"
#include "tcp_ctl.h"
#include "tcp_ofo.h"
#include "tcp_txq.h"


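/*
 * Mark the stream as not in use: reset its type and set both RX and TX
 * use counters to INT32_MIN so the datapath treats it as unavailable.
 */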
static void
unuse_stream(struct tle_tcp_stream *s)
{
	s->s.type = TLE_VNUM;
	rte_atomic32_set(&s->rx.use, INT32_MIN);
	rte_atomic32_set(&s->tx.use, INT32_MIN);
}

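/* free per-stream resources: RX/TX rings, OFO buffer and drb ring. */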
static void
fini_stream(struct tle_tcp_stream *s)
{
	if (s != NULL) {
		rte_free(s->rx.q);
		tcp_ofo_free(s->rx.ofo);
		rte_free(s->tx.q);
		rte_free(s->tx.drb.r);
	}
}

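/*
 * Free all TCP-specific resources attached to the context:
 * stream table, per-stream resources, timer wheel and to-send queue.
 */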
static void
tcp_fini_streams(struct tle_ctx *ctx)
{
	uint32_t i;
	struct tcp_streams *ts;

	ts = CTX_TCP_STREAMS(ctx);
	if (ts != NULL) {
		stbl_fini(&ts->st);
		for (i = 0; i != ctx->prm.max_streams; i++)
			fini_stream(&ts->s[i]);

		/* free the timer wheel */
		tle_timer_free(ts->tmr);
		rte_free(ts->tsq);

		STAILQ_INIT(&ts->dr.fe);
		STAILQ_INIT(&ts->dr.be);
	}

	rte_free(ts);
	ctx->streams.buf = NULL;
	STAILQ_INIT(&ctx->streams.free);
}

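/*
 * Allocate memory for an rte_ring of @n (rounded up to a power of 2)
 * slots on the given NUMA socket and initialise it.
 */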
static struct rte_ring *
alloc_ring(uint32_t n, uint32_t flags, int32_t socket)
{
	struct rte_ring *r;
	size_t sz;
	char name[RTE_RING_NAMESIZE];

	n = rte_align32pow2(n);
	sz = sizeof(*r) + n * sizeof(r->ring[0]);

	r = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE, socket);
	if (r == NULL) {
		TCP_LOG(ERR, "%s: allocation of %zu bytes on socket %d "
			"failed with error code: %d\n",
			__func__, sz, socket, rte_errno);
		return NULL;
	}

	snprintf(name, sizeof(name), "%p@%zu", r, sz);
	rte_ring_init(r, name, n, flags);
	return r;
}

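/*
 * Allocate per-stream resources: RX/TX rings, out-of-order (OFO) buffer
 * and the ring of drbs used to track TX data, then put the stream onto
 * the context's free list.
 */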
static int
init_stream(struct tle_ctx *ctx, struct tle_tcp_stream *s)
{
	size_t bsz, rsz, sz;
	uint32_t i, k, n, nb;
	struct tle_drb *drb;
	char name[RTE_RING_NAMESIZE];

	/* init RX part. */

	n = RTE_MAX(ctx->prm.max_stream_rbufs, 1U);
	s->rx.q = alloc_ring(n, RING_F_SP_ENQ, ctx->prm.socket_id);
	if (s->rx.q == NULL)
		return -ENOMEM;

	s->rx.ofo = tcp_ofo_alloc(n, ctx->prm.socket_id);
	if (s->rx.ofo == NULL)
		return -ENOMEM;

	/* init TX part. */

	n = RTE_MAX(ctx->prm.max_stream_sbufs, 1U);
	s->tx.q = alloc_ring(n, RING_F_SC_DEQ, ctx->prm.socket_id);
	if (s->tx.q == NULL)
		return -ENOMEM;

	nb = drb_nb_elem(ctx);
	k = calc_stream_drb_num(ctx, nb);
	n = rte_align32pow2(k);

	/* size of the drbs ring */
	rsz = sizeof(*s->tx.drb.r) + n * sizeof(s->tx.drb.r->ring[0]);
	rsz = RTE_ALIGN_CEIL(rsz, RTE_CACHE_LINE_SIZE);

	/* size of the drb. */
	bsz = tle_drb_calc_size(nb);

	/* total stream drbs size. */
	sz = rsz + bsz * k;

	s->tx.drb.r = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
		ctx->prm.socket_id);
	if (s->tx.drb.r == NULL) {
		TCP_LOG(ERR, "%s(%p): allocation of %zu bytes on socket %d "
			"failed with error code: %d\n",
			__func__, s, sz, ctx->prm.socket_id, rte_errno);
		return -ENOMEM;
	}

	snprintf(name, sizeof(name), "%p@%zu", s, sz);
	rte_ring_init(s->tx.drb.r, name, n, 0);

	for (i = 0; i != k; i++) {
		drb = (struct tle_drb *)((uintptr_t)s->tx.drb.r +
			rsz + bsz * i);
		drb->udata = s;
		drb->size = nb;
		rte_ring_enqueue(s->tx.drb.r, drb);
	}

	s->tx.drb.nb_elem = nb;
	s->tx.drb.nb_max = k;

	/* mark stream as available for use. */

	s->s.ctx = ctx;
	unuse_stream(s);
	STAILQ_INSERT_TAIL(&ctx->streams.free, &s->s, link);

	return 0;
}

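/* return no longer used drbs back to the stream's drb ring. */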
static void
tcp_free_drbs(struct tle_stream *s, struct tle_drb *drb[], uint32_t nb_drb)
{
	struct tle_tcp_stream *us;

	us = (struct tle_tcp_stream *)s;
	rte_ring_enqueue_burst(us->tx.drb.r, (void **)drb, nb_drb);
}

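/*
 * Create a timer wheel able to hold up to @num timers,
 * with TCP_RTO_GRANULARITY tick size.
 */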
static struct tle_timer_wheel *
alloc_timers(uint32_t num, int32_t socket)
{
	struct tle_timer_wheel_args twprm;

	twprm.tick_size = TCP_RTO_GRANULARITY;
	twprm.max_timer = num;
	twprm.socket_id = socket;
	return tle_timer_create(&twprm, tcp_get_tms());
}

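/*
 * Allocate and initialise all TCP-specific resources for the context:
 * the array of streams, the timer wheel, the to-send queue (tsq)
 * and the stream table.
 */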
static int
tcp_init_streams(struct tle_ctx *ctx)
{
	size_t sz;
	uint32_t i;
	int32_t rc;
	struct tcp_streams *ts;

	sz = sizeof(*ts) + sizeof(ts->s[0]) * ctx->prm.max_streams;
	ts = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
		ctx->prm.socket_id);
	if (ts == NULL) {
		TCP_LOG(ERR, "allocation of %zu bytes on socket %d "
			"for %u tcp_streams failed\n",
			sz, ctx->prm.socket_id, ctx->prm.max_streams);
		return -ENOMEM;
	}

	STAILQ_INIT(&ts->dr.fe);
	STAILQ_INIT(&ts->dr.be);

	ctx->streams.buf = ts;
	STAILQ_INIT(&ctx->streams.free);

	ts->tmr = alloc_timers(ctx->prm.max_streams, ctx->prm.socket_id);
	if (ts->tmr == NULL) {
		TCP_LOG(ERR, "alloc_timers(ctx=%p) failed with error=%d\n",
			ctx, rte_errno);
		rc = -ENOMEM;
	} else {
		ts->tsq = alloc_ring(ctx->prm.max_streams,
			RING_F_SC_DEQ, ctx->prm.socket_id);
		if (ts->tsq == NULL)
			rc = -ENOMEM;
		else
			rc = stbl_init(&ts->st, ctx->prm.max_streams,
				ctx->prm.socket_id);
	}

	for (i = 0; rc == 0 && i != ctx->prm.max_streams; i++)
		rc = init_stream(ctx, &ts->s[i]);

	if (rc != 0) {
		TCP_LOG(ERR, "initialisation of %u-th stream failed\n", i);
		tcp_fini_streams(ctx);
	}

	return rc;
}

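/* register TCP-specific stream operations at startup. */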
static void __attribute__((constructor))
tcp_stream_setup(void)
{
	static const struct stream_ops tcp_ops = {
		.init_streams = tcp_init_streams,
		.fini_streams = tcp_fini_streams,
		.free_drbs = tcp_free_drbs,
	};

	tle_stream_ops[TLE_PROTO_TCP] = tcp_ops;
}

/*
 * Helper routine, check that input event and callback are mutually exclusive.
 */
static int
check_cbev(const struct tle_event *ev, const struct tle_stream_cb *cb)
{
	if (ev != NULL && cb->func != NULL)
		return -EINVAL;
	return 0;
}

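/* validate tle_tcp_stream_open() parameters. */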
static int
check_stream_prm(const struct tle_ctx *ctx,
	const struct tle_tcp_stream_param *prm)
{
	if ((prm->addr.local.ss_family != AF_INET &&
			prm->addr.local.ss_family != AF_INET6) ||
			prm->addr.local.ss_family != prm->addr.remote.ss_family)
		return -EINVAL;

	/* callback and event notification mechanisms are mutually exclusive */
	if (check_cbev(prm->cfg.recv_ev, &prm->cfg.recv_cb) != 0 ||
			check_cbev(prm->cfg.send_ev, &prm->cfg.send_cb) != 0 ||
			check_cbev(prm->cfg.err_ev, &prm->cfg.err_cb) != 0)
		return -EINVAL;

	/* check that the context supports the desired address family. */
	if ((prm->addr.local.ss_family == AF_INET &&
			ctx->prm.lookup4 == NULL) ||
			(prm->addr.local.ss_family == AF_INET6 &&
			ctx->prm.lookup6 == NULL))
		return -EINVAL;

	return 0;
}

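/*
 * Usage sketch for tle_tcp_stream_open() (an illustration only, not part
 * of the library): active open of an IPv4 stream on an already created
 * context <ctx>.  The events <rxev>/<txev> and the address contents are
 * assumed to be prepared by the caller; local and remote families have
 * to match and be supported by the context (see check_stream_prm()).
 *
 *	struct tle_tcp_stream_param prm;
 *	struct tle_stream *s;
 *
 *	memset(&prm, 0, sizeof(prm));
 *	prm.addr.local.ss_family = AF_INET;
 *	prm.addr.remote.ss_family = AF_INET;
 *	... fill local/remote IPv4 addresses and ports ...
 *	prm.cfg.recv_ev = rxev;
 *	prm.cfg.send_ev = txev;
 *
 *	s = tle_tcp_stream_open(ctx, &prm);
 *	if (s == NULL)
 *		... inspect rte_errno (EINVAL, ENFILE, EAGAIN, ...) ...
 */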
struct tle_stream *
tle_tcp_stream_open(struct tle_ctx *ctx,
	const struct tle_tcp_stream_param *prm)
{
	struct tle_tcp_stream *s;
	int32_t rc;

	if (ctx == NULL || prm == NULL || check_stream_prm(ctx, prm) != 0) {
		rte_errno = EINVAL;
		return NULL;
	}

	s = (struct tle_tcp_stream *)get_stream(ctx);
	if (s == NULL) {
		rte_errno = ENFILE;
		return NULL;

	/* some TX still pending for that stream. */
	} else if (TCP_STREAM_TX_PENDING(s)) {
		put_stream(ctx, &s->s, 0);
		rte_errno = EAGAIN;
		return NULL;
	}

	/* setup L4 port and L3 address fields. */
	rc = stream_fill_ctx(ctx, &s->s,
		(const struct sockaddr *)&prm->addr.local,
		(const struct sockaddr *)&prm->addr.remote);

	if (rc != 0) {
		put_stream(ctx, &s->s, 1);
		rte_errno = rc;
		return NULL;
	}

	/* setup stream notification mechanism */
	s->rx.ev = prm->cfg.recv_ev;
	s->rx.cb = prm->cfg.recv_cb;
	s->tx.ev = prm->cfg.send_ev;
	s->tx.cb = prm->cfg.send_cb;
	s->err.ev = prm->cfg.err_ev;
	s->err.cb = prm->cfg.err_cb;

	/* store other params */
	s->tcb.snd.nb_retm = (prm->cfg.nb_retries != 0) ? prm->cfg.nb_retries :
		TLE_TCP_DEFAULT_RETRIES;

	tcp_stream_up(s);
	return &s->s;
}

/*
 * Helper functions, used by close API.
 */
static inline int
stream_close(struct tle_ctx *ctx, struct tle_tcp_stream *s)
{
	uint16_t uop;
	uint32_t state;
	static const struct tle_stream_cb zcb;

	/* check whether close() was already invoked */
	uop = s->tcb.uop;
	if ((uop & TCP_OP_CLOSE) != 0)
		return -EDEADLK;

	/* record that close() was already invoked */
	if (rte_atomic16_cmpset(&s->tcb.uop, uop, uop | TCP_OP_CLOSE) == 0)
		return -EDEADLK;

	/* mark stream as unavailable for RX/TX. */
	tcp_stream_down(s);

	/* reset events/callbacks */
	s->rx.ev = NULL;
	s->tx.ev = NULL;
	s->err.ev = NULL;

	s->rx.cb = zcb;
	s->tx.cb = zcb;
	s->err.cb = zcb;

	state = s->tcb.state;

	/* CLOSED, LISTEN, SYN_SENT - we can close the stream straightaway */
	if (state <= TCP_ST_SYN_SENT) {
		tcp_stream_reset(ctx, s);
		return 0;
	}

	/* generate FIN and proceed with normal connection termination */
	if (state == TCP_ST_ESTABLISHED || state == TCP_ST_CLOSE_WAIT) {

		/* change state */
		s->tcb.state = (state == TCP_ST_ESTABLISHED) ?
			TCP_ST_FIN_WAIT_1 : TCP_ST_LAST_ACK;

		/* mark stream as writable/readable again */
		tcp_stream_up(s);

		/* queue stream into to-send queue */
		txs_enqueue(ctx, s);
		return 0;
	}

	/*
	 * according to the state, close() was already invoked;
	 * we should never reach this point.
	 */
	RTE_ASSERT(0);
	return -EINVAL;
}

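/*
 * Usage note (an illustration only): tle_tcp_stream_close_bulk() returns
 * the number of streams actually closed; on partial failure the reason
 * is reported via rte_errno, e.g.:
 *
 *	n = tle_tcp_stream_close_bulk(ts, num);
 *	if (n != num)
 *		... streams ts[n], ts[n + 1], ... are still open,
 *		    rte_errno explains why ts[n] failed ...
 */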
uint32_t
tle_tcp_stream_close_bulk(struct tle_stream *ts[], uint32_t num)
{
	int32_t rc;
	uint32_t i;
	struct tle_ctx *ctx;
	struct tle_tcp_stream *s;

	rc = 0;

	for (i = 0; i != num; i++) {

		s = TCP_STREAM(ts[i]);
		if (ts[i] == NULL || s->s.type >= TLE_VNUM) {
			rc = -EINVAL;
			break;
		}

		ctx = s->s.ctx;
		rc = stream_close(ctx, s);
		if (rc != 0)
			break;
	}

	if (rc != 0)
		rte_errno = -rc;
	return i;
}

int
tle_tcp_stream_close(struct tle_stream *ts)
{
	struct tle_ctx *ctx;
	struct tle_tcp_stream *s;

	s = TCP_STREAM(ts);
	if (ts == NULL || s->s.type >= TLE_VNUM)
		return -EINVAL;

	ctx = s->s.ctx;

	/* reset stream events if any. */
	if (s->rx.ev != NULL)
		tle_event_idle(s->rx.ev);
	if (s->tx.ev != NULL)
		tle_event_idle(s->tx.ev);
	if (s->err.ev != NULL)
		tle_event_idle(s->err.ev);

	return stream_close(ctx, s);
}

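/*
 * Usage sketch for tle_tcp_stream_get_addr() (an illustration only):
 *
 *	struct tle_tcp_stream_addr addr;
 *
 *	if (tle_tcp_stream_get_addr(s, &addr) == 0 &&
 *			addr.local.ss_family == AF_INET) {
 *		const struct sockaddr_in *in =
 *			(const struct sockaddr_in *)&addr.local;
 *		... in->sin_addr/in->sin_port describe the local end ...
 *	}
 */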
int
tle_tcp_stream_get_addr(const struct tle_stream *ts,
	struct tle_tcp_stream_addr *addr)
{
	struct sockaddr_in *lin4, *rin4;
	struct sockaddr_in6 *lin6, *rin6;
	struct tle_tcp_stream *s;

	s = TCP_STREAM(ts);
	if (addr == NULL || ts == NULL || s->s.type >= TLE_VNUM)
		return -EINVAL;

	if (s->s.type == TLE_V4) {

		lin4 = (struct sockaddr_in *)&addr->local;
		rin4 = (struct sockaddr_in *)&addr->remote;

		addr->local.ss_family = AF_INET;
		addr->remote.ss_family = AF_INET;

		lin4->sin_port = s->s.port.dst;
		rin4->sin_port = s->s.port.src;
		lin4->sin_addr.s_addr = s->s.ipv4.addr.dst;
		rin4->sin_addr.s_addr = s->s.ipv4.addr.src;

	} else if (s->s.type == TLE_V6) {

		lin6 = (struct sockaddr_in6 *)&addr->local;
		rin6 = (struct sockaddr_in6 *)&addr->remote;

		addr->local.ss_family = AF_INET6;
		addr->remote.ss_family = AF_INET6;

		lin6->sin6_port = s->s.port.dst;
		rin6->sin6_port = s->s.port.src;
		memcpy(&lin6->sin6_addr, &s->s.ipv6.addr.dst,
			sizeof(lin6->sin6_addr));
		memcpy(&rin6->sin6_addr, &s->s.ipv6.addr.src,
			sizeof(rin6->sin6_addr));
	}

	return 0;
}

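/*
 * Usage sketch for tle_tcp_stream_listen() (an illustration only):
 * passive open.  The stream is opened with the local address filled in
 * and the remote address of the same family left as a wildcard, then
 * moved into LISTEN state:
 *
 *	s = tle_tcp_stream_open(ctx, &prm);
 *	if (s != NULL && tle_tcp_stream_listen(s) != 0)
 *		tle_tcp_stream_close(s);
 */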
int
tle_tcp_stream_listen(struct tle_stream *ts)
{
	struct tle_tcp_stream *s;
	int32_t rc;

	s = TCP_STREAM(ts);
	if (ts == NULL || s->s.type >= TLE_VNUM)
		return -EINVAL;

	/* mark stream as not closable. */
	if (rwl_try_acquire(&s->rx.use) > 0) {
		rc = rte_atomic16_cmpset(&s->tcb.state, TCP_ST_CLOSED,
				TCP_ST_LISTEN);
		if (rc != 0) {
			s->tcb.uop |= TCP_OP_LISTEN;
			rc = 0;
		} else
			rc = -EDEADLK;
	} else
		rc = -EINVAL;

	rwl_release(&s->rx.use);
	return rc;
}