/*
 * Copyright (c) 2016-2017  Intel Corporation.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <string.h>
#include <rte_malloc.h>
#include <rte_errno.h>
#include <rte_ethdev.h>
#include <rte_ip.h>
#include <rte_tcp.h>

#include "tcp_stream.h"
#include "tcp_timer.h"
#include "stream_table.h"
#include "misc.h"
#include "tcp_ctl.h"
#include "tcp_ofo.h"
#include "tcp_txq.h"

#define MAX_STREAM_BURST	0x40

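/*
 * Mark a stream as free: an invalid type plus a large negative use
 * counter, so that any concurrent acquire attempt fails until the
 * stream is handed out again.
 */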
static void
unuse_stream(struct tle_tcp_stream *s)
{
	s->s.type = TLE_VNUM;
	rte_atomic32_set(&s->use, INT32_MIN);
}

static void
tcp_fini_streams(struct tle_ctx *ctx)
{
	struct tcp_streams *ts;

	ts = CTX_TCP_STREAMS(ctx);
	if (ts != NULL) {

		stbl_fini(&ts->st);
		tle_timer_free(ts->tmr);
		rte_free(ts->tsq);
		tle_memtank_dump(stdout, ts->mts, TLE_MTANK_DUMP_STAT);
		tle_memtank_sanity_check(ts->mts, 0);
		tle_memtank_destroy(ts->mts);

		STAILQ_INIT(&ts->dr.fe);
		STAILQ_INIT(&ts->dr.be);
	}

	rte_free(ts);
	ctx->streams.buf = NULL;
	STAILQ_INIT(&ctx->streams.free);
}

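/*
 * Allocate and initialize an rte_ring of at least n entries on the given
 * NUMA socket; n is rounded up to a power of two, as rte_ring_init()
 * requires.
 */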
static struct rte_ring *
alloc_ring(uint32_t n, uint32_t flags, int32_t socket)
{
	struct rte_ring *r;
	size_t sz;
	char name[RTE_RING_NAMESIZE];

	n = rte_align32pow2(n);
	sz = rte_ring_get_memsize(n);

	r = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE, socket);
	if (r == NULL) {
		TCP_LOG(ERR, "%s: allocation of %zu bytes on socket %d "
			"failed with error code: %d\n",
			__func__, sz, socket, rte_errno);
		return NULL;
	}

	snprintf(name, sizeof(name), "%p@%zu", r, sz);
	rte_ring_init(r, name, n, flags);
	return r;
}

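/*
 * Each stream is carved out of one contiguous, cache-line aligned
 * allocation laid out roughly as:
 *
 *	[tle_tcp_stream][OFO buffer][RX ring][TX ring][DRB ring + DRB blocks]
 *
 * calc_stream_szofs() computes the offset of each sub-object and the
 * total size; init_stream() below uses the same offsets to wire the
 * pointers up.
 */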
static void
calc_stream_szofs(struct tle_ctx *ctx, struct stream_szofs *szofs)
{
	uint32_t n, na, sz, tsz;

	sz = sizeof(struct tle_tcp_stream);

	n = RTE_MAX(ctx->prm.max_stream_rbufs, 1U);
	tcp_ofo_calc_elems(n, &szofs->ofo.nb_obj, &szofs->ofo.nb_max, &tsz);
	szofs->ofo.ofs = sz;

	sz += tsz;
	sz = RTE_ALIGN_CEIL(sz, RTE_CACHE_LINE_SIZE);

	na = rte_align32pow2(n);
	szofs->rxq.ofs = sz;
	szofs->rxq.nb_obj = na;

	sz += rte_ring_get_memsize(na);
	sz = RTE_ALIGN_CEIL(sz, RTE_CACHE_LINE_SIZE);

	n = RTE_MAX(ctx->prm.max_stream_sbufs, 1U);
	na = rte_align32pow2(n);
	szofs->txq.ofs = sz;
	szofs->txq.nb_obj = na;

	sz += rte_ring_get_memsize(na);
	sz = RTE_ALIGN_CEIL(sz, RTE_CACHE_LINE_SIZE);

	szofs->drb.nb_obj = drb_nb_elem(ctx);
	szofs->drb.nb_max = calc_stream_drb_num(ctx, szofs->drb.nb_obj);
	szofs->drb.nb_rng = rte_align32pow2(szofs->drb.nb_max);
	szofs->drb.rng_sz = rte_ring_get_memsize(szofs->drb.nb_rng);
	szofs->drb.blk_sz = tle_drb_calc_size(szofs->drb.nb_obj);
	szofs->drb.ofs = sz;

	sz += szofs->drb.rng_sz + szofs->drb.blk_sz * szofs->drb.nb_max;
	sz = RTE_ALIGN_CEIL(sz, RTE_CACHE_LINE_SIZE);

	szofs->size = sz;
}

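/*
 * Wire up the sub-objects inside a freshly allocated stream using the
 * offsets from calc_stream_szofs(), pre-fill the DRB ring with its
 * blocks, and leave the stream in the "unused" state.
 */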
static void
init_stream(struct tle_ctx *ctx, struct tle_tcp_stream *s,
	const struct stream_szofs *szofs)
{
	uint32_t f, i;
	struct tle_drb *drb;

	f = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0) ? 0 :
		(RING_F_SP_ENQ | RING_F_SC_DEQ);

	/* init RX part. */

	s->rx.ofo = (void *)((uintptr_t)s + szofs->ofo.ofs);
	tcp_ofo_init(s->rx.ofo, szofs->ofo.nb_obj, szofs->ofo.nb_max);

	s->rx.q = (void *)((uintptr_t)s + szofs->rxq.ofs);
	rte_ring_init(s->rx.q, __func__, szofs->rxq.nb_obj, f | RING_F_SP_ENQ);

	/* init TX part. */

	s->tx.q = (void *)((uintptr_t)s + szofs->txq.ofs);
	rte_ring_init(s->tx.q, __func__, szofs->txq.nb_obj, f | RING_F_SC_DEQ);

	s->tx.drb.r = (void *)((uintptr_t)s + szofs->drb.ofs);
	rte_ring_init(s->tx.drb.r, __func__, szofs->drb.nb_rng, f);

	for (i = 0; i != szofs->drb.nb_max; i++) {
		drb = (struct tle_drb *)((uintptr_t)s->tx.drb.r +
			szofs->drb.rng_sz + szofs->drb.blk_sz * i);
		drb->udata = s;
		drb->size = szofs->drb.nb_obj;
		rte_ring_enqueue(s->tx.drb.r, drb);
	}

	s->tx.drb.nb_elem = szofs->drb.nb_obj;
	s->tx.drb.nb_max = szofs->drb.nb_max;

	/* mark stream as available to use. */

	s->s.ctx = ctx;
	unuse_stream(s);
}

static void
tcp_free_drbs(struct tle_stream *s, struct tle_drb *drb[], uint32_t nb_drb)
{
	struct tle_tcp_stream *us;

	us = (struct tle_tcp_stream *)s;
	_rte_ring_enqueue_burst(us->tx.drb.r, (void **)drb, nb_drb);
}

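/*
 * Create the per-context timer wheel used for TCP timers (retransmission,
 * TIME_WAIT, etc.): one potential timer per stream, with
 * TCP_RTO_GRANULARITY tick size.
 */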
static struct tle_timer_wheel *
alloc_timers(const struct tle_ctx *ctx)
{
	struct tle_timer_wheel *twl;
	struct tle_timer_wheel_args twprm;

	twprm.tick_size = TCP_RTO_GRANULARITY;
	twprm.max_timer = ctx->prm.max_streams;
	twprm.socket_id = ctx->prm.socket_id;

	twl = tle_timer_create(&twprm, tcp_get_tms(ctx->cycles_ms_shift));
	if (twl == NULL)
		TCP_LOG(ERR, "alloc_timers(ctx=%p) failed with error=%d\n",
			ctx, rte_errno);
	return twl;
}

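/*
 * Memtank callbacks: stream objects are allocated in chunks on the
 * context's NUMA socket and fully initialized once, at chunk-allocation
 * time, so that later stream allocation only has to pull a ready object.
 */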
static void *
mts_alloc(size_t sz, void *udata)
{
	struct tle_ctx *ctx;

	ctx = udata;
	return rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
		ctx->prm.socket_id);
}

static void
mts_free(void *p, void *udata)
{
	RTE_SET_USED(udata);
	rte_free(p);
}

static void
mts_init(void *pa[], uint32_t num, void *udata)
{
	uint32_t i;
	struct tle_ctx *ctx;
	struct tcp_streams *ts;

	ctx = udata;
	ts = CTX_TCP_STREAMS(ctx);

	for (i = 0; i != num; i++)
		init_stream(ctx, pa[i], &ts->szofs);
}

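/*
 * Create the stream memtank. min_free/max_free bound the cached
 * free-object pool (defaulting to max_streams when not set), streams are
 * replenished MAX_STREAM_BURST at a time, and an initial grow
 * pre-populates the tank.
 */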
static struct tle_memtank *
alloc_mts(struct tle_ctx *ctx, uint32_t stream_size)
{
	struct tle_memtank *mts;
	struct tle_memtank_prm prm;

	static const struct tle_memtank_prm cprm = {
		.obj_align = RTE_CACHE_LINE_SIZE,
		.flags = TLE_MTANK_OBJ_DBG,
		.alloc = mts_alloc,
		.free = mts_free,
		.init = mts_init,
	};

	prm = cprm;
	prm.udata = ctx;

	prm.obj_size = stream_size;

	prm.min_free = (ctx->prm.free_streams.min != 0) ?
		ctx->prm.free_streams.min : ctx->prm.max_streams;
	prm.max_free = (ctx->prm.free_streams.max > prm.min_free) ?
		ctx->prm.free_streams.max : prm.min_free;

	prm.nb_obj_chunk = MAX_STREAM_BURST;
	prm.max_obj = ctx->prm.max_streams;

	mts = tle_memtank_create(&prm);
	if (mts == NULL)
		TCP_LOG(ERR, "%s(ctx=%p) failed with error=%d\n",
			__func__, ctx, rte_errno);
	else
		tle_memtank_grow(mts);

	return mts;
}

static int
tcp_init_streams(struct tle_ctx *ctx)
{
	uint32_t f;
	int32_t rc;
	struct tcp_streams *ts;
	struct stream_szofs szofs;

	f = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0) ? 0 :
		(RING_F_SP_ENQ | RING_F_SC_DEQ);

	calc_stream_szofs(ctx, &szofs);
	TCP_LOG(NOTICE, "ctx:%p, calculated stream size: %u\n",
		ctx, szofs.size);

	ts = rte_zmalloc_socket(NULL, sizeof(*ts), RTE_CACHE_LINE_SIZE,
		ctx->prm.socket_id);
	if (ts == NULL)
		return -ENOMEM;

	ts->szofs = szofs;

	STAILQ_INIT(&ts->dr.fe);
	STAILQ_INIT(&ts->dr.be);

	ctx->streams.buf = ts;
	STAILQ_INIT(&ctx->streams.free);

	rc = stbl_init(&ts->st, ctx->prm.max_streams, ctx->prm.socket_id);

	if (rc == 0) {
		ts->tsq = alloc_ring(ctx->prm.max_streams, f | RING_F_SC_DEQ,
			ctx->prm.socket_id);
		ts->tmr = alloc_timers(ctx);
		ts->mts = alloc_mts(ctx, szofs.size);

		if (ts->tsq == NULL || ts->tmr == NULL || ts->mts == NULL)
			rc = -ENOMEM;

		tle_memtank_dump(stdout, ts->mts, TLE_MTANK_DUMP_STAT);
	}

	if (rc != 0) {
		TCP_LOG(ERR, "initialisation of TCP streams failed\n");
		tcp_fini_streams(ctx);
	}

	return rc;
}

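/*
 * Runs at load time (before main()) and plugs the TCP-specific stream
 * operations into the protocol-dispatch table used by the generic
 * context layer.
 */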
static void __attribute__((constructor))
tcp_stream_setup(void)
{
	static const struct stream_ops tcp_ops = {
		.init_streams = tcp_init_streams,
		.fini_streams = tcp_fini_streams,
		.free_drbs = tcp_free_drbs,
	};

	tle_stream_ops[TLE_PROTO_TCP] = tcp_ops;
}

/*
 * Helper routine, checks that input event and callback are mutually exclusive.
 */
static int
check_cbev(const struct tle_event *ev, const struct tle_stream_cb *cb)
{
	if (ev != NULL && cb->func != NULL)
		return -EINVAL;
	return 0;
}

static int
check_stream_prm(const struct tle_ctx *ctx,
	const struct tle_tcp_stream_param *prm)
{
	if ((prm->addr.local.ss_family != AF_INET &&
			prm->addr.local.ss_family != AF_INET6) ||
			prm->addr.local.ss_family != prm->addr.remote.ss_family)
		return -EINVAL;

	/* callback and event notification mechanisms are mutually exclusive */
	if (check_cbev(prm->cfg.recv_ev, &prm->cfg.recv_cb) != 0 ||
			check_cbev(prm->cfg.send_ev, &prm->cfg.send_cb) != 0 ||
			check_cbev(prm->cfg.err_ev, &prm->cfg.err_cb) != 0)
		return -EINVAL;

	/* check that the context supports the desired address family. */
	if ((prm->addr.local.ss_family == AF_INET &&
			ctx->prm.lookup4 == NULL) ||
			(prm->addr.local.ss_family == AF_INET6 &&
			ctx->prm.lookup6 == NULL))
		return -EINVAL;

	return 0;
}

struct tle_stream *
tle_tcp_stream_open(struct tle_ctx *ctx,
	const struct tle_tcp_stream_param *prm)
{
	struct tcp_streams *ts;
	struct tle_tcp_stream *s;
	int32_t rc;

	if (ctx == NULL || prm == NULL || check_stream_prm(ctx, prm) != 0) {
		rte_errno = EINVAL;
		return NULL;
	}

	ts = CTX_TCP_STREAMS(ctx);

	s = tcp_stream_get(ctx, TLE_MTANK_ALLOC_CHUNK | TLE_MTANK_ALLOC_GROW);
	if (s == NULL) {
		rte_errno = ENFILE;
		return NULL;
	}

	/* setup L4 ports and L3 address fields. */
	rc = stream_fill_ctx(ctx, &s->s,
		(const struct sockaddr *)&prm->addr.local,
		(const struct sockaddr *)&prm->addr.remote);

	if (rc != 0) {
		tle_memtank_free(ts->mts, (void **)&s, 1,
			TLE_MTANK_FREE_SHRINK);
		rte_errno = rc;
		return NULL;
	}

	/* setup stream notification mechanism */
	s->rx.ev = prm->cfg.recv_ev;
	s->rx.cb = prm->cfg.recv_cb;
	s->tx.ev = prm->cfg.send_ev;
	s->tx.cb = prm->cfg.send_cb;
	s->err.ev = prm->cfg.err_ev;
	s->err.cb = prm->cfg.err_cb;

	/* store other params */
	s->flags = ctx->prm.flags;
	s->tcb.snd.nb_retm = (prm->cfg.nb_retries != 0) ? prm->cfg.nb_retries :
		TLE_TCP_DEFAULT_RETRIES;
	s->tcb.snd.cwnd = (ctx->prm.icw == 0) ? TCP_INITIAL_CWND_MAX :
				ctx->prm.icw;
	s->tcb.snd.rto_tw = (ctx->prm.timewait == TLE_TCP_TIMEWAIT_DEFAULT) ?
				TCP_RTO_2MSL : ctx->prm.timewait;

	tcp_stream_up(s);
	return &s->s;
}
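
/*
 * A minimal usage sketch (assuming an already initialized tle_ctx and
 * sockaddr values; names other than the API calls are placeholders and
 * error handling is trimmed):
 *
 *	struct tle_tcp_stream_param prm;
 *	struct tle_stream *s;
 *
 *	memset(&prm, 0, sizeof(prm));
 *	memcpy(&prm.addr.local, &laddr, sizeof(laddr));
 *	memcpy(&prm.addr.remote, &raddr, sizeof(raddr));
 *	prm.cfg.recv_ev = rx_ev;
 *	prm.cfg.send_ev = tx_ev;
 *
 *	s = tle_tcp_stream_open(ctx, &prm);
 *	if (s == NULL)
 *		rte_exit(EXIT_FAILURE, "open failed: %d\n", rte_errno);
 *	...
 *	tle_tcp_stream_close(s);
 */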

/*
 * Helper functions, used by close API.
 */
static inline int
stream_close(struct tle_ctx *ctx, struct tle_tcp_stream *s)
{
	uint16_t uop;
	uint32_t state;
	static const struct tle_stream_cb zcb;

	/* check whether close() was already invoked */
	uop = s->tcb.uop;
	if ((uop & TCP_OP_CLOSE) != 0)
		return -EDEADLK;

	/* record that close() was invoked */
	if (rte_atomic16_cmpset(&s->tcb.uop, uop, uop | TCP_OP_CLOSE) == 0)
		return -EDEADLK;

	/* mark stream as unavailable for RX/TX. */
	tcp_stream_down(s);

	/* reset events/callbacks */
	s->rx.ev = NULL;
	s->tx.ev = NULL;
	s->err.ev = NULL;

	s->rx.cb = zcb;
	s->tx.cb = zcb;
	s->err.cb = zcb;

	state = s->tcb.state;

	/* CLOSED, LISTEN, SYN_SENT - we can close the stream straightaway */
	if (state <= TCP_ST_SYN_SENT) {
		tcp_stream_reset(ctx, s);
		return 0;
	}

	/* generate FIN and proceed with normal connection termination */
	if (state == TCP_ST_ESTABLISHED || state == TCP_ST_CLOSE_WAIT) {

		/* change state */
		s->tcb.state = (state == TCP_ST_ESTABLISHED) ?
			TCP_ST_FIN_WAIT_1 : TCP_ST_LAST_ACK;

		/* mark stream as writable/readable again */
		tcp_stream_up(s);

		/* queue stream into the to-send queue */
		txs_enqueue(ctx, s);
		return 0;
	}

	/*
	 * according to the state, close() was already invoked;
	 * we should never reach this point.
	 */
	RTE_ASSERT(0);
	return -EINVAL;
}

uint32_t
tle_tcp_stream_close_bulk(struct tle_stream *ts[], uint32_t num)
{
	int32_t rc;
	uint32_t i;
	struct tle_ctx *ctx;
	struct tle_tcp_stream *s;

	rc = 0;

	for (i = 0; i != num && rc == 0; i++) {

		s = TCP_STREAM(ts[i]);
		if (ts[i] == NULL || s->s.type >= TLE_VNUM)
			rc = -EINVAL;
		else {
			ctx = s->s.ctx;
			rc = stream_close(ctx, s);
			tle_memtank_shrink(CTX_TCP_MTS(ctx));
		}
	}

	if (rc != 0)
		rte_errno = -rc;
	return i;
}

int
tle_tcp_stream_close(struct tle_stream *ts)
{
	int32_t rc;
	struct tle_ctx *ctx;
	struct tle_tcp_stream *s;

	s = TCP_STREAM(ts);
	if (ts == NULL || s->s.type >= TLE_VNUM)
		return -EINVAL;

	ctx = s->s.ctx;
	rc = stream_close(ctx, s);
	tle_memtank_shrink(CTX_TCP_MTS(ctx));
	return rc;
}

int
tle_tcp_stream_get_addr(const struct tle_stream *ts,
	struct tle_tcp_stream_addr *addr)
{
	struct sockaddr_in *lin4, *rin4;
	struct sockaddr_in6 *lin6, *rin6;
	struct tle_tcp_stream *s;

	s = TCP_STREAM(ts);
	if (addr == NULL || ts == NULL || s->s.type >= TLE_VNUM)
		return -EINVAL;

	if (s->s.type == TLE_V4) {

		lin4 = (struct sockaddr_in *)&addr->local;
		rin4 = (struct sockaddr_in *)&addr->remote;

		addr->local.ss_family = AF_INET;
		addr->remote.ss_family = AF_INET;

		lin4->sin_port = s->s.port.dst;
		rin4->sin_port = s->s.port.src;
		lin4->sin_addr.s_addr = s->s.ipv4.addr.dst;
		rin4->sin_addr.s_addr = s->s.ipv4.addr.src;

	} else if (s->s.type == TLE_V6) {

		lin6 = (struct sockaddr_in6 *)&addr->local;
		rin6 = (struct sockaddr_in6 *)&addr->remote;

		addr->local.ss_family = AF_INET6;
		addr->remote.ss_family = AF_INET6;

		lin6->sin6_port = s->s.port.dst;
		rin6->sin6_port = s->s.port.src;
		memcpy(&lin6->sin6_addr, &s->s.ipv6.addr.dst,
			sizeof(lin6->sin6_addr));
		memcpy(&rin6->sin6_addr, &s->s.ipv6.addr.src,
			sizeof(rin6->sin6_addr));
	}

	return 0;
}

int
tle_tcp_stream_listen(struct tle_stream *ts)
{
	struct tle_tcp_stream *s;
	int32_t rc;

	s = TCP_STREAM(ts);
	if (ts == NULL || s->s.type >= TLE_VNUM)
		return -EINVAL;

	/* an app may call listen() multiple times, e.g. to change the
	 * backlog; just return success for such cases.
	 */
	if (s->tcb.state == TCP_ST_LISTEN)
		return 0;

	/* mark stream as not closable. */
	if (tcp_stream_try_acquire(s) > 0) {
		rc = rte_atomic16_cmpset(&s->tcb.state, TCP_ST_CLOSED,
				TCP_ST_LISTEN);
		if (rc != 0) {
			s->tcb.uop |= TCP_OP_LISTEN;
			s->tcb.rcv.wnd = calc_rx_wnd(s, TCP_WSCALE_DEFAULT);
			rc = 0;
		} else
			rc = -EDEADLK;
	} else
		rc = -EINVAL;

	tcp_stream_release(s);
	return rc;
}

/*
 * helper function, updates stream config
 */
static inline int
stream_update_cfg(struct tle_stream *ts, struct tle_tcp_stream_cfg *prm)
{
	struct tle_tcp_stream *s;

	s = TCP_STREAM(ts);

	if (tcp_stream_try_acquire(s) < 0 || (s->tcb.uop & TCP_OP_CLOSE) != 0) {
		tcp_stream_release(s);
		return -EINVAL;
	}

	/* setup stream notification mechanism */
	s->rx.ev = prm->recv_ev;
	s->tx.ev = prm->send_ev;
	s->err.ev = prm->err_ev;

	s->rx.cb.data = prm->recv_cb.data;
	s->tx.cb.data = prm->send_cb.data;
	s->err.cb.data = prm->err_cb.data;

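	/*
	 * Make sure the .data fields above are globally visible before the
	 * .func pointers below: readers test .func for NULL and only then
	 * dereference .data, so this write barrier keeps the pair consistent.
	 */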
	rte_smp_wmb();

	s->rx.cb.func = prm->recv_cb.func;
	s->tx.cb.func = prm->send_cb.func;
	s->err.cb.func = prm->err_cb.func;

	/* store other params */
	s->tcb.snd.nb_retm = (prm->nb_retries != 0) ? prm->nb_retries :
		TLE_TCP_DEFAULT_RETRIES;

	/* invoke async notifications, if any */
	if (rte_ring_count(s->rx.q) != 0) {
		if (s->rx.ev != NULL)
			tle_event_raise(s->rx.ev);
		else if (s->rx.cb.func != NULL)
			s->rx.cb.func(s->rx.cb.data, &s->s);
	}
	if (rte_ring_free_count(s->tx.q) != 0) {
		if (s->tx.ev != NULL)
			tle_event_raise(s->tx.ev);
		else if (s->tx.cb.func != NULL)
			s->tx.cb.func(s->tx.cb.data, &s->s);
	}
	if (s->tcb.state == TCP_ST_CLOSE_WAIT ||
			s->tcb.state == TCP_ST_CLOSED) {
		if (s->err.ev != NULL)
			tle_event_raise(s->err.ev);
		else if (s->err.cb.func != NULL)
			s->err.cb.func(s->err.cb.data, &s->s);
	}

	tcp_stream_release(s);
	return 0;
}

uint32_t
tle_tcp_stream_update_cfg(struct tle_stream *ts[],
	struct tle_tcp_stream_cfg prm[], uint32_t num)
{
	int32_t rc;
	uint32_t i;

	for (i = 0; i != num; i++) {
		rc = stream_update_cfg(ts[i], &prm[i]);
		if (rc != 0) {
			rte_errno = -rc;
			break;
		}
	}

	return i;
}

int
tle_tcp_stream_get_mss(const struct tle_stream *ts)
{
	struct tle_tcp_stream *s;

	if (ts == NULL)
		return -EINVAL;

	s = TCP_STREAM(ts);
	return s->tcb.snd.mss;
}