/* tcp_stream.c revision e151ee29 */
1/*
2 * Copyright (c) 2016  Intel Corporation.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#include <string.h>
17#include <rte_malloc.h>
18#include <rte_errno.h>
19#include <rte_ethdev.h>
20#include <rte_ip.h>
21#include <rte_tcp.h>
22
23#include "tcp_stream.h"
24#include "tcp_timer.h"
25#include "stream_table.h"
26#include "misc.h"
27#include "tcp_ctl.h"
28#include "tcp_ofo.h"
29#include "tcp_txq.h"
30
/*
 * Mark the stream as unusable: invalidate its type and drive both
 * RX and TX use counters to INT32_MIN so that any rwl_try_acquire()
 * on them fails until the stream is brought up again.
 * NOTE(review): assumes INT32_MIN is the "down" sentinel for the
 * rwl_* helpers — confirm against their definition in misc.h.
 */
static void
unuse_stream(struct tle_tcp_stream *s)
{
	s->s.type = TLE_VNUM;
	rte_atomic32_set(&s->rx.use, INT32_MIN);
	rte_atomic32_set(&s->tx.use, INT32_MIN);
}
38
39static void
40fini_stream(struct tle_tcp_stream *s)
41{
42	if (s != NULL) {
43		rte_free(s->rx.q);
44		tcp_ofo_free(s->rx.ofo);
45		rte_free(s->tx.q);
46		rte_free(s->tx.drb.r);
47	}
48}
49
/*
 * Free all per-context TCP stream resources: the stream table,
 * every individual stream, the timer wheel and the to-send queue.
 * Safe to call on a partially initialised context (used as the
 * rollback path of tcp_init_streams()).
 */
static void
tcp_fini_streams(struct tle_ctx *ctx)
{
	uint32_t i;
	struct tcp_streams *ts;

	ts = CTX_TCP_STREAMS(ctx);
	if (ts != NULL) {
		stbl_fini(&ts->st);
		/* release resources of each stream in the array. */
		for (i = 0; i != ctx->prm.max_streams; i++)
			fini_stream(&ts->s[i]);

		/* free the timer wheel */
		tle_timer_free(ts->tmr);
		rte_free(ts->tsq);

		STAILQ_INIT(&ts->dr.fe);
		STAILQ_INIT(&ts->dr.be);
	}

	/* free the streams array itself and detach it from the context. */
	rte_free(ts);
	ctx->streams.buf = NULL;
	STAILQ_INIT(&ctx->streams.free);
}
74
75static struct rte_ring *
76alloc_ring(uint32_t n, uint32_t flags, int32_t socket)
77{
78	struct rte_ring *r;
79	size_t sz;
80	char name[RTE_RING_NAMESIZE];
81
82	n = rte_align32pow2(n);
83	sz =  rte_ring_get_memsize(n);
84
85	r = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE, socket);
86	if (r == NULL) {
87		TCP_LOG(ERR, "%s: allocation of %zu bytes on socket %d "
88			"failed with error code: %d\n",
89			__func__, sz, socket, rte_errno);
90		return NULL;
91	}
92
93	snprintf(name, sizeof(name), "%p@%zu", r, sz);
94	rte_ring_init(r, name, n, flags);
95	return r;
96}
97
/*
 * One-time initialisation of a single TCP stream:
 * allocates the RX ring, the out-of-order reassembly buffer, the TX ring
 * and the stream's drb (data ring buffer) pool, then links the stream
 * into the context's free-stream list.
 * Returns 0 on success, -ENOMEM on any allocation failure; partially
 * allocated resources are released later by fini_stream().
 */
static int
init_stream(struct tle_ctx *ctx, struct tle_tcp_stream *s)
{
	size_t bsz, rsz, sz;
	uint32_t i, k, n, nb;
	struct tle_drb *drb;
	char name[RTE_RING_NAMESIZE];

	/* init RX part. */

	n = RTE_MAX(ctx->prm.max_stream_rbufs, 1U);
	s->rx.q = alloc_ring(n, RING_F_SP_ENQ, ctx->prm.socket_id);
	if (s->rx.q == NULL)
		return -ENOMEM;

	s->rx.ofo = tcp_ofo_alloc(n, ctx->prm.socket_id);
	if (s->rx.ofo == NULL)
		return -ENOMEM;

	/* init TX part. */

	n = RTE_MAX(ctx->prm.max_stream_sbufs, 1U);
	s->tx.q = alloc_ring(n, RING_F_SC_DEQ, ctx->prm.socket_id);
	if (s->tx.q == NULL)
		return -ENOMEM;

	/* mbufs per drb and number of drbs for this stream. */
	nb = drb_nb_elem(ctx);
	k = calc_stream_drb_num(ctx, nb);
	n = rte_align32pow2(k);

	/* size of the drbs ring */
	rsz = rte_ring_get_memsize(n);
	rsz = RTE_ALIGN_CEIL(rsz, RTE_CACHE_LINE_SIZE);

	/* size of the drb. */
	bsz = tle_drb_calc_size(nb);

	/* total stream drbs size: the ring followed by k drbs,
	 * all in one contiguous allocation. */
	sz = rsz + bsz * k;

	s->tx.drb.r = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
		ctx->prm.socket_id);
	if (s->tx.drb.r == NULL) {
		TCP_LOG(ERR, "%s(%p): allocation of %zu bytes on socket %d "
			"failed with error code: %d\n",
			__func__, s, sz, ctx->prm.socket_id, rte_errno);
		return -ENOMEM;
	}

	/* stream address makes the ring name unique. */
	snprintf(name, sizeof(name), "%p@%zu", s, sz);
	rte_ring_init(s->tx.drb.r, name, n, 0);

	/* carve the drbs out of the tail of the allocation and enqueue
	 * each one into the ring as initially free. */
	for (i = 0; i != k; i++) {
		drb = (struct tle_drb *)((uintptr_t)s->tx.drb.r +
			rsz + bsz * i);
		drb->udata = s;
		drb->size = nb;
		rte_ring_enqueue(s->tx.drb.r, drb);
	}

	s->tx.drb.nb_elem = nb;
	s->tx.drb.nb_max = k;

	/* mark the stream as unused and put it on the context free list. */

	s->s.ctx = ctx;
	unuse_stream(s);
	STAILQ_INSERT_TAIL(&ctx->streams.free, &s->s, link);

	return 0;
}
169
170static void
171tcp_free_drbs(struct tle_stream *s, struct tle_drb *drb[], uint32_t nb_drb)
172{
173	struct tle_tcp_stream *us;
174
175	us = (struct tle_tcp_stream *)s;
176	_rte_ring_enqueue_burst(us->tx.drb.r, (void **)drb, nb_drb);
177}
178
179static struct tle_timer_wheel *
180alloc_timers(uint32_t num, int32_t socket)
181{
182	struct tle_timer_wheel_args twprm;
183
184	twprm.tick_size = TCP_RTO_GRANULARITY;
185	twprm.max_timer = num;
186	twprm.socket_id = socket;
187	return tle_timer_create(&twprm, tcp_get_tms());
188}
189
190static int
191tcp_init_streams(struct tle_ctx *ctx)
192{
193	size_t sz;
194	uint32_t i;
195	int32_t rc;
196	struct tcp_streams *ts;
197
198	sz = sizeof(*ts) + sizeof(ts->s[0]) * ctx->prm.max_streams;
199	ts = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
200		ctx->prm.socket_id);
201	if (ts == NULL) {
202		TCP_LOG(ERR, "allocation of %zu bytes on socket %d "
203			"for %u tcp_streams failed\n",
204			sz, ctx->prm.socket_id, ctx->prm.max_streams);
205		return -ENOMEM;
206	}
207
208	STAILQ_INIT(&ts->dr.fe);
209	STAILQ_INIT(&ts->dr.be);
210
211	ctx->streams.buf = ts;
212	STAILQ_INIT(&ctx->streams.free);
213
214	ts->tmr = alloc_timers(ctx->prm.max_streams, ctx->prm.socket_id);
215	if (ts->tmr == NULL) {
216		TCP_LOG(ERR, "alloc_timers(ctx=%p) failed with error=%d\n",
217			ctx, rte_errno);
218		rc = -ENOMEM;
219	} else {
220		ts->tsq = alloc_ring(ctx->prm.max_streams,
221			RING_F_SC_DEQ, ctx->prm.socket_id);
222		if (ts->tsq == NULL)
223			rc = -ENOMEM;
224		else
225			rc = stbl_init(&ts->st, ctx->prm.max_streams,
226				ctx->prm.socket_id);
227	}
228
229	for (i = 0; rc == 0 && i != ctx->prm.max_streams; i++)
230		rc = init_stream(ctx, &ts->s[i]);
231
232	if (rc != 0) {
233		TCP_LOG(ERR, "initalisation of %u-th stream failed", i);
234		tcp_fini_streams(ctx);
235	}
236
237	return rc;
238}
239
240static void __attribute__((constructor))
241tcp_stream_setup(void)
242{
243	static const struct stream_ops tcp_ops = {
244		.init_streams = tcp_init_streams,
245		.fini_streams = tcp_fini_streams,
246		.free_drbs = tcp_free_drbs,
247	};
248
249	tle_stream_ops[TLE_PROTO_TCP] = tcp_ops;
250}
251
252/*
253 * Helper routine, check that input event and callback are mutually exclusive.
254 */
255static int
256check_cbev(const struct tle_event *ev, const struct tle_stream_cb *cb)
257{
258	if (ev != NULL && cb->func != NULL)
259		return -EINVAL;
260	return 0;
261}
262
263static int
264check_stream_prm(const struct tle_ctx *ctx,
265	const struct tle_tcp_stream_param *prm)
266{
267	if ((prm->addr.local.ss_family != AF_INET &&
268			prm->addr.local.ss_family != AF_INET6) ||
269			prm->addr.local.ss_family != prm->addr.remote.ss_family)
270		return -EINVAL;
271
272	/* callback and event notifications mechanisms are mutually exclusive */
273	if (check_cbev(prm->cfg.recv_ev, &prm->cfg.recv_cb) != 0 ||
274			check_cbev(prm->cfg.recv_ev, &prm->cfg.recv_cb) != 0 ||
275			check_cbev(prm->cfg.err_ev, &prm->cfg.err_cb) != 0)
276		return -EINVAL;
277
278	/* check does context support desired address family. */
279	if ((prm->addr.local.ss_family == AF_INET &&
280			ctx->prm.lookup4 == NULL) ||
281			(prm->addr.local.ss_family == AF_INET6 &&
282			ctx->prm.lookup6 == NULL))
283		return -EINVAL;
284
285	return 0;
286}
287
/*
 * Open (allocate and configure) a new TCP stream within the given context.
 * On failure returns NULL with rte_errno set to:
 *   EINVAL - invalid parameters;
 *   ENFILE - no free streams left in the context;
 *   EAGAIN - the chosen stream still has TX data pending.
 */
struct tle_stream *
tle_tcp_stream_open(struct tle_ctx *ctx,
	const struct tle_tcp_stream_param *prm)
{
	struct tle_tcp_stream *s;
	int32_t rc;

	if (ctx == NULL || prm == NULL || check_stream_prm(ctx, prm) != 0) {
		rte_errno = EINVAL;
		return NULL;
	}

	/* grab an unused stream from the context free list. */
	s = (struct tle_tcp_stream *)get_stream(ctx);
	if (s == NULL)	{
		rte_errno = ENFILE;
		return NULL;

	/* some TX still pending for that stream. */
	} else if (TCP_STREAM_TX_PENDING(s)) {
		put_stream(ctx, &s->s, 0);
		rte_errno = EAGAIN;
		return NULL;
	}

	/* setup L4 ports and L3 addresses fields. */
	rc = stream_fill_ctx(ctx, &s->s,
		(const struct sockaddr *)&prm->addr.local,
		(const struct sockaddr *)&prm->addr.remote);

	if (rc != 0) {
		put_stream(ctx, &s->s, 1);
		/* NOTE(review): assumes stream_fill_ctx() returns a positive
		 * errno value (stream_close paths negate theirs) — confirm. */
		rte_errno = rc;
		return NULL;
	}

	/* setup stream notification mechanism */
	s->rx.ev = prm->cfg.recv_ev;
	s->rx.cb = prm->cfg.recv_cb;
	s->tx.ev = prm->cfg.send_ev;
	s->tx.cb = prm->cfg.send_cb;
	s->err.ev = prm->cfg.err_ev;
	s->err.cb = prm->cfg.err_cb;

	/* store other params */
	s->tcb.snd.nb_retm = (prm->cfg.nb_retries != 0) ? prm->cfg.nb_retries :
		TLE_TCP_DEFAULT_RETRIES;

	/* mark stream ready for RX/TX and hand it to the caller. */
	tcp_stream_up(s);
	return &s->s;
}
338
339/*
340 * Helper functions, used by close API.
341 */
/*
 * Helper function, used by close API.
 * Performs the state-dependent part of closing a stream:
 * - CLOSED/LISTEN/SYN_SENT: reset immediately;
 * - ESTABLISHED/CLOSE_WAIT: start FIN sequence and queue for TX.
 * Returns 0 on success, -EDEADLK if close() was already invoked.
 */
static inline int
stream_close(struct tle_ctx *ctx, struct tle_tcp_stream *s)
{
	uint16_t uop;
	uint32_t state;
	static const struct tle_stream_cb zcb;

	/* check was close() already invoked */
	uop = s->tcb.uop;
	if ((uop & TCP_OP_CLOSE) != 0)
		return -EDEADLK;

	/* record that close() was already invoked;
	 * cmpset guards against a concurrent close() racing us. */
	if (rte_atomic16_cmpset(&s->tcb.uop, uop, uop | TCP_OP_CLOSE) == 0)
		return -EDEADLK;

	/* mark stream as unavailable for RX/TX. */
	tcp_stream_down(s);

	/* reset events/callbacks */
	s->rx.ev = NULL;
	s->tx.ev = NULL;
	s->err.ev = NULL;

	s->rx.cb = zcb;
	s->tx.cb = zcb;
	s->err.cb = zcb;

	state = s->tcb.state;

	/* CLOSED, LISTEN, SYN_SENT - we can close the stream straight away */
	if (state <= TCP_ST_SYN_SENT) {
		tcp_stream_reset(ctx, s);
		return 0;
	}

	/* generate FIN and proceed with normal connection termination */
	if (state == TCP_ST_ESTABLISHED || state == TCP_ST_CLOSE_WAIT) {

		/* change state */
		s->tcb.state = (state == TCP_ST_ESTABLISHED) ?
			TCP_ST_FIN_WAIT_1 : TCP_ST_LAST_ACK;

		/* mark stream as writable/readable again */
		tcp_stream_up(s);

		/* queue stream into to-send queue */
		txs_enqueue(ctx, s);
		return 0;
	}

	/*
	 * in any other state close() was already invoked
	 * (checked above), so we should never reach this point.
	 */
	RTE_ASSERT(0);
	return -EINVAL;
}
400
401uint32_t
402tle_tcp_stream_close_bulk(struct tle_stream *ts[], uint32_t num)
403{
404	int32_t rc;
405	uint32_t i;
406	struct tle_ctx *ctx;
407	struct tle_tcp_stream *s;
408
409	rc = 0;
410
411	for (i = 0; i != num; i++) {
412
413		s = TCP_STREAM(ts[i]);
414		if (ts[i] == NULL || s->s.type >= TLE_VNUM) {
415			rc = EINVAL;
416			break;
417		}
418
419		ctx = s->s.ctx;
420		rc = stream_close(ctx, s);
421		if (rc != 0)
422			break;
423	}
424
425	if (rc != 0)
426		rte_errno = -rc;
427	return i;
428}
429
430int
431tle_tcp_stream_close(struct tle_stream *ts)
432{
433	struct tle_ctx *ctx;
434	struct tle_tcp_stream *s;
435
436	s = TCP_STREAM(ts);
437	if (ts == NULL || s->s.type >= TLE_VNUM)
438		return -EINVAL;
439
440	ctx = s->s.ctx;
441
442	/* reset stream events if any. */
443	if (s->rx.ev != NULL)
444		tle_event_idle(s->rx.ev);
445	if (s->tx.ev != NULL)
446		tle_event_idle(s->tx.ev);
447	if (s->err.ev != NULL)
448		tle_event_idle(s->err.ev);
449
450	return stream_close(ctx, s);
451}
452
453int
454tle_tcp_stream_get_addr(const struct tle_stream *ts,
455	struct tle_tcp_stream_addr *addr)
456{
457	struct sockaddr_in *lin4, *rin4;
458	struct sockaddr_in6 *lin6, *rin6;
459	struct tle_tcp_stream *s;
460
461	s = TCP_STREAM(ts);
462	if (addr == NULL || ts == NULL || s->s.type >= TLE_VNUM)
463		return -EINVAL;
464
465	if (s->s.type == TLE_V4) {
466
467		lin4 = (struct sockaddr_in *)&addr->local;
468		rin4 = (struct sockaddr_in *)&addr->remote;
469
470		addr->local.ss_family = AF_INET;
471		addr->remote.ss_family = AF_INET;
472
473		lin4->sin_port = s->s.port.dst;
474		rin4->sin_port = s->s.port.src;
475		lin4->sin_addr.s_addr = s->s.ipv4.addr.dst;
476		rin4->sin_addr.s_addr = s->s.ipv4.addr.src;
477
478	} else if (s->s.type == TLE_V6) {
479
480		lin6 = (struct sockaddr_in6 *)&addr->local;
481		rin6 = (struct sockaddr_in6 *)&addr->remote;
482
483		addr->local.ss_family = AF_INET6;
484		addr->remote.ss_family = AF_INET6;
485
486		lin6->sin6_port = s->s.port.dst;
487		rin6->sin6_port = s->s.port.src;
488		memcpy(&lin6->sin6_addr, &s->s.ipv6.addr.dst,
489			sizeof(lin6->sin6_addr));
490		memcpy(&rin6->sin6_addr, &s->s.ipv6.addr.src,
491			sizeof(rin6->sin6_addr));
492	}
493
494	return 0;
495}
496
/*
 * Switch a CLOSED stream into LISTEN state.
 * Returns 0 on success, -EDEADLK if the stream is not in CLOSED state,
 * -EINVAL if the stream is invalid or can't be acquired.
 */
int
tle_tcp_stream_listen(struct tle_stream *ts)
{
	struct tle_tcp_stream *s;
	int32_t rc;

	s = TCP_STREAM(ts);
	if (ts == NULL || s->s.type >= TLE_VNUM)
		return -EINVAL;

	/* mark stream as not closable. */
	if (rwl_try_acquire(&s->rx.use) > 0) {
		/* atomically transition CLOSED -> LISTEN. */
		rc = rte_atomic16_cmpset(&s->tcb.state, TCP_ST_CLOSED,
				TCP_ST_LISTEN);
		if (rc != 0) {
			s->tcb.uop |= TCP_OP_LISTEN;
			/* listen sockets advertise the default-wscale window */
			s->tcb.rcv.wnd = calc_rx_wnd(s, TCP_WSCALE_DEFAULT);
			rc = 0;
		} else
			rc = -EDEADLK;
	} else
		rc = -EINVAL;

	/* NOTE(review): release is invoked even when try_acquire failed;
	 * this matches stream_update_cfg() — confirm rwl_try_acquire()
	 * increments the use counter unconditionally. */
	rwl_release(&s->rx.use);
	return rc;
}
523
524/*
525 * helper function, updates stream config
526 */
/*
 * helper function, updates stream config:
 * replaces the stream's events, callbacks and retry limit, then fires
 * any notifications that are already due under the new configuration.
 * Returns -EINVAL if the stream can't be acquired or is being closed.
 */
static inline int
stream_update_cfg(struct tle_stream *ts,struct tle_tcp_stream_cfg *prm)
{
	int32_t rc1, rc2;
	struct tle_tcp_stream *s;

	s = TCP_STREAM(ts);

	/* take both RX and TX sides of the stream. */
	rc1 = rwl_try_acquire(&s->rx.use);
	rc2 = rwl_try_acquire(&s->tx.use);

	if (rc1 < 0 || rc2 < 0 || (s->tcb.uop & TCP_OP_CLOSE) != 0) {
		rwl_release(&s->tx.use);
		rwl_release(&s->rx.use);
		return -EINVAL;
	}

	/* setup stream notification mechanism */
	s->rx.ev = prm->recv_ev;
	s->tx.ev = prm->send_ev;
	s->err.ev = prm->err_ev;

	s->rx.cb.data = prm->recv_cb.data;
	s->tx.cb.data = prm->send_cb.data;
	s->err.cb.data = prm->err_cb.data;

	/* store barrier: publish cb.data before cb.func so a concurrent
	 * reader that observes a non-NULL func also sees matching data. */
	rte_smp_wmb();

	s->rx.cb.func = prm->recv_cb.func;
	s->tx.cb.func = prm->send_cb.func;
	s->err.cb.func = prm->err_cb.func;

	/* store other params */
	s->tcb.snd.nb_retm = (prm->nb_retries != 0) ? prm->nb_retries :
		TLE_TCP_DEFAULT_RETRIES;

	/* invoke async notifications, if any */
	if (rte_ring_count(s->rx.q) != 0) {
		/* data already queued for reading. */
		if (s->rx.ev != NULL)
			tle_event_raise(s->rx.ev);
		else if (s->rx.cb.func != NULL)
			s->rx.cb.func(s->rx.cb.data, &s->s);
	}
	if (rte_ring_free_count(s->tx.q) != 0) {
		/* room available for writing. */
		if (s->tx.ev != NULL)
			tle_event_raise(s->tx.ev);
		else if (s->tx.cb.func != NULL)
			s->tx.cb.func(s->tx.cb.data, &s->s);
	}
	if (s->tcb.state == TCP_ST_CLOSE_WAIT ||
			s->tcb.state ==  TCP_ST_CLOSED) {
		/* connection already terminated by the peer. */
		if (s->err.ev != NULL)
			tle_event_raise(s->err.ev);
		else if (s->err.cb.func != NULL)
			s->err.cb.func(s->err.cb.data, &s->s);
	}

	rwl_release(&s->tx.use);
	rwl_release(&s->rx.use);

	return 0;
}
589
590uint32_t
591tle_tcp_stream_update_cfg(struct tle_stream *ts[],
592	struct tle_tcp_stream_cfg prm[], uint32_t num)
593{
594	int32_t rc;
595	uint32_t i;
596
597	for (i = 0; i != num; i++) {
598		rc = stream_update_cfg(ts[i], &prm[i]);
599		if (rc != 0) {
600			rte_errno = -rc;
601			break;
602		}
603	}
604
605	return i;
606}
607
608int
609tle_tcp_stream_get_mss(const struct tle_stream * stream)
610{
611	struct tle_tcp_stream *tcp;
612
613	if (stream == NULL)
614		return -EINVAL;
615
616	tcp = TCP_STREAM(stream);
617	return tcp->tcb.snd.mss;
618}
619