tcp_stream.c revision 47eb00f2
/*
 * Copyright (c) 2016-2017  Intel Corporation.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <string.h>
#include <rte_malloc.h>
#include <rte_errno.h>
#include <rte_ethdev.h>
#include <rte_ip.h>
#include <rte_tcp.h>

#include "tcp_stream.h"
#include "tcp_timer.h"
#include "stream_table.h"
#include "misc.h"
#include "tcp_ctl.h"
#include "tcp_ofo.h"
#include "tcp_txq.h"

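/*
 * Mark the stream as not in use: an invalid type plus a use counter
 * set to INT32_MIN make concurrent RX/TX acquire attempts on the
 * stream fail.
 */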
static void
unuse_stream(struct tle_tcp_stream *s)
{
	s->s.type = TLE_VNUM;
	rte_atomic32_set(&s->use, INT32_MIN);
}

static void
tcp_fini_streams(struct tle_ctx *ctx)
{
	struct tcp_streams *ts;

	ts = CTX_TCP_STREAMS(ctx);
	if (ts != NULL) {
		stbl_fini(&ts->st);

		/* free the timer wheel */
		tle_timer_free(ts->tmr);
		rte_free(ts->tsq);

		STAILQ_INIT(&ts->dr.fe);
		STAILQ_INIT(&ts->dr.be);
	}

	rte_free(ts);
	ctx->streams.buf = NULL;
	STAILQ_INIT(&ctx->streams.free);
}

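/*
 * Allocate a ring manually (rte_zmalloc_socket() plus rte_ring_init())
 * rather than with rte_ring_create(): the memory comes from the
 * requested NUMA socket and the ring is not registered in the global
 * tailq, so no unique name is needed - it is derived from the address.
 */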
static struct rte_ring *
alloc_ring(uint32_t n, uint32_t flags, int32_t socket)
{
	struct rte_ring *r;
	size_t sz;
	char name[RTE_RING_NAMESIZE];

	n = rte_align32pow2(n);
	sz = rte_ring_get_memsize(n);

	r = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE, socket);
	if (r == NULL) {
		TCP_LOG(ERR, "%s: allocation of %zu bytes on socket %d "
			"failed with error code: %d\n",
			__func__, sz, socket, rte_errno);
		return NULL;
	}

	snprintf(name, sizeof(name), "%p@%zu", r, sz);
	rte_ring_init(r, name, n, flags);
	return r;
}

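/*
 * Each stream occupies one contiguous memory chunk laid out as:
 * | tle_tcp_stream | OFO buffer | RX ring | TX ring | DRB ring+blocks |
 * with every sub-object aligned to a cache-line boundary.
 * Compute the offset of each sub-object and the total chunk size.
 */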
static void
calc_stream_szofs(struct tle_ctx *ctx, struct stream_szofs *szofs)
{
	uint32_t n, na, sz, tsz;

	sz = sizeof(struct tle_tcp_stream);

	n = RTE_MAX(ctx->prm.max_stream_rbufs, 1U);
	tcp_ofo_calc_elems(n, &szofs->ofo.nb_obj, &szofs->ofo.nb_max, &tsz);
	szofs->ofo.ofs = sz;

	sz += tsz;
	sz = RTE_ALIGN_CEIL(sz, RTE_CACHE_LINE_SIZE);

	na = rte_align32pow2(n);
	szofs->rxq.ofs = sz;
	szofs->rxq.nb_obj = na;

	sz += rte_ring_get_memsize(na);
	sz = RTE_ALIGN_CEIL(sz, RTE_CACHE_LINE_SIZE);

	n = RTE_MAX(ctx->prm.max_stream_sbufs, 1U);
	na = rte_align32pow2(n);
	szofs->txq.ofs = sz;
	szofs->txq.nb_obj = na;

	sz += rte_ring_get_memsize(na);
	sz = RTE_ALIGN_CEIL(sz, RTE_CACHE_LINE_SIZE);

	szofs->drb.nb_obj = drb_nb_elem(ctx);
	szofs->drb.nb_max = calc_stream_drb_num(ctx, szofs->drb.nb_obj);
	szofs->drb.nb_rng = rte_align32pow2(szofs->drb.nb_max);
	szofs->drb.rng_sz = rte_ring_get_memsize(szofs->drb.nb_rng);
	szofs->drb.blk_sz = tle_drb_calc_size(szofs->drb.nb_obj);
	szofs->drb.ofs = sz;

	sz += szofs->drb.rng_sz + szofs->drb.blk_sz * szofs->drb.nb_max;
	sz = RTE_ALIGN_CEIL(sz, RTE_CACHE_LINE_SIZE);

	szofs->size = sz;
}

static int
init_stream(struct tle_ctx *ctx, struct tle_tcp_stream *s,
	const struct stream_szofs *szofs)
{
	uint32_t f, i;
	struct tle_drb *drb;

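	/*
	 * for a single-threaded context use single-producer/single-consumer
	 * rings to avoid unnecessary atomic operations.
	 */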
	f = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0) ? 0 :
		(RING_F_SP_ENQ | RING_F_SC_DEQ);

	/* init RX part. */

	s->rx.ofo = (void *)((uintptr_t)s + szofs->ofo.ofs);
	tcp_ofo_init(s->rx.ofo, szofs->ofo.nb_obj, szofs->ofo.nb_max);

	s->rx.q = (void *)((uintptr_t)s + szofs->rxq.ofs);
	rte_ring_init(s->rx.q, __func__, szofs->rxq.nb_obj, f | RING_F_SP_ENQ);

	/* init TX part. */

	s->tx.q = (void *)((uintptr_t)s + szofs->txq.ofs);
	rte_ring_init(s->tx.q, __func__, szofs->txq.nb_obj, f | RING_F_SC_DEQ);

	s->tx.drb.r = (void *)((uintptr_t)s + szofs->drb.ofs);
	rte_ring_init(s->tx.drb.r, __func__, szofs->drb.nb_rng, f);

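	/*
	 * DRB blocks are laid out right behind the DRB ring inside the
	 * same memory chunk; pre-fill the ring with all of them, so all
	 * blocks start out as free.
	 */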
	for (i = 0; i != szofs->drb.nb_max; i++) {
		drb = (struct tle_drb *)((uintptr_t)s->tx.drb.r +
			szofs->drb.rng_sz + szofs->drb.blk_sz * i);
		drb->udata = s;
		drb->size = szofs->drb.nb_obj;
		rte_ring_enqueue(s->tx.drb.r, drb);
	}

	s->tx.drb.nb_elem = szofs->drb.nb_obj;
	s->tx.drb.nb_max = szofs->drb.nb_max;

	/* mark stream as available for use. */

	s->s.ctx = ctx;
	unuse_stream(s);
	STAILQ_INSERT_TAIL(&ctx->streams.free, &s->s, link);

	return 0;
}

static void
tcp_free_drbs(struct tle_stream *s, struct tle_drb *drb[], uint32_t nb_drb)
{
	struct tle_tcp_stream *us;

	us = (struct tle_tcp_stream *)s;
	_rte_ring_enqueue_burst(us->tx.drb.r, (void **)drb, nb_drb);
}

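/*
 * Create a timer wheel with TCP_RTO_GRANULARITY tick size, sized for
 * one timer per stream, with the current time taken from the context's
 * millisecond clock.
 */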
static struct tle_timer_wheel *
alloc_timers(uint32_t num, uint32_t mshift, int32_t socket)
{
	struct tle_timer_wheel_args twprm;

	twprm.tick_size = TCP_RTO_GRANULARITY;
	twprm.max_timer = num;
	twprm.socket_id = socket;
	return tle_timer_create(&twprm, tcp_get_tms(mshift));
}

static int
tcp_init_streams(struct tle_ctx *ctx)
{
	size_t sz;
	uint32_t f, i;
	int32_t rc;
	struct tcp_streams *ts;
	struct tle_tcp_stream *ps;
	struct stream_szofs szofs;

	f = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0) ? 0 :
		(RING_F_SP_ENQ | RING_F_SC_DEQ);

	calc_stream_szofs(ctx, &szofs);

	sz = sizeof(*ts) + szofs.size * ctx->prm.max_streams;
	ts = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
		ctx->prm.socket_id);

	TCP_LOG(NOTICE, "allocation of %zu bytes on socket %d "
			"for %u tcp_streams returns %p\n",
			sz, ctx->prm.socket_id, ctx->prm.max_streams, ts);
	if (ts == NULL)
		return -ENOMEM;

	ts->szofs = szofs;

	STAILQ_INIT(&ts->dr.fe);
	STAILQ_INIT(&ts->dr.be);

	ctx->streams.buf = ts;
	STAILQ_INIT(&ctx->streams.free);

	ts->tmr = alloc_timers(ctx->prm.max_streams, ctx->cycles_ms_shift,
		ctx->prm.socket_id);
	if (ts->tmr == NULL) {
		TCP_LOG(ERR, "alloc_timers(ctx=%p) failed with error=%d\n",
			ctx, rte_errno);
		rc = -ENOMEM;
	} else {
		ts->tsq = alloc_ring(ctx->prm.max_streams,
			f | RING_F_SC_DEQ, ctx->prm.socket_id);
		if (ts->tsq == NULL)
			rc = -ENOMEM;
		else
			rc = stbl_init(&ts->st, ctx->prm.max_streams,
				ctx->prm.socket_id);
	}

	for (i = 0; rc == 0 && i != ctx->prm.max_streams; i++) {
		ps = (void *)((uintptr_t)ts->s + i * ts->szofs.size);
		rc = init_stream(ctx, ps, &ts->szofs);
	}

	if (rc != 0) {
		TCP_LOG(ERR, "initialisation of %u-th stream failed\n", i);
		tcp_fini_streams(ctx);
	}

	return rc;
}

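/*
 * Register the TCP-specific handlers with the generic stream layer
 * at load time, before any context can be created.
 */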
static void __attribute__((constructor))
tcp_stream_setup(void)
{
	static const struct stream_ops tcp_ops = {
		.init_streams = tcp_init_streams,
		.fini_streams = tcp_fini_streams,
		.free_drbs = tcp_free_drbs,
	};

	tle_stream_ops[TLE_PROTO_TCP] = tcp_ops;
}

/*
 * Helper routine, check that input event and callback are mutually exclusive.
 */
static int
check_cbev(const struct tle_event *ev, const struct tle_stream_cb *cb)
{
	if (ev != NULL && cb->func != NULL)
		return -EINVAL;
	return 0;
}

static int
check_stream_prm(const struct tle_ctx *ctx,
	const struct tle_tcp_stream_param *prm)
{
	if ((prm->addr.local.ss_family != AF_INET &&
			prm->addr.local.ss_family != AF_INET6) ||
			prm->addr.local.ss_family != prm->addr.remote.ss_family)
		return -EINVAL;

	/* callback and event notification mechanisms are mutually exclusive */
	if (check_cbev(prm->cfg.recv_ev, &prm->cfg.recv_cb) != 0 ||
			check_cbev(prm->cfg.send_ev, &prm->cfg.send_cb) != 0 ||
			check_cbev(prm->cfg.err_ev, &prm->cfg.err_cb) != 0)
		return -EINVAL;

	/* check that the context supports the desired address family. */
	if ((prm->addr.local.ss_family == AF_INET &&
			ctx->prm.lookup4 == NULL) ||
			(prm->addr.local.ss_family == AF_INET6 &&
			ctx->prm.lookup6 == NULL))
		return -EINVAL;

	return 0;
}

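/*
 * Illustrative open sequence from an application's point of view
 * (a sketch only, not part of this file; the variable names rx_ev,
 * tx_ev, err_ev and laddr/raddr are made up):
 *
 *	struct tle_tcp_stream_param prm;
 *	struct tle_stream *ts;
 *
 *	memset(&prm, 0, sizeof(prm));
 *	memcpy(&prm.addr.local, laddr, sizeof(struct sockaddr_in));
 *	memcpy(&prm.addr.remote, raddr, sizeof(struct sockaddr_in));
 *	prm.cfg.recv_ev = rx_ev;    (events and callbacks are mutually
 *	prm.cfg.send_ev = tx_ev;     exclusive, see check_cbev() above)
 *	prm.cfg.err_ev = err_ev;
 *	ts = tle_tcp_stream_open(ctx, &prm);
 *	if (ts == NULL)
 *		... rte_errno holds EINVAL/ENFILE/EAGAIN or the
 *		    stream_fill_ctx() error ...
 */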
struct tle_stream *
tle_tcp_stream_open(struct tle_ctx *ctx,
	const struct tle_tcp_stream_param *prm)
{
	struct tle_tcp_stream *s;
	int32_t rc;

	if (ctx == NULL || prm == NULL || check_stream_prm(ctx, prm) != 0) {
		rte_errno = EINVAL;
		return NULL;
	}

	s = (struct tle_tcp_stream *)get_stream(ctx);
	if (s == NULL) {
		rte_errno = ENFILE;
		return NULL;

	/* some TX still pending for that stream. */
	} else if (TCP_STREAM_TX_PENDING(s)) {
		put_stream(ctx, &s->s, 0);
		rte_errno = EAGAIN;
		return NULL;
	}

	/* set up L4 ports and L3 address fields. */
	rc = stream_fill_ctx(ctx, &s->s,
		(const struct sockaddr *)&prm->addr.local,
		(const struct sockaddr *)&prm->addr.remote);

	if (rc != 0) {
		put_stream(ctx, &s->s, 1);
		rte_errno = rc;
		return NULL;
	}

	/* setup stream notification mechanism */
	s->rx.ev = prm->cfg.recv_ev;
	s->rx.cb = prm->cfg.recv_cb;
	s->tx.ev = prm->cfg.send_ev;
	s->tx.cb = prm->cfg.send_cb;
	s->err.ev = prm->cfg.err_ev;
	s->err.cb = prm->cfg.err_cb;

	/* store other params */
	s->flags = ctx->prm.flags;
	s->tcb.snd.nb_retm = (prm->cfg.nb_retries != 0) ? prm->cfg.nb_retries :
		TLE_TCP_DEFAULT_RETRIES;
	s->tcb.snd.cwnd = (ctx->prm.icw == 0) ? TCP_INITIAL_CWND_MAX :
				ctx->prm.icw;
	s->tcb.snd.rto_tw = (ctx->prm.timewait == TLE_TCP_TIMEWAIT_DEFAULT) ?
				TCP_RTO_2MSL : ctx->prm.timewait;

	tcp_stream_up(s);
	return &s->s;
}

/*
 * Helper functions, used by close API.
 */
static inline int
stream_close(struct tle_ctx *ctx, struct tle_tcp_stream *s)
{
	uint16_t uop;
	uint32_t state;
	static const struct tle_stream_cb zcb;

	/* check whether close() was already invoked */
	uop = s->tcb.uop;
	if ((uop & TCP_OP_CLOSE) != 0)
		return -EDEADLK;

	/* record that close() was invoked */
	if (rte_atomic16_cmpset(&s->tcb.uop, uop, uop | TCP_OP_CLOSE) == 0)
		return -EDEADLK;

	/* mark stream as unavailable for RX/TX. */
	tcp_stream_down(s);

	/* reset events/callbacks */
	s->rx.ev = NULL;
	s->tx.ev = NULL;
	s->err.ev = NULL;

	s->rx.cb = zcb;
	s->tx.cb = zcb;
	s->err.cb = zcb;

	state = s->tcb.state;

	/* CLOSED, LISTEN, SYN_SENT - we can close the stream straightaway */
	if (state <= TCP_ST_SYN_SENT) {
		tcp_stream_reset(ctx, s);
		return 0;
	}

	/* generate FIN and proceed with normal connection termination */
	if (state == TCP_ST_ESTABLISHED || state == TCP_ST_CLOSE_WAIT) {

		/* change state */
		s->tcb.state = (state == TCP_ST_ESTABLISHED) ?
			TCP_ST_FIN_WAIT_1 : TCP_ST_LAST_ACK;

		/* mark stream as writable/readable again */
		tcp_stream_up(s);

		/* queue stream into to-send queue */
		txs_enqueue(ctx, s);
		return 0;
	}

	/*
	 * according to the state, close() was already invoked;
	 * we should never reach this point.
	 */
	RTE_ASSERT(0);
	return -EINVAL;
}

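/*
 * Close up to num streams, stopping at the first failure.
 * Returns the number of successfully closed streams; on failure
 * rte_errno is set to the (positive) error code.
 */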
uint32_t
tle_tcp_stream_close_bulk(struct tle_stream *ts[], uint32_t num)
{
	int32_t rc;
	uint32_t i;
	struct tle_ctx *ctx;
	struct tle_tcp_stream *s;

	rc = 0;

	for (i = 0; i != num; i++) {

		s = TCP_STREAM(ts[i]);
		if (ts[i] == NULL || s->s.type >= TLE_VNUM) {
			rc = -EINVAL;
			break;
		}

		ctx = s->s.ctx;
		rc = stream_close(ctx, s);
		if (rc != 0)
			break;
	}

	if (rc != 0)
		rte_errno = -rc;
	return i;
}

int
tle_tcp_stream_close(struct tle_stream *ts)
{
	struct tle_ctx *ctx;
	struct tle_tcp_stream *s;

	s = TCP_STREAM(ts);
	if (ts == NULL || s->s.type >= TLE_VNUM)
		return -EINVAL;

	ctx = s->s.ctx;
	return stream_close(ctx, s);
}

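/*
 * Note: internally the stream stores its 4-tuple from the viewpoint of
 * incoming packets, so the local end maps to the dst port/address and
 * the remote end to the src ones.
 */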
int
tle_tcp_stream_get_addr(const struct tle_stream *ts,
	struct tle_tcp_stream_addr *addr)
{
	struct sockaddr_in *lin4, *rin4;
	struct sockaddr_in6 *lin6, *rin6;
	struct tle_tcp_stream *s;

	s = TCP_STREAM(ts);
	if (addr == NULL || ts == NULL || s->s.type >= TLE_VNUM)
		return -EINVAL;

	if (s->s.type == TLE_V4) {

		lin4 = (struct sockaddr_in *)&addr->local;
		rin4 = (struct sockaddr_in *)&addr->remote;

		addr->local.ss_family = AF_INET;
		addr->remote.ss_family = AF_INET;

		lin4->sin_port = s->s.port.dst;
		rin4->sin_port = s->s.port.src;
		lin4->sin_addr.s_addr = s->s.ipv4.addr.dst;
		rin4->sin_addr.s_addr = s->s.ipv4.addr.src;

	} else if (s->s.type == TLE_V6) {

		lin6 = (struct sockaddr_in6 *)&addr->local;
		rin6 = (struct sockaddr_in6 *)&addr->remote;

		addr->local.ss_family = AF_INET6;
		addr->remote.ss_family = AF_INET6;

		lin6->sin6_port = s->s.port.dst;
		rin6->sin6_port = s->s.port.src;
		memcpy(&lin6->sin6_addr, &s->s.ipv6.addr.dst,
			sizeof(lin6->sin6_addr));
		memcpy(&rin6->sin6_addr, &s->s.ipv6.addr.src,
			sizeof(rin6->sin6_addr));
	}

	return 0;
}

int
tle_tcp_stream_listen(struct tle_stream *ts)
{
	struct tle_tcp_stream *s;
	int32_t rc;

	s = TCP_STREAM(ts);
	if (ts == NULL || s->s.type >= TLE_VNUM)
		return -EINVAL;

	/* the app may call listen() multiple times to change the backlog;
	 * just return success for such cases.
	 */
	if (s->tcb.state == TCP_ST_LISTEN)
		return 0;

	/* mark stream as not closable. */
	if (tcp_stream_try_acquire(s) > 0) {
		rc = rte_atomic16_cmpset(&s->tcb.state, TCP_ST_CLOSED,
				TCP_ST_LISTEN);
		if (rc != 0) {
			s->tcb.uop |= TCP_OP_LISTEN;
			s->tcb.rcv.wnd = calc_rx_wnd(s, TCP_WSCALE_DEFAULT);
			rc = 0;
		} else
			rc = -EDEADLK;
	} else
		rc = -EINVAL;

	tcp_stream_release(s);
	return rc;
}

/*
 * helper function, updates stream config
 */
static inline int
stream_update_cfg(struct tle_stream *ts, struct tle_tcp_stream_cfg *prm)
{
	struct tle_tcp_stream *s;

	s = TCP_STREAM(ts);

	if (tcp_stream_try_acquire(s) < 0 || (s->tcb.uop & TCP_OP_CLOSE) != 0) {
		tcp_stream_release(s);
		return -EINVAL;
	}

	/* setup stream notification mechanism */
	s->rx.ev = prm->recv_ev;
	s->tx.ev = prm->send_ev;
	s->err.ev = prm->err_ev;

	s->rx.cb.data = prm->recv_cb.data;
	s->tx.cb.data = prm->send_cb.data;
	s->err.cb.data = prm->err_cb.data;

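	/*
	 * make sure the callback data fields are visible to other cores
	 * before the function pointers: a datapath thread that observes
	 * a non-NULL func must also observe its matching data.
	 */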
	rte_smp_wmb();

	s->rx.cb.func = prm->recv_cb.func;
	s->tx.cb.func = prm->send_cb.func;
	s->err.cb.func = prm->err_cb.func;

	/* store other params */
	s->tcb.snd.nb_retm = (prm->nb_retries != 0) ? prm->nb_retries :
		TLE_TCP_DEFAULT_RETRIES;

	/* invoke async notifications, if any */
	if (rte_ring_count(s->rx.q) != 0) {
		if (s->rx.ev != NULL)
			tle_event_raise(s->rx.ev);
		else if (s->rx.cb.func != NULL)
			s->rx.cb.func(s->rx.cb.data, &s->s);
	}
	if (rte_ring_free_count(s->tx.q) != 0) {
		if (s->tx.ev != NULL)
			tle_event_raise(s->tx.ev);
		else if (s->tx.cb.func != NULL)
			s->tx.cb.func(s->tx.cb.data, &s->s);
	}
	if (s->tcb.state == TCP_ST_CLOSE_WAIT ||
			s->tcb.state == TCP_ST_CLOSED) {
		if (s->err.ev != NULL)
			tle_event_raise(s->err.ev);
		else if (s->err.cb.func != NULL)
			s->err.cb.func(s->err.cb.data, &s->s);
	}

	tcp_stream_release(s);
	return 0;
}

uint32_t
tle_tcp_stream_update_cfg(struct tle_stream *ts[],
	struct tle_tcp_stream_cfg prm[], uint32_t num)
{
	int32_t rc;
	uint32_t i;

	for (i = 0; i != num; i++) {
		rc = stream_update_cfg(ts[i], &prm[i]);
		if (rc != 0) {
			rte_errno = -rc;
			break;
		}
	}

	return i;
}

int
tle_tcp_stream_get_mss(const struct tle_stream *ts)
{
	struct tle_tcp_stream *s;

	if (ts == NULL)
		return -EINVAL;

	s = TCP_STREAM(ts);
	return s->tcb.snd.mss;
}