/* tcp_stream.c revision 85b0a328 */
1/*
2 * Copyright (c) 2016-2017  Intel Corporation.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#include <string.h>
17#include <rte_malloc.h>
18#include <rte_errno.h>
19#include <rte_ethdev.h>
20#include <rte_ip.h>
21#include <rte_tcp.h>
22
23#include "tcp_stream.h"
24#include "tcp_timer.h"
25#include "stream_table.h"
26#include "misc.h"
27#include "tcp_ctl.h"
28#include "tcp_ofo.h"
29#include "tcp_txq.h"
30
/*
 * Mark the stream as not in use:
 * invalidate its type and drop the use counter to INT32_MIN so that
 * any subsequent acquire attempt on this stream will fail.
 */
static void
unuse_stream(struct tle_tcp_stream *s)
{
	s->s.type = TLE_VNUM;
	rte_atomic32_set(&s->use, INT32_MIN);
}
37
38static void
39fini_stream(struct tle_tcp_stream *s)
40{
41	if (s != NULL) {
42		rte_free(s->rx.q);
43		tcp_ofo_free(s->rx.ofo);
44		rte_free(s->tx.q);
45		rte_free(s->tx.drb.r);
46	}
47}
48
/*
 * Destroy all TCP streams owned by the context:
 * frees the stream table, every individual stream's resources,
 * the timer wheel and the to-send queue, then the streams array itself.
 * Safe to call on a partially initialised context (ts may be NULL,
 * and fini_stream() tolerates streams that were never set up).
 */
static void
tcp_fini_streams(struct tle_ctx *ctx)
{
	uint32_t i;
	struct tcp_streams *ts;

	ts = CTX_TCP_STREAMS(ctx);
	if (ts != NULL) {
		stbl_fini(&ts->st);
		for (i = 0; i != ctx->prm.max_streams; i++)
			fini_stream(&ts->s[i]);

		/* free the timer wheel */
		tle_timer_free(ts->tmr);
		rte_free(ts->tsq);

		/* reset death-row lists; their memory is part of ts below */
		STAILQ_INIT(&ts->dr.fe);
		STAILQ_INIT(&ts->dr.be);
	}

	rte_free(ts);
	ctx->streams.buf = NULL;
	STAILQ_INIT(&ctx->streams.free);
}
73
74static struct rte_ring *
75alloc_ring(uint32_t n, uint32_t flags, int32_t socket)
76{
77	struct rte_ring *r;
78	size_t sz;
79	char name[RTE_RING_NAMESIZE];
80
81	n = rte_align32pow2(n);
82	sz =  rte_ring_get_memsize(n);
83
84	r = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE, socket);
85	if (r == NULL) {
86		TCP_LOG(ERR, "%s: allocation of %zu bytes on socket %d "
87			"failed with error code: %d\n",
88			__func__, sz, socket, rte_errno);
89		return NULL;
90	}
91
92	snprintf(name, sizeof(name), "%p@%zu", r, sz);
93	rte_ring_init(r, name, n, flags);
94	return r;
95}
96
/*
 * Initialise one TCP stream:
 * - allocate RX ring and out-of-order reassembly buffer;
 * - allocate TX ring;
 * - allocate one memory blob that holds the free-drb ring followed by
 *   the drbs themselves, and enqueue every drb into that ring;
 * - mark the stream as free and link it into the context free list.
 * Returns 0 on success, -ENOMEM on allocation failure.
 * On failure, resources already allocated here are released later by
 * fini_stream() via tcp_fini_streams().
 */
static int
init_stream(struct tle_ctx *ctx, struct tle_tcp_stream *s)
{
	size_t bsz, rsz, sz;
	uint32_t f, i, k, n, nb;
	struct tle_drb *drb;
	char name[RTE_RING_NAMESIZE];

	/* single-threaded context can use single-producer/consumer rings */
	f = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0) ? 0 :
		(RING_F_SP_ENQ |  RING_F_SC_DEQ);

	/* init RX part. */

	n = RTE_MAX(ctx->prm.max_stream_rbufs, 1U);
	s->rx.q = alloc_ring(n, f | RING_F_SP_ENQ, ctx->prm.socket_id);
	if (s->rx.q == NULL)
		return -ENOMEM;

	s->rx.ofo = tcp_ofo_alloc(n, ctx->prm.socket_id);
	if (s->rx.ofo == NULL)
		return -ENOMEM;

	/* init TX part. */

	n = RTE_MAX(ctx->prm.max_stream_sbufs, 1U);
	s->tx.q = alloc_ring(n, f | RING_F_SC_DEQ, ctx->prm.socket_id);
	if (s->tx.q == NULL)
		return -ENOMEM;

	nb = drb_nb_elem(ctx);
	k = calc_stream_drb_num(ctx, nb);
	n = rte_align32pow2(k);

	/* size of the drbs ring */
	rsz = rte_ring_get_memsize(n);
	rsz = RTE_ALIGN_CEIL(rsz, RTE_CACHE_LINE_SIZE);

	/* size of the drb. */
	bsz = tle_drb_calc_size(nb);

	/* total stream drbs size: ring header followed by k drbs. */
	sz = rsz + bsz * k;

	s->tx.drb.r = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
		ctx->prm.socket_id);
	if (s->tx.drb.r == NULL) {
		TCP_LOG(ERR, "%s(%p): allocation of %zu bytes on socket %d "
			"failed with error code: %d\n",
			__func__, s, sz, ctx->prm.socket_id, rte_errno);
		return -ENOMEM;
	}

	snprintf(name, sizeof(name), "%p@%zu", s, sz);
	rte_ring_init(s->tx.drb.r, name, n, f);

	/* carve individual drbs out of the blob and put them on the ring */
	for (i = 0; i != k; i++) {
		drb = (struct tle_drb *)((uintptr_t)s->tx.drb.r +
			rsz + bsz * i);
		drb->udata = s;
		drb->size = nb;
		rte_ring_enqueue(s->tx.drb.r, drb);
	}

	s->tx.drb.nb_elem = nb;
	s->tx.drb.nb_max = k;

	/* mark stream as available to use. */

	s->s.ctx = ctx;
	unuse_stream(s);
	STAILQ_INSERT_TAIL(&ctx->streams.free, &s->s, link);

	return 0;
}
171
172static void
173tcp_free_drbs(struct tle_stream *s, struct tle_drb *drb[], uint32_t nb_drb)
174{
175	struct tle_tcp_stream *us;
176
177	us = (struct tle_tcp_stream *)s;
178	_rte_ring_enqueue_burst(us->tx.drb.r, (void **)drb, nb_drb);
179}
180
181static struct tle_timer_wheel *
182alloc_timers(uint32_t num, uint32_t mshift, int32_t socket)
183{
184	struct tle_timer_wheel_args twprm;
185
186	twprm.tick_size = TCP_RTO_GRANULARITY;
187	twprm.max_timer = num;
188	twprm.socket_id = socket;
189	return tle_timer_create(&twprm, tcp_get_tms(mshift));
190}
191
192static int
193tcp_init_streams(struct tle_ctx *ctx)
194{
195	size_t sz;
196	uint32_t f, i;
197	int32_t rc;
198	struct tcp_streams *ts;
199
200	f = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0) ? 0 :
201		(RING_F_SP_ENQ |  RING_F_SC_DEQ);
202
203	sz = sizeof(*ts) + sizeof(ts->s[0]) * ctx->prm.max_streams;
204	ts = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
205		ctx->prm.socket_id);
206	if (ts == NULL) {
207		TCP_LOG(ERR, "allocation of %zu bytes on socket %d "
208			"for %u tcp_streams failed\n",
209			sz, ctx->prm.socket_id, ctx->prm.max_streams);
210		return -ENOMEM;
211	}
212
213	STAILQ_INIT(&ts->dr.fe);
214	STAILQ_INIT(&ts->dr.be);
215
216	ctx->streams.buf = ts;
217	STAILQ_INIT(&ctx->streams.free);
218
219	ts->tmr = alloc_timers(ctx->prm.max_streams, ctx->cycles_ms_shift,
220		ctx->prm.socket_id);
221	if (ts->tmr == NULL) {
222		TCP_LOG(ERR, "alloc_timers(ctx=%p) failed with error=%d\n",
223			ctx, rte_errno);
224		rc = -ENOMEM;
225	} else {
226		ts->tsq = alloc_ring(ctx->prm.max_streams,
227			f | RING_F_SC_DEQ, ctx->prm.socket_id);
228		if (ts->tsq == NULL)
229			rc = -ENOMEM;
230		else
231			rc = stbl_init(&ts->st, ctx->prm.max_streams,
232				ctx->prm.socket_id);
233	}
234
235	for (i = 0; rc == 0 && i != ctx->prm.max_streams; i++)
236		rc = init_stream(ctx, &ts->s[i]);
237
238	if (rc != 0) {
239		TCP_LOG(ERR, "initalisation of %u-th stream failed", i);
240		tcp_fini_streams(ctx);
241	}
242
243	return rc;
244}
245
/*
 * Library constructor: registers the TCP implementation of the generic
 * stream operations (init/fini/drb-free) in the global ops table,
 * so the context code can dispatch on TLE_PROTO_TCP.
 */
static void __attribute__((constructor))
tcp_stream_setup(void)
{
	static const struct stream_ops tcp_ops = {
		.init_streams = tcp_init_streams,
		.fini_streams = tcp_fini_streams,
		.free_drbs = tcp_free_drbs,
	};

	tle_stream_ops[TLE_PROTO_TCP] = tcp_ops;
}
257
258/*
259 * Helper routine, check that input event and callback are mutually exclusive.
260 */
261static int
262check_cbev(const struct tle_event *ev, const struct tle_stream_cb *cb)
263{
264	if (ev != NULL && cb->func != NULL)
265		return -EINVAL;
266	return 0;
267}
268
269static int
270check_stream_prm(const struct tle_ctx *ctx,
271	const struct tle_tcp_stream_param *prm)
272{
273	if ((prm->addr.local.ss_family != AF_INET &&
274			prm->addr.local.ss_family != AF_INET6) ||
275			prm->addr.local.ss_family != prm->addr.remote.ss_family)
276		return -EINVAL;
277
278	/* callback and event notifications mechanisms are mutually exclusive */
279	if (check_cbev(prm->cfg.recv_ev, &prm->cfg.recv_cb) != 0 ||
280			check_cbev(prm->cfg.recv_ev, &prm->cfg.recv_cb) != 0 ||
281			check_cbev(prm->cfg.err_ev, &prm->cfg.err_cb) != 0)
282		return -EINVAL;
283
284	/* check does context support desired address family. */
285	if ((prm->addr.local.ss_family == AF_INET &&
286			ctx->prm.lookup4 == NULL) ||
287			(prm->addr.local.ss_family == AF_INET6 &&
288			ctx->prm.lookup6 == NULL))
289		return -EINVAL;
290
291	return 0;
292}
293
/*
 * Open a new TCP stream within the given context.
 * Validates parameters, grabs a free stream, fills in L3/L4 addressing,
 * installs notification events/callbacks and per-stream settings.
 * Returns the stream pointer on success; on failure returns NULL with
 * rte_errno set (EINVAL - bad params, ENFILE - no free streams,
 * EAGAIN - chosen stream still has TX pending).
 */
struct tle_stream *
tle_tcp_stream_open(struct tle_ctx *ctx,
	const struct tle_tcp_stream_param *prm)
{
	struct tle_tcp_stream *s;
	int32_t rc;

	if (ctx == NULL || prm == NULL || check_stream_prm(ctx, prm) != 0) {
		rte_errno = EINVAL;
		return NULL;
	}

	s = (struct tle_tcp_stream *)get_stream(ctx);
	if (s == NULL)	{
		rte_errno = ENFILE;
		return NULL;

	/* some TX still pending for that stream. */
	} else if (TCP_STREAM_TX_PENDING(s)) {
		put_stream(ctx, &s->s, 0);
		rte_errno = EAGAIN;
		return NULL;
	}

	/* setup L4 ports and L3 addresses fields. */
	rc = stream_fill_ctx(ctx, &s->s,
		(const struct sockaddr *)&prm->addr.local,
		(const struct sockaddr *)&prm->addr.remote);

	if (rc != 0) {
		put_stream(ctx, &s->s, 1);
		/* NOTE(review): assumes stream_fill_ctx() returns a positive
		 * errno value (rte_errno convention) — verify against its
		 * definition; stream_close() returns negative codes. */
		rte_errno = rc;
		return NULL;
	}

	/* setup stream notification mechanism */
	s->rx.ev = prm->cfg.recv_ev;
	s->rx.cb = prm->cfg.recv_cb;
	s->tx.ev = prm->cfg.send_ev;
	s->tx.cb = prm->cfg.send_cb;
	s->err.ev = prm->cfg.err_ev;
	s->err.cb = prm->cfg.err_cb;

	/* store other params: retry limit, initial cwnd, TIME_WAIT timeout */
	s->flags = ctx->prm.flags;
	s->tcb.snd.nb_retm = (prm->cfg.nb_retries != 0) ? prm->cfg.nb_retries :
		TLE_TCP_DEFAULT_RETRIES;
	s->tcb.snd.cwnd = (ctx->prm.icw == 0) ? TCP_INITIAL_CWND_MAX :
				ctx->prm.icw;
	s->tcb.snd.rto_tw = (ctx->prm.timewait == TLE_TCP_TIMEWAIT_DEFAULT) ?
				TCP_RTO_2MSL : ctx->prm.timewait;

	/* make the stream usable for RX/TX */
	tcp_stream_up(s);
	return &s->s;
}
349
/*
 * Helper function, used by close API.
 * Initiates close of the given stream exactly once:
 * - CLOSED/LISTEN/SYN_SENT streams are reset immediately;
 * - ESTABLISHED/CLOSE_WAIT streams move to FIN_WAIT_1/LAST_ACK and are
 *   queued for FIN transmission.
 * Returns 0 on success, -EDEADLK if close() was already invoked.
 */
static inline int
stream_close(struct tle_ctx *ctx, struct tle_tcp_stream *s)
{
	uint16_t uop;
	uint32_t state;
	static const struct tle_stream_cb zcb;

	/* check was close() already invoked */
	uop = s->tcb.uop;
	if ((uop & TCP_OP_CLOSE) != 0)
		return -EDEADLK;

	/* record that close() was already invoked;
	 * cmpset guards against a concurrent close() racing us here */
	if (rte_atomic16_cmpset(&s->tcb.uop, uop, uop | TCP_OP_CLOSE) == 0)
		return -EDEADLK;

	/* mark stream as unavailable for RX/TX. */
	tcp_stream_down(s);

	/* reset events/callbacks */
	s->rx.ev = NULL;
	s->tx.ev = NULL;
	s->err.ev = NULL;

	s->rx.cb = zcb;
	s->tx.cb = zcb;
	s->err.cb = zcb;

	state = s->tcb.state;

	/* CLOSED, LISTEN, SYN_SENT - we can close the stream straightaway */
	if (state <= TCP_ST_SYN_SENT) {
		tcp_stream_reset(ctx, s);
		return 0;
	}

	/* generate FIN and proceed with normal connection termination */
	if (state == TCP_ST_ESTABLISHED || state == TCP_ST_CLOSE_WAIT) {

		/* change state */
		s->tcb.state = (state == TCP_ST_ESTABLISHED) ?
			TCP_ST_FIN_WAIT_1 : TCP_ST_LAST_ACK;

		/* mark stream as writable/readable again */
		tcp_stream_up(s);

		/* queue stream into to-send queue */
		txs_enqueue(ctx, s);
		return 0;
	}

	/*
	 * according to the state, close() was already invoked;
	 * we should never reach this point.
	 */
	RTE_ASSERT(0);
	return -EINVAL;
}
411
412uint32_t
413tle_tcp_stream_close_bulk(struct tle_stream *ts[], uint32_t num)
414{
415	int32_t rc;
416	uint32_t i;
417	struct tle_ctx *ctx;
418	struct tle_tcp_stream *s;
419
420	rc = 0;
421
422	for (i = 0; i != num; i++) {
423
424		s = TCP_STREAM(ts[i]);
425		if (ts[i] == NULL || s->s.type >= TLE_VNUM) {
426			rc = EINVAL;
427			break;
428		}
429
430		ctx = s->s.ctx;
431		rc = stream_close(ctx, s);
432		if (rc != 0)
433			break;
434	}
435
436	if (rc != 0)
437		rte_errno = -rc;
438	return i;
439}
440
441int
442tle_tcp_stream_close(struct tle_stream *ts)
443{
444	struct tle_ctx *ctx;
445	struct tle_tcp_stream *s;
446
447	s = TCP_STREAM(ts);
448	if (ts == NULL || s->s.type >= TLE_VNUM)
449		return -EINVAL;
450
451	ctx = s->s.ctx;
452	return stream_close(ctx, s);
453}
454
455int
456tle_tcp_stream_get_addr(const struct tle_stream *ts,
457	struct tle_tcp_stream_addr *addr)
458{
459	struct sockaddr_in *lin4, *rin4;
460	struct sockaddr_in6 *lin6, *rin6;
461	struct tle_tcp_stream *s;
462
463	s = TCP_STREAM(ts);
464	if (addr == NULL || ts == NULL || s->s.type >= TLE_VNUM)
465		return -EINVAL;
466
467	if (s->s.type == TLE_V4) {
468
469		lin4 = (struct sockaddr_in *)&addr->local;
470		rin4 = (struct sockaddr_in *)&addr->remote;
471
472		addr->local.ss_family = AF_INET;
473		addr->remote.ss_family = AF_INET;
474
475		lin4->sin_port = s->s.port.dst;
476		rin4->sin_port = s->s.port.src;
477		lin4->sin_addr.s_addr = s->s.ipv4.addr.dst;
478		rin4->sin_addr.s_addr = s->s.ipv4.addr.src;
479
480	} else if (s->s.type == TLE_V6) {
481
482		lin6 = (struct sockaddr_in6 *)&addr->local;
483		rin6 = (struct sockaddr_in6 *)&addr->remote;
484
485		addr->local.ss_family = AF_INET6;
486		addr->remote.ss_family = AF_INET6;
487
488		lin6->sin6_port = s->s.port.dst;
489		rin6->sin6_port = s->s.port.src;
490		memcpy(&lin6->sin6_addr, &s->s.ipv6.addr.dst,
491			sizeof(lin6->sin6_addr));
492		memcpy(&rin6->sin6_addr, &s->s.ipv6.addr.src,
493			sizeof(rin6->sin6_addr));
494	}
495
496	return 0;
497}
498
/*
 * Put the stream into LISTEN state.
 * Only a CLOSED stream can transition to LISTEN; the transition is done
 * with an atomic cmpset to guard against concurrent state changes.
 * Returns 0 on success (including repeated listen() calls),
 * -EDEADLK if the stream is not CLOSED, -EINVAL for a bad stream.
 */
int
tle_tcp_stream_listen(struct tle_stream *ts)
{
	struct tle_tcp_stream *s;
	int32_t rc;

	s = TCP_STREAM(ts);
	if (ts == NULL || s->s.type >= TLE_VNUM)
		return -EINVAL;

	/* app may listen for multiple times to change backlog,
	 * we will just return success for such cases.
	 */
	if (s->tcb.state == TCP_ST_LISTEN)
		return 0;

	/* mark stream as not closable. */
	if (tcp_stream_try_acquire(s) > 0) {
		/* CLOSED -> LISTEN, atomically vs concurrent closers */
		rc = rte_atomic16_cmpset(&s->tcb.state, TCP_ST_CLOSED,
				TCP_ST_LISTEN);
		if (rc != 0) {
			s->tcb.uop |= TCP_OP_LISTEN;
			s->tcb.rcv.wnd = calc_rx_wnd(s, TCP_WSCALE_DEFAULT);
			rc = 0;
		} else
			rc = -EDEADLK;
	} else
		rc = -EINVAL;

	/* NOTE(review): release is invoked even when try_acquire failed —
	 * presumably the counter is incremented in both cases; verify
	 * against tcp_stream_try_acquire() in tcp_ctl.h. */
	tcp_stream_release(s);
	return rc;
}
531
/*
 * helper function, updates stream config:
 * replaces notification events/callbacks and retry limit, then fires
 * pending notifications so the caller does not miss already-available
 * RX data, free TX space or an error state.
 * Returns 0 on success, -EINVAL if the stream cannot be acquired or
 * close() has already been invoked on it.
 */
static inline int
stream_update_cfg(struct tle_stream *ts,struct tle_tcp_stream_cfg *prm)
{
	struct tle_tcp_stream *s;

	s = TCP_STREAM(ts);

	if (tcp_stream_try_acquire(s) < 0 || (s->tcb.uop & TCP_OP_CLOSE) != 0) {
		tcp_stream_release(s);
		return -EINVAL;
	}

	/* setup stream notification mechanism */
	s->rx.ev = prm->recv_ev;
	s->tx.ev = prm->send_ev;
	s->err.ev = prm->err_ev;

	s->rx.cb.data = prm->recv_cb.data;
	s->tx.cb.data = prm->send_cb.data;
	s->err.cb.data = prm->err_cb.data;

	/* barrier: publish callback data before the function pointers, so a
	 * concurrent reader that sees a non-NULL func also sees its data */
	rte_smp_wmb();

	s->rx.cb.func = prm->recv_cb.func;
	s->tx.cb.func = prm->send_cb.func;
	s->err.cb.func = prm->err_cb.func;

	/* store other params */
	s->tcb.snd.nb_retm = (prm->nb_retries != 0) ? prm->nb_retries :
		TLE_TCP_DEFAULT_RETRIES;

	/* invoke async notifications, if any */
	if (rte_ring_count(s->rx.q) != 0) {
		if (s->rx.ev != NULL)
			tle_event_raise(s->rx.ev);
		else if (s->rx.cb.func != NULL)
			s->rx.cb.func(s->rx.cb.data, &s->s);
	}
	if (rte_ring_free_count(s->tx.q) != 0) {
		if (s->tx.ev != NULL)
			tle_event_raise(s->tx.ev);
		else if (s->tx.cb.func != NULL)
			s->tx.cb.func(s->tx.cb.data, &s->s);
	}
	if (s->tcb.state == TCP_ST_CLOSE_WAIT ||
			s->tcb.state ==  TCP_ST_CLOSED) {
		if (s->err.ev != NULL)
			tle_event_raise(s->err.ev);
		else if (s->err.cb.func != NULL)
			s->err.cb.func(s->err.cb.data, &s->s);
	}

	tcp_stream_release(s);
	return 0;
}
590
591uint32_t
592tle_tcp_stream_update_cfg(struct tle_stream *ts[],
593	struct tle_tcp_stream_cfg prm[], uint32_t num)
594{
595	int32_t rc;
596	uint32_t i;
597
598	for (i = 0; i != num; i++) {
599		rc = stream_update_cfg(ts[i], &prm[i]);
600		if (rc != 0) {
601			rte_errno = -rc;
602			break;
603		}
604	}
605
606	return i;
607}
608
609int
610tle_tcp_stream_get_mss(const struct tle_stream * ts)
611{
612	struct tle_tcp_stream *s;
613
614	if (ts == NULL)
615		return -EINVAL;
616
617	s = TCP_STREAM(ts);
618	return s->tcb.snd.mss;
619}
620