tcp_rxtx.c revision 21e7392f
1/*
2 * Copyright (c) 2016  Intel Corporation.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#include <rte_errno.h>
17#include <rte_ethdev.h>
18#include <rte_ip.h>
19#include <rte_ip_frag.h>
20#include <rte_tcp.h>
21
22#include "tcp_stream.h"
23#include "tcp_timer.h"
24#include "stream_table.h"
25#include "syncookie.h"
26#include "misc.h"
27#include "tcp_ctl.h"
28#include "tcp_rxq.h"
29#include "tcp_txq.h"
30
31#define	TCP_MAX_PKT_SEG	0x20
32
33/*
34 * checks whether the input TCP ports and IP addresses match the given stream.
35 * returns zero on match, non-zero otherwise.
36 */
37static inline int
38rx_check_stream(const struct tle_tcp_stream *s, const union pkt_info *pi)
39{
40	int32_t rc;
41
42	if (pi->tf.type == TLE_V4)
43		rc = (pi->port.raw & s->s.pmsk.raw) != s->s.port.raw ||
44			(pi->addr4.raw & s->s.ipv4.mask.raw) !=
45			s->s.ipv4.addr.raw;
46	else
47		rc = (pi->port.raw & s->s.pmsk.raw) != s->s.port.raw ||
48			ymm_mask_cmp(&pi->addr6->raw, &s->s.ipv6.addr.raw,
49			&s->s.ipv6.mask.raw) != 0;
50
51	return rc;
52}
53
54static inline struct tle_tcp_stream *
55rx_obtain_listen_stream(const struct tle_dev *dev, const union pkt_info *pi,
56	uint32_t type)
57{
58	struct tle_tcp_stream *s;
59
60	s = (struct tle_tcp_stream *)dev->dp[type]->streams[pi->port.dst];
61	if (s == NULL || rwl_acquire(&s->rx.use) < 0)
62		return NULL;
63
64	/* check that we have a proper stream. */
65	if (s->tcb.state != TCP_ST_LISTEN) {
66		rwl_release(&s->rx.use);
67		s = NULL;
68	}
69
70	return s;
71}
72
73static inline struct tle_tcp_stream *
74rx_obtain_stream(const struct tle_dev *dev, struct stbl *st,
75	const union pkt_info *pi, uint32_t type)
76{
77	struct tle_tcp_stream *s;
78
79	s = stbl_find_data(st, pi);
80	if (s == NULL) {
81		if (pi->tf.flags == TCP_FLAG_ACK)
82			return rx_obtain_listen_stream(dev, pi, type);
83		return NULL;
84	}
85
86	if (stbl_data_pkt(s) || rwl_acquire(&s->rx.use) < 0)
87		return NULL;
88	/* check that we have a proper stream. */
89	else if (s->tcb.state == TCP_ST_CLOSED) {
90		rwl_release(&s->rx.use);
91		s = NULL;
92	}
93
94	return s;
95}
96
97/*
98 * Consider 2 pkt_info *equal* if their:
99 * - types (IPv4/IPv6)
100 * - TCP flags
101 * - checksum flags
102 * - TCP src and dst ports
103 * - IP src and dst addresses
104 * are equal.
105 */
106static inline int
107pkt_info_bulk_eq(const union pkt_info pi[], uint32_t num)
108{
109	uint32_t i;
110
111	i = 1;
112
113	if (pi[0].tf.type == TLE_V4) {
114		while (i != num && xmm_cmp(&pi[0].raw, &pi[i].raw) == 0)
115			i++;
116
117	} else if (pi[0].tf.type == TLE_V6) {
118		while (i != num &&
119				pi[0].raw.u64[0] == pi[i].raw.u64[0] &&
120				ymm_cmp(&pi[0].addr6->raw,
121				&pi[i].addr6->raw) == 0)
122			i++;
123	}
124
125	return i;
126}
127
128static inline int
129pkt_info_bulk_syneq(const union pkt_info pi[], uint32_t num)
130{
131	uint32_t i;
132
133	i = 1;
134
135	if (pi[0].tf.type == TLE_V4) {
136		while (i != num && pi[0].tf.raw == pi[i].tf.raw &&
137				pi[0].port.dst == pi[i].port.dst &&
138				pi[0].addr4.dst == pi[i].addr4.dst)
139			i++;
140
141	} else if (pi[0].tf.type == TLE_V6) {
142		while (i != num && pi[0].tf.raw == pi[i].tf.raw &&
143				pi[0].port.dst == pi[i].port.dst &&
144				xmm_cmp(&pi[0].addr6->dst,
145				&pi[i].addr6->dst) == 0)
146			i++;
147	}
148
149	return i;
150}
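
/*
 * Note: the two bulk-compare helpers above are used by tle_tcp_rx_bulk()
 * to group runs of consecutive packets that belong to the same flow
 * (or, for SYN packets, to the same listening stream), so that one
 * stream lookup can serve the whole run.
 */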
151
152static inline void
153stream_drb_free(struct tle_tcp_stream *s, struct tle_drb *drbs[],
154	uint32_t nb_drb)
155{
156	rte_ring_enqueue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
157}
158
159static inline uint32_t
160stream_drb_alloc(struct tle_tcp_stream *s, struct tle_drb *drbs[],
161	uint32_t nb_drb)
162{
163	return rte_ring_dequeue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
164}
165
166static inline void
167fill_tcph(struct tcp_hdr *l4h, const struct tcb *tcb, union l4_ports port,
168	uint32_t seq, uint8_t hlen, uint8_t flags)
169{
170	uint16_t wnd;
171
172	l4h->src_port = port.dst;
173	l4h->dst_port = port.src;
174
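	/*
	 * advertised window: in a SYN segment the window field is never
	 * scaled (RFC 1323), so clamp it to 16 bits; otherwise advertise
	 * RCV.WND shifted down by the receive window scale.
	 */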
175	wnd = (flags & TCP_FLAG_SYN) ?
176		RTE_MIN(tcb->rcv.wnd, (uint32_t)UINT16_MAX) :
177		tcb->rcv.wnd >> tcb->rcv.wscale;
178
179	/* ??? use sse shuffle to hton all remaining 16 bytes at once. ??? */
180	l4h->sent_seq = rte_cpu_to_be_32(seq);
181	l4h->recv_ack = rte_cpu_to_be_32(tcb->rcv.nxt);
182	l4h->data_off = hlen / TCP_DATA_ALIGN << TCP_DATA_OFFSET;
183	l4h->tcp_flags = flags;
184	l4h->rx_win = rte_cpu_to_be_16(wnd);
185	l4h->cksum = 0;
186	l4h->tcp_urp = 0;
187
188	if (flags & TCP_FLAG_SYN)
189		fill_syn_opts(l4h + 1, &tcb->so);
190	else if ((flags & TCP_FLAG_RST) == 0 && tcb->so.ts.raw != 0)
191		fill_tms_opts(l4h + 1, tcb->snd.ts, tcb->rcv.ts);
192}
193
194static inline int
195tcp_fill_mbuf(struct rte_mbuf *m, const struct tle_tcp_stream *s,
196	const struct tle_dest *dst, uint64_t ol_flags,
197	union l4_ports port, uint32_t seq, uint32_t flags,
198	uint32_t pid, uint32_t swcsm)
199{
200	uint32_t l4, len, plen;
201	struct tcp_hdr *l4h;
202	char *l2h;
203
204	len = dst->l2_len + dst->l3_len;
205	plen = m->pkt_len;
206
207	if (flags & TCP_FLAG_SYN)
208		l4 = sizeof(*l4h) + TCP_TX_OPT_LEN_MAX;
209	else if ((flags & TCP_FLAG_RST) == 0 && s->tcb.rcv.ts != 0)
210		l4 = sizeof(*l4h) + TCP_TX_OPT_LEN_TMS;
211	else
212		l4 = sizeof(*l4h);
213
214	/* adjust mbuf to put L2/L3/L4 headers into it. */
215	l2h = rte_pktmbuf_prepend(m, len + l4);
216	if (l2h == NULL)
217		return -EINVAL;
218
219	/* copy L2/L3 header */
220	rte_memcpy(l2h, dst->hdr, len);
221
222	/* setup TCP header & options */
223	l4h = (struct tcp_hdr *)(l2h + len);
224	fill_tcph(l4h, &s->tcb, port, seq, l4, flags);
225
226	/* setup mbuf TX offload related fields. */
227	m->tx_offload = _mbuf_tx_offload(dst->l2_len, dst->l3_len, l4, 0, 0, 0);
228	m->ol_flags |= ol_flags;
229
230	/* update proto specific fields. */
231
232	if (s->s.type == TLE_V4) {
233		struct ipv4_hdr *l3h;
234		l3h = (struct ipv4_hdr *)(l2h + dst->l2_len);
235		l3h->packet_id = rte_cpu_to_be_16(pid);
236		l3h->total_length = rte_cpu_to_be_16(plen + dst->l3_len + l4);
237
238		if ((ol_flags & PKT_TX_TCP_CKSUM) != 0)
239			l4h->cksum = _ipv4x_phdr_cksum(l3h, m->l3_len,
240				ol_flags);
241		else if (swcsm != 0)
242			l4h->cksum = _ipv4_udptcp_mbuf_cksum(m, len, l3h);
243
244		if ((ol_flags & PKT_TX_IP_CKSUM) == 0 && swcsm != 0)
245			l3h->hdr_checksum = _ipv4x_cksum(l3h, m->l3_len);
246	} else {
247		struct ipv6_hdr *l3h;
248		l3h = (struct ipv6_hdr *)(l2h + dst->l2_len);
249		l3h->payload_len = rte_cpu_to_be_16(plen + l4);
250		if ((ol_flags & PKT_TX_TCP_CKSUM) != 0)
251			l4h->cksum = rte_ipv6_phdr_cksum(l3h, ol_flags);
252		else if (swcsm != 0)
253			l4h->cksum = _ipv6_udptcp_mbuf_cksum(m, len, l3h);
254	}
255
256	return 0;
257}
258
259/*
260 * This function is supposed to be used only for data packets.
261 * Assumes that L2/L3/L4 headers and mbuf fields are already set up properly.
262 *  - updates tcp SEG.SEQ, SEG.ACK, TS.VAL, TS.ECR.
263 *  - if no HW cksum offloads are enabled, calculates TCP checksum.
264 */
265static inline void
266tcp_update_mbuf(struct rte_mbuf *m, uint32_t type, const struct tcb *tcb,
267	uint32_t seq, uint32_t pid)
268{
269	struct tcp_hdr *l4h;
270	uint32_t len;
271
272	len = m->l2_len + m->l3_len;
273	l4h = rte_pktmbuf_mtod_offset(m, struct tcp_hdr *, len);
274
275	l4h->sent_seq = rte_cpu_to_be_32(seq);
276	l4h->recv_ack = rte_cpu_to_be_32(tcb->rcv.nxt);
277
278	if (tcb->so.ts.raw != 0)
279		fill_tms_opts(l4h + 1, tcb->snd.ts, tcb->rcv.ts);
280
281	if (type == TLE_V4) {
282		struct ipv4_hdr *l3h;
283		l3h = rte_pktmbuf_mtod_offset(m, struct ipv4_hdr *, m->l2_len);
284		l3h->hdr_checksum = 0;
285		l3h->packet_id = rte_cpu_to_be_16(pid);
286		if ((m->ol_flags & PKT_TX_IP_CKSUM) == 0)
287			l3h->hdr_checksum = _ipv4x_cksum(l3h, m->l3_len);
288	}
289
290	/* have to calculate TCP checksum in SW */
291	if ((m->ol_flags & PKT_TX_TCP_CKSUM) == 0) {
292
293		l4h->cksum = 0;
294
295		if (type == TLE_V4) {
296			struct ipv4_hdr *l3h;
297			l3h = rte_pktmbuf_mtod_offset(m, struct ipv4_hdr *,
298				m->l2_len);
299			l4h->cksum = _ipv4_udptcp_mbuf_cksum(m, len, l3h);
300
301		} else {
302			struct ipv6_hdr *l3h;
303			l3h = rte_pktmbuf_mtod_offset(m, struct ipv6_hdr *,
304				m->l2_len);
305			l4h->cksum = _ipv6_udptcp_mbuf_cksum(m, len, l3h);
306		}
307	}
308}
309
310/* Send data packets that need to be ACK-ed by peer */
311static inline uint32_t
312tx_data_pkts(struct tle_tcp_stream *s, struct rte_mbuf *const m[], uint32_t num)
313{
314	uint32_t bsz, i, nb, nbm;
315	struct tle_dev *dev;
316	struct tle_drb *drb[num];
317
318	/* calculate how many drbs are needed. */
319	bsz = s->tx.drb.nb_elem;
320	nbm = (num + bsz - 1) / bsz;
321
322	/* allocate drbs, adjust number of packets. */
323	nb = stream_drb_alloc(s, drb, nbm);
324
325	/* drb ring is empty. */
326	if (nb == 0)
327		return 0;
328
329	else if (nb != nbm)
330		num = nb * bsz;
331
332	dev = s->tx.dst.dev;
333
334	/* enqueue pkts for TX. */
335	nbm = nb;
336	i = tle_dring_mp_enqueue(&dev->tx.dr, (const void * const*)m,
337		num, drb, &nb);
338
339	/* free unused drbs. */
340	if (nb != 0)
341		stream_drb_free(s, drb + nbm - nb, nb);
342
343	return i;
344}
345
346static inline uint32_t
347tx_data_bulk(struct tle_tcp_stream *s, union seqlen *sl, struct rte_mbuf *mi[],
348	uint32_t num)
349{
350	uint32_t fail, i, k, n, mss, pid, plen, sz, tn, type;
351	struct tle_dev *dev;
352	struct rte_mbuf *mb;
353	struct rte_mbuf *mo[MAX_PKT_BURST + TCP_MAX_PKT_SEG];
354
355	mss = s->tcb.snd.mss;
356	type = s->s.type;
357
358	dev = s->tx.dst.dev;
359	pid = rte_atomic32_add_return(&dev->tx.packet_id[type], num) - num;
360
361	k = 0;
362	tn = 0;
363	fail = 0;
364	for (i = 0; i != num && sl->len != 0 && fail == 0; i++) {
365
366		mb = mi[i];
367		sz = RTE_MIN(sl->len, mss);
368		plen = PKT_L4_PLEN(mb);
369
370		/* fast path, no need to use indirect mbufs. */
371		if (plen <= sz) {
372
373			/* update pkt TCP header */
374			tcp_update_mbuf(mb, type, &s->tcb, sl->seq, pid + i);
375
376			/* keep mbuf till ACK is received. */
377			rte_pktmbuf_refcnt_update(mb, 1);
378			sl->len -= plen;
379			sl->seq += plen;
380			mo[k++] = mb;
381		/* remaining snd.wnd is less than MSS, send nothing */
382		} else if (sz < mss)
383			break;
384		/* packet indirection needed */
385		else
386			RTE_VERIFY(0);
387
388		if (k >= MAX_PKT_BURST) {
389			n = tx_data_pkts(s, mo, k);
390			fail = k - n;
391			tn += n;
392			k = 0;
393		}
394	}
395
396	if (k != 0) {
397		n = tx_data_pkts(s, mo, k);
398		fail = k - n;
399		tn += n;
400	}
401
402	if (fail != 0) {
403		sz = tcp_mbuf_seq_free(mo + n, fail);
404		sl->seq -= sz;
405		sl->len += sz;
406	}
407
408	return tn;
409}
410
411/*
412 * gets data from the stream send buffer, updates it and
413 * queues it into the TX device queue.
414 * Note that this function is not MT safe.
415 */
416static inline uint32_t
417tx_nxt_data(struct tle_tcp_stream *s, uint32_t tms)
418{
419	uint32_t n, num, tn, wnd;
420	struct rte_mbuf **mi;
421	union seqlen sl;
422
423	tn = 0;
424	wnd = s->tcb.snd.wnd - (uint32_t)(s->tcb.snd.nxt - s->tcb.snd.una);
425	sl.seq = s->tcb.snd.nxt;
426	sl.len = RTE_MIN(wnd, s->tcb.snd.cwnd);
427
428	if (sl.len == 0)
429		return tn;
430
431	/* update send timestamp */
432	s->tcb.snd.ts = tms;
433
434	do {
435		/* get group of packets */
436		mi = tcp_txq_get_nxt_objs(s, &num);
437
438		/* stream send buffer is empty */
439		if (num == 0)
440			break;
441
442		/* queue data packets for TX */
443		n = tx_data_bulk(s, &sl, mi, num);
444		tn += n;
445
446		/* update consumer head */
447		tcp_txq_set_nxt_head(s, n);
448	} while (n == num);
449
450	s->tcb.snd.nxt += sl.seq - (uint32_t)s->tcb.snd.nxt;
451	return tn;
452}
453
454static inline void
455free_una_data(struct tle_tcp_stream *s, uint32_t len)
456{
457	uint32_t i, n, num, plen;
458	struct rte_mbuf **mi;
459
460	n = 0;
461	plen = 0;
462
463	do {
464		/* get group of packets */
465		mi = tcp_txq_get_una_objs(s, &num);
466
467		if (num == 0)
468			break;
469
470		/* free acked data */
471		for (i = 0; i != num && n != len; i++, n = plen) {
472			plen += PKT_L4_PLEN(mi[i]);
473			if (plen > len) {
474				/* keep SND.UNA at the start of the packet */
475				len -= RTE_MIN(len, plen - len);
476				break;
477			}
478			rte_pktmbuf_free(mi[i]);
479		}
480
481		/* update consumer tail */
482		tcp_txq_set_una_tail(s, i);
483	} while (plen < len);
484
485	s->tcb.snd.una += len;
486
487	/*
488	 * this can happen in case of retransmit;
489	 * adjust SND.NXT to match SND.UNA.
490	 */
491	if (s->tcb.snd.una > s->tcb.snd.nxt) {
492		tcp_txq_rst_nxt_head(s);
493		s->tcb.snd.nxt = s->tcb.snd.una;
494	}
495}
496
497static inline uint16_t
498calc_smss(uint16_t mss, const struct tle_dest *dst)
499{
500	uint16_t n;
501
502	n = dst->mtu - dst->l2_len - dst->l3_len - TCP_TX_HDR_DACK;
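	/*
	 * cap SMSS so that a full data segment together with the L2/L3
	 * headers and TCP_TX_HDR_DACK (presumably the TCP header plus
	 * option space) still fits into the destination MTU.
	 */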
503	mss = RTE_MIN(n, mss);
504	return mss;
505}
506
507/*
508 * RFC 5681 3.1
509 * If SMSS > 2190 bytes:
510 *     IW = 2 * SMSS bytes and MUST NOT be more than 2 segments
511 *  If (SMSS > 1095 bytes) and (SMSS <= 2190 bytes):
512 *     IW = 3 * SMSS bytes and MUST NOT be more than 3 segments
513 *  if SMSS <= 1095 bytes:
514 *     IW = 4 * SMSS bytes and MUST NOT be more than 4 segments
515 */
516static inline uint32_t
517initial_cwnd(uint16_t smss)
518{
519	if (smss > 2190)
520		return 2 * smss;
521	else if (smss > 1095)
522		return 3 * smss;
523	return 4 * smss;
524}
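
/*
 * Illustrative example: with a standard 1500-byte Ethernet MTU and
 * IPv4 + TCP headers, SMSS typically ends up around 1460 bytes or less,
 * which falls into the middle range above, so IW = 3 * SMSS
 * (e.g. 3 * 1460 = 4380 bytes).
 */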
525
526/*
527 * queue a standalone packet to the particular output device.
528 * It assumes that:
529 * - L2/L3/L4 headers are already set.
530 * - packet fits into one segment.
531 */
532static inline int
533send_pkt(struct tle_tcp_stream *s, struct tle_dev *dev, struct rte_mbuf *m)
534{
535	uint32_t n, nb;
536	struct tle_drb *drb;
537
538	if (stream_drb_alloc(s, &drb, 1) == 0)
539		return -ENOBUFS;
540
541	/* enqueue pkt for TX. */
542	nb = 1;
543	n = tle_dring_mp_enqueue(&dev->tx.dr, (const void * const*)&m, 1,
544		&drb, &nb);
545
546	/* free unused drbs. */
547	if (nb != 0)
548		stream_drb_free(s, &drb, 1);
549
550	return (n == 1) ? 0 : -ENOBUFS;
551}
552
553static inline int
554send_ctrl_pkt(struct tle_tcp_stream *s, struct rte_mbuf *m, uint32_t seq,
555	uint32_t flags)
556{
557	const struct tle_dest *dst;
558	uint32_t pid, type;
559	int32_t rc;
560
561	dst = &s->tx.dst;
562	type = s->s.type;
563	pid = rte_atomic32_add_return(&dst->dev->tx.packet_id[type], 1) - 1;
564
565	rc = tcp_fill_mbuf(m, s, dst, 0, s->s.port, seq, flags, pid, 1);
566	if (rc == 0)
567		rc = send_pkt(s, dst->dev, m);
568
569	return rc;
570}
571
572static inline int
573send_rst(struct tle_tcp_stream *s, uint32_t seq)
574{
575	struct rte_mbuf *m;
576	int32_t rc;
577
578	m = rte_pktmbuf_alloc(s->tx.dst.head_mp);
579	if (m == NULL)
580		return -ENOMEM;
581
582	rc = send_ctrl_pkt(s, m, seq, TCP_FLAG_RST);
583	if (rc != 0)
584		rte_pktmbuf_free(m);
585
586	return rc;
587}
588
589static inline int
590send_ack(struct tle_tcp_stream *s, uint32_t tms, uint32_t flags)
591{
592	struct rte_mbuf *m;
593	uint32_t seq;
594	int32_t rc;
595
596	m = rte_pktmbuf_alloc(s->tx.dst.head_mp);
597	if (m == NULL)
598		return -ENOMEM;
599
600	seq = s->tcb.snd.nxt - ((flags & (TCP_FLAG_FIN | TCP_FLAG_SYN)) != 0);
601	s->tcb.snd.ts = tms;
602
603	rc = send_ctrl_pkt(s, m, seq, flags);
604	if (rc != 0) {
605		rte_pktmbuf_free(m);
606		return rc;
607	}
608
609	s->tcb.snd.ack = s->tcb.rcv.nxt;
610	return 0;
611}
612
613
614static int
615sync_ack(struct tle_tcp_stream *s, const union pkt_info *pi,
616	const union seg_info *si, uint32_t ts, struct rte_mbuf *m)
617{
618	uint16_t len;
619	int32_t rc;
620	uint32_t pid, seq, type;
621	struct tle_dev *dev;
622	const void *da;
623	struct tle_dest dst;
624	const struct tcp_hdr *th;
625
626	type = s->s.type;
627
628	/* get destination information. */
629	if (type == TLE_V4)
630		da = &pi->addr4.src;
631	else
632		da = &pi->addr6->src;
633
634	rc = stream_get_dest(&s->s, da, &dst);
635	if (rc < 0)
636		return rc;
637
638	th = rte_pktmbuf_mtod_offset(m, const struct tcp_hdr *,
639		m->l2_len + m->l3_len);
640	get_syn_opts(&s->tcb.so, (uintptr_t)(th + 1), m->l4_len - sizeof(*th));
641
642	s->tcb.rcv.nxt = si->seq + 1;
643	seq = sync_gen_seq(pi, s->tcb.rcv.nxt, ts, s->tcb.so.mss);
644	s->tcb.so.ts.ecr = s->tcb.so.ts.val;
645	s->tcb.so.ts.val = sync_gen_ts(ts, s->tcb.so.wscale);
646	s->tcb.so.wscale = (s->tcb.so.wscale == TCP_WSCALE_NONE) ?
647		TCP_WSCALE_NONE : TCP_WSCALE_DEFAULT;
648	s->tcb.so.mss = calc_smss(dst.mtu, &dst);
649
650	/* reset mbuf's data contents. */
651	len = m->l2_len + m->l3_len + m->l4_len;
652	m->tx_offload = 0;
653	if (rte_pktmbuf_adj(m, len) == NULL)
654		return -EINVAL;
655
656	dev = dst.dev;
657	pid = rte_atomic32_add_return(&dev->tx.packet_id[type], 1) - 1;
658
659	rc = tcp_fill_mbuf(m, s, &dst, 0, pi->port, seq,
660		TCP_FLAG_SYN | TCP_FLAG_ACK, pid, 1);
661	if (rc == 0)
662		rc = send_pkt(s, dev, m);
663
664	return rc;
665}
666
667/*
668 * RFC 793:
669 * There are four cases for the acceptability test for an incoming segment:
670 * Segment Receive  Test
671 * Length  Window
672 * ------- -------  -------------------------------------------
673 *    0       0     SEG.SEQ = RCV.NXT
674 *    0      >0     RCV.NXT =< SEG.SEQ < RCV.NXT+RCV.WND
675 *   >0       0     not acceptable
676 *   >0      >0     RCV.NXT =< SEG.SEQ < RCV.NXT+RCV.WND
677 *                  or RCV.NXT =< SEG.SEQ+SEG.LEN-1 < RCV.NXT+RCV.WND
678 */
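/*
 * The comparisons below use unsigned (modulo 2^32) arithmetic, so they
 * work correctly across sequence number wrap-around: a segment is
 * rejected only if both its start and its end lie outside the receive
 * window [RCV.NXT, RCV.NXT + RCV.WND].
 */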
679static inline int
680check_seqn(const struct tcb *tcb, uint32_t seqn, uint32_t len)
681{
682	uint32_t n;
683
684	n = seqn + len;
685	if (seqn - tcb->rcv.nxt >= tcb->rcv.wnd &&
686			n - tcb->rcv.nxt > tcb->rcv.wnd)
687		return -ERANGE;
688
689	return 0;
690}
691
692static inline union tsopt
693rx_tms_opt(const struct tcb *tcb, const struct rte_mbuf *mb)
694{
695	union tsopt ts;
696	uintptr_t opt;
697	const struct tcp_hdr *th;
698
699	if (tcb->so.ts.val != 0) {
700		opt = rte_pktmbuf_mtod_offset(mb, uintptr_t,
701			mb->l2_len + mb->l3_len + sizeof(*th));
702		ts = get_tms_opts(opt, mb->l4_len - sizeof(*th));
703	} else
704		ts.raw = 0;
705
706	return ts;
707}
708
709/*
710 * PAWS and sequence check.
711 * RFC 1323 4.2.1
712 */
713static inline int
714rx_check_seq(struct tcb *tcb, uint32_t seq, uint32_t len, const union tsopt ts)
715{
716	int32_t rc;
717
718	/* RFC 1323 4.2.1 R2 */
719	rc = check_seqn(tcb, seq, len);
720	if (rc < 0)
721		return rc;
722
723	if (ts.raw != 0) {
724
725		/* RFC 1323 4.2.1 R1 */
726		if (tcp_seq_lt(ts.val, tcb->rcv.ts))
727			return -ERANGE;
728
729		/* RFC 1323 4.2.1 R3 */
730		if (tcp_seq_leq(seq, tcb->snd.ack) &&
731				tcp_seq_lt(tcb->snd.ack, seq + len))
732			tcb->rcv.ts = ts.val;
733	}
734
735	return rc;
736}
737
738static inline int
739rx_check_ack(const struct tcb *tcb, uint32_t ack)
740{
741	uint32_t max;
742
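	/*
	 * during fast retransmit SND.NXT is rolled back to SND.UNA while
	 * SND.RCVR keeps the pre-recovery value of SND.NXT, so ACKs up to
	 * that recovery point must still be treated as acceptable.
	 */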
743	max = (uint32_t)RTE_MAX(tcb->snd.nxt, tcb->snd.rcvr);
744
745	if (tcp_seq_leq(tcb->snd.una, ack) && tcp_seq_leq(ack, max))
746		return 0;
747
748	return -ERANGE;
749}
750
751static inline int
752rx_check_seqack(struct tcb *tcb, uint32_t seq, uint32_t ack, uint32_t len,
753	const union tsopt ts)
754{
755	int32_t rc;
756
757	rc = rx_check_seq(tcb, seq, len, ts);
758	rc |= rx_check_ack(tcb, ack);
759	return rc;
760}
761
762static inline int
763restore_syn_opt(struct syn_opts *so, const union pkt_info *pi,
764	const union seg_info *si, uint32_t ts, const struct rte_mbuf *mb)
765{
766	int32_t rc;
767	uint32_t len;
768	const struct tcp_hdr *th;
769
770	/* check that ACK, etc fields are what we expected. */
771	rc = sync_check_ack(pi, si->seq, si->ack - 1, ts);
772	if (rc < 0)
773		return rc;
774
775	so->mss = rc;
776
777	th = rte_pktmbuf_mtod_offset(mb, const struct tcp_hdr *,
778		mb->l2_len + mb->l3_len);
779	len = mb->l4_len - sizeof(*th);
780	sync_get_opts(so, (uintptr_t)(th + 1), len);
781	return 0;
782}
783
784static inline void
785stream_term(struct tle_tcp_stream *s)
786{
787	struct sdr *dr;
788
789	s->tcb.state = TCP_ST_CLOSED;
790	rte_smp_wmb();
791
792	timer_stop(s);
793
794	/* close() was already invoked, schedule final cleanup */
795	if ((s->tcb.uop & TCP_OP_CLOSE) != 0) {
796
797		dr = CTX_TCP_SDR(s->s.ctx);
798		STAILQ_INSERT_TAIL(&dr->be, &s->s, link);
799
800	/* notify user that the stream needs to be closed */
801	} else if (s->err.ev != NULL)
802		tle_event_raise(s->err.ev);
803	else if (s->err.cb.func != NULL)
804		s->err.cb.func(s->err.cb.data, &s->s);
805}
806
807static inline int
808stream_fill_dest(struct tle_tcp_stream *s)
809{
810	int32_t rc;
811	const void *da;
812
813	if (s->s.type == TLE_V4)
814		da = &s->s.ipv4.addr.src;
815	else
816		da = &s->s.ipv6.addr.src;
817
818	rc = stream_get_dest(&s->s, da, &s->tx.dst);
819	return (rc < 0) ? rc : 0;
820}
821
822/*
823 * helper function, prepares a new accept stream.
824 */
825static inline int
826accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st,
827	struct tle_tcp_stream *cs, const struct syn_opts *so,
828	uint32_t tms, const union pkt_info *pi, const union seg_info *si)
829{
830	int32_t rc;
831	uint32_t rtt;
832
833	/* some TX still pending for that stream. */
834	if (TCP_STREAM_TX_PENDING(cs))
835		return -EAGAIN;
836
837	/* setup L4 ports and L3 addresses fields. */
838	cs->s.port.raw = pi->port.raw;
839	cs->s.pmsk.raw = UINT32_MAX;
840
841	if (pi->tf.type == TLE_V4) {
842		cs->s.ipv4.addr = pi->addr4;
843		cs->s.ipv4.mask.src = INADDR_NONE;
844		cs->s.ipv4.mask.dst = INADDR_NONE;
845	} else if (pi->tf.type == TLE_V6) {
846		cs->s.ipv6.addr = *pi->addr6;
847		rte_memcpy(&cs->s.ipv6.mask.src, &tle_ipv6_none,
848			sizeof(cs->s.ipv6.mask.src));
849		rte_memcpy(&cs->s.ipv6.mask.dst, &tle_ipv6_none,
850			sizeof(cs->s.ipv6.mask.dst));
851	}
852
853	/* setup TCB */
854	sync_fill_tcb(&cs->tcb, si, so);
855	cs->tcb.rcv.wnd = calc_rx_wnd(cs, cs->tcb.rcv.wscale);
856
857	/*
858	 * estimate the RTO;
859	 * for now RTT is calculated based on the TCP TMS option,
860	 * a real-time estimate will be added later
861	 */
862	if (cs->tcb.so.ts.ecr) {
863		rtt = tms - cs->tcb.so.ts.ecr;
864		rto_estimate(&cs->tcb, rtt);
865	} else
866		cs->tcb.snd.rto = TCP_RTO_DEFAULT;
867
868	/* copy stream type. */
869	cs->s.type = ps->s.type;
870
871	/* retrieve and cache destination information. */
872	rc = stream_fill_dest(cs);
873	if (rc != 0)
874		return rc;
875
876	/* update snd.mss with SMSS value */
877	cs->tcb.snd.mss = calc_smss(cs->tcb.snd.mss, &cs->tx.dst);
878
879	/* setup congestion variables */
880	cs->tcb.snd.cwnd = initial_cwnd(cs->tcb.snd.mss);
881	cs->tcb.snd.ssthresh = cs->tcb.snd.wnd;
882
883	cs->tcb.state = TCP_ST_ESTABLISHED;
884
885	/* add stream to the table */
886	cs->ste = stbl_add_stream(st, pi, cs);
887	if (cs->ste == NULL)
888		return -ENOBUFS;
889
890	cs->tcb.uop |= TCP_OP_ACCEPT;
891	tcp_stream_up(cs);
892	return 0;
893}
894
895
896/*
897 * ACK for new connection request arrived.
898 * Check that the packet meets all conditions and try to open a new stream.
899 * returns:
900 * < 0  - invalid packet
901 * == 0 - packet is valid and new stream was opened for it.
902 * > 0  - packet is valid, but failed to open new stream.
903 */
904static inline int
905rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st,
906	const union pkt_info *pi, const union seg_info *si,
907	uint32_t tms, struct rte_mbuf *mb, struct tle_tcp_stream **csp)
908{
909	int32_t rc;
910	struct tle_ctx *ctx;
911	struct tle_stream *ts;
912	struct tle_tcp_stream *cs;
913	struct syn_opts so;
914
915	*csp = NULL;
916
917	if (pi->tf.flags != TCP_FLAG_ACK || rx_check_stream(s, pi) != 0)
918		return -EINVAL;
919
920	rc = restore_syn_opt(&so, pi, si, tms, mb);
921	if (rc < 0)
922		return rc;
923
924	ctx = s->s.ctx;
925
926	/* allocate new stream */
927	ts = get_stream(ctx);
928	cs = TCP_STREAM(ts);
929	if (ts == NULL)
930		return ENFILE;
931
932	/* prepare stream to handle new connection */
933	if (accept_prep_stream(s, st, cs, &so, tms, pi, si) == 0) {
934
935		/* put new stream in the accept queue */
936		if (rte_ring_enqueue_burst(s->rx.q,
937				(void * const *)&ts, 1) == 1) {
938			*csp = cs;
939			return 0;
940		}
941
942		/* cleanup on failure */
943		tcp_stream_down(cs);
944		stbl_del_pkt(st, cs->ste, pi);
945		cs->ste = NULL;
946	}
947
948	tcp_stream_reset(ctx, cs);
949	return ENOBUFS;
950}
951
952static inline int
953data_pkt_adjust(const struct tcb *tcb, struct rte_mbuf *mb, uint32_t hlen,
954	uint32_t *seqn, uint32_t *plen)
955{
956	uint32_t len, n, seq;
957
958	seq = *seqn;
959	len = *plen;
960
961	rte_pktmbuf_adj(mb, hlen);
962	if (len == 0)
963		return -ENODATA;
964	/* cut off the start of the packet */
965	else if (tcp_seq_lt(seq, tcb->rcv.nxt)) {
966		n = tcb->rcv.nxt - seq;
967		if (n >= len)
968			return -ENODATA;
969
970		rte_pktmbuf_adj(mb, n);
971		*seqn = seq + n;
972		*plen = len - n;
973	}
974
975	return 0;
976}
977
978static inline uint32_t
979rx_ackdata(struct tle_tcp_stream *s, uint32_t ack)
980{
981	uint32_t k, n;
982
983	n = ack - (uint32_t)s->tcb.snd.una;
984
985	/* some more data was acked. */
986	if (n != 0) {
987
988		/* advance SND.UNA and free related packets. */
989		k = rte_ring_free_count(s->tx.q);
990		free_una_data(s, n);
991
992		/* mark the stream as available for writing */
993		if (rte_ring_free_count(s->tx.q) != 0) {
994			if (s->tx.ev != NULL)
995				tle_event_raise(s->tx.ev);
996			else if (k == 0 && s->tx.cb.func != NULL)
997				s->tx.cb.func(s->tx.cb.data, &s->s);
998		}
999	}
1000
1001	return n;
1002}
1003
1004static void
1005rx_fin_state(struct tle_tcp_stream *s, struct resp_info *rsp)
1006{
1007	uint32_t state;
1008	int32_t ackfin;
1009
1010	s->tcb.rcv.nxt += 1;
1011
1012	ackfin = (s->tcb.snd.una == s->tcb.snd.fss);
1013	state = s->tcb.state;
1014
1015	if (state == TCP_ST_ESTABLISHED) {
1016		s->tcb.state = TCP_ST_CLOSE_WAIT;
1017		/* raise err.ev & err.cb */
1018		if (s->err.ev != NULL)
1019			tle_event_raise(s->err.ev);
1020		else if (s->err.cb.func != NULL)
1021			s->err.cb.func(s->err.cb.data, &s->s);
1022	} else if (state == TCP_ST_FIN_WAIT_1 || state == TCP_ST_CLOSING) {
1023		rsp->flags |= TCP_FLAG_ACK;
1024		if (ackfin != 0) {
1025			s->tcb.state = TCP_ST_TIME_WAIT;
1026			s->tcb.snd.rto = TCP_RTO_2MSL;
1027			timer_reset(s);
1028		} else
1029			s->tcb.state = TCP_ST_CLOSING;
1030	} else if (state == TCP_ST_FIN_WAIT_2) {
1031		rsp->flags |= TCP_FLAG_ACK;
1032		s->tcb.state = TCP_ST_TIME_WAIT;
1033		s->tcb.snd.rto = TCP_RTO_2MSL;
1034		timer_reset(s);
1035	} else if (state == TCP_ST_LAST_ACK && ackfin != 0) {
1036		stream_term(s);
1037	}
1038}
1039
1040/*
1041 * FIN process for ESTABLISHED state
1042 * returns:
1043 * < 0  - error occurred
1044 * == 0 - FIN was processed OK, and mbuf can be freed/reused.
1045 * > 0  - FIN was processed OK, but mbuf can't be freed/reused.
1046 */
1047static inline int
1048rx_fin(struct tle_tcp_stream *s, uint32_t state,
1049	const union seg_info *si, struct rte_mbuf *mb,
1050	struct resp_info *rsp)
1051{
1052	uint32_t hlen, plen, seq;
1053	int32_t ret;
1054	union tsopt ts;
1055
1056	hlen = PKT_L234_HLEN(mb);
1057	plen = mb->pkt_len - hlen;
1058	seq = si->seq;
1059
1060	ts = rx_tms_opt(&s->tcb, mb);
1061	ret = rx_check_seqack(&s->tcb, seq, si->ack, plen, ts);
1062	if (ret != 0)
1063		return ret;
1064
1065	if (state < TCP_ST_ESTABLISHED)
1066		return -EINVAL;
1067
1068	if (plen != 0) {
1069
1070		ret = data_pkt_adjust(&s->tcb, mb, hlen, &seq, &plen);
1071		if (ret != 0)
1072			return ret;
1073		if (rx_data_enqueue(s, seq, plen, &mb, 1) != 1)
1074			return -ENOBUFS;
1075	}
1076
1077	/* process ack here */
1078	rx_ackdata(s, si->ack);
1079
1080	/* some fragments still missing */
1081	if (seq + plen != s->tcb.rcv.nxt) {
1082		s->tcb.rcv.frs.seq = seq + plen;
1083		s->tcb.rcv.frs.on = 1;
1084	} else
1085		rx_fin_state(s, rsp);
1086
1087	return plen;
1088}
1089
1090static inline int
1091rx_rst(struct tle_tcp_stream *s, uint32_t state, uint32_t flags,
1092	const union seg_info *si)
1093{
1094	int32_t rc;
1095
1096	/*
1097	 * RFC 793: In all states except SYN-SENT, all reset (RST) segments
1098	 * are validated by checking their SEQ-fields.
1099	 * A reset is valid if its sequence number is in the window.
1100	 * In the SYN-SENT state (a RST received in response to an initial SYN),
1101	 * the RST is acceptable if the ACK field acknowledges the SYN.
1102	 */
1103	if (state == TCP_ST_SYN_SENT) {
1104		rc = ((flags & TCP_FLAG_ACK) == 0 ||
1105				si->ack != s->tcb.snd.nxt) ?
1106			-ERANGE : 0;
1107	}
1108
1109	else
1110		rc = check_seqn(&s->tcb, si->seq, 0);
1111
1112	if (rc == 0)
1113		stream_term(s);
1114
1115	return rc;
1116}
1117
1118/*
1119 * check whether we have a FIN that was received out-of-order.
1120 * if yes, try to process it now.
1121 */
1122static inline void
1123rx_ofo_fin(struct tle_tcp_stream *s, struct resp_info *rsp)
1124{
1125	if (s->tcb.rcv.frs.on != 0 && s->tcb.rcv.nxt == s->tcb.rcv.frs.seq)
1126		rx_fin_state(s, rsp);
1127}
1128
1129static inline void
1130dack_info_init(struct dack_info *tack, const struct tcb *tcb)
1131{
1132	memset(tack, 0, sizeof(*tack));
1133	tack->ack = tcb->snd.una;
1134	tack->segs.dup = tcb->rcv.dupack;
1135	tack->wu.raw = tcb->snd.wu.raw;
1136	tack->wnd = tcb->snd.wnd >> tcb->snd.wscale;
1137}
1138
1139static inline void
1140ack_window_update(struct tcb *tcb, const struct dack_info *tack)
1141{
1142	tcb->snd.wu.raw = tack->wu.raw;
1143	tcb->snd.wnd = tack->wnd << tcb->snd.wscale;
1144}
1145
1146static inline void
1147ack_cwnd_update(struct tcb *tcb, uint32_t acked, const struct dack_info *tack)
1148{
1149	uint32_t n;
1150
1151	n = tack->segs.ack * tcb->snd.mss;
1152
1153	/* slow start phase, RFC 5681 3.1 (2)  */
1154	if (tcb->snd.cwnd < tcb->snd.ssthresh)
1155		tcb->snd.cwnd += RTE_MIN(acked, n);
1156	/* congestion avoidance phase, RFC 5681 3.1 (3) */
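	/*
	 * e.g. when exactly one MSS worth of new data is acknowledged,
	 * cwnd grows by roughly MSS * MSS / cwnd bytes, i.e. about one
	 * MSS per round-trip time (an illustrative approximation).
	 */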
1157	else
1158		tcb->snd.cwnd += RTE_MAX(1U, n * tcb->snd.mss / tcb->snd.cwnd);
1159}
1160
1161static inline void
1162rto_ssthresh_update(struct tcb *tcb)
1163{
1164	uint32_t k, n;
1165
1166	/* RFC 5681 3.1 (4)  */
1167	n = (tcb->snd.nxt - tcb->snd.una) / 2;
1168	k = 2 * tcb->snd.mss;
1169	tcb->snd.ssthresh = RTE_MAX(n, k);
1170}
1171
1172static inline void
1173rto_cwnd_update(struct tcb *tcb)
1174{
1175
1176	if (tcb->snd.nb_retx == 0)
1177		rto_ssthresh_update(tcb);
1178
1179	/*
1180	 * RFC 5681 3.1: upon a timeout cwnd MUST be set to
1181	 * no more than 1 full-sized segment.
1182	 */
1183	tcb->snd.cwnd = tcb->snd.mss;
1184}
1185
1186static inline void
1187ack_info_update(struct dack_info *tack, const union seg_info *si,
1188	int32_t badseq, uint32_t dlen, const union tsopt ts)
1189{
1190	if (badseq != 0) {
1191		tack->segs.badseq++;
1192		return;
1193	}
1194
1195	/* segment with incoming data */
1196	tack->segs.data += (dlen != 0);
1197
1198	/* segment with newly acked data */
1199	if (tcp_seq_lt(tack->ack, si->ack)) {
1200		tack->segs.dup = 0;
1201		tack->segs.ack++;
1202		tack->ack = si->ack;
1203		tack->ts = ts;
1204
1205	/*
1206	 * RFC 5681: An acknowledgment is considered a "duplicate" when:
1207	 * (a) the receiver of the ACK has outstanding data
1208	 * (b) the incoming acknowledgment carries no data
1209	 * (c) the SYN and FIN bits are both off
1210	 * (d) the acknowledgment number is equal to the TCP.UNA
1211	 * (e) the advertised window in the incoming acknowledgment equals the
1212	 * advertised window in the last incoming acknowledgment.
1213	 *
1214	 * Here we only have to check for (b), (d) and (e).
1215	 * (a) will be checked later for the whole bulk of packets,
1216	 * (c) should never happen here.
1217	 */
1218	} else if (dlen == 0 && si->wnd == tack->wnd && ++tack->segs.dup == 3) {
1219		tack->dup3.seg = tack->segs.ack + 1;
1220		tack->dup3.ack = tack->ack;
1221	}
1222
1223	/*
1224	 * RFC 793:
1225	 * If SND.UNA < SEG.ACK =< SND.NXT, the send window should be
1226	 * updated.  If (SND.WL1 < SEG.SEQ or (SND.WL1 = SEG.SEQ and
1227	 * SND.WL2 =< SEG.ACK)), set SND.WND <- SEG.WND, set
1228	 * SND.WL1 <- SEG.SEQ, and set SND.WL2 <- SEG.ACK.
1229	 */
1230	if (tcp_seq_lt(tack->wu.wl1, si->seq) ||
1231			(si->seq == tack->wu.wl1 &&
1232			tcp_seq_leq(tack->wu.wl2, si->ack))) {
1233
1234		tack->wu.wl1 = si->seq;
1235		tack->wu.wl2 = si->ack;
1236		tack->wnd = si->wnd;
1237	}
1238}
1239
1240static inline uint32_t
1241rx_data_ack(struct tle_tcp_stream *s, struct dack_info *tack,
1242	const union seg_info si[], struct rte_mbuf *mb[], struct rte_mbuf *rp[],
1243	int32_t rc[], uint32_t num)
1244{
1245	uint32_t i, j, k, n, t;
1246	uint32_t hlen, plen, seq, tlen;
1247	int32_t ret;
1248	union tsopt ts;
1249
1250	k = 0;
1251	for (i = 0; i != num; i = j) {
1252
1253		hlen = PKT_L234_HLEN(mb[i]);
1254		plen = mb[i]->pkt_len - hlen;
1255		seq = si[i].seq;
1256
1257		ts = rx_tms_opt(&s->tcb, mb[i]);
1258		ret = rx_check_seqack(&s->tcb, seq, si[i].ack, plen, ts);
1259
1260		/* account segment received */
1261		ack_info_update(tack, &si[i], ret != 0, plen, ts);
1262
1263		if (ret == 0) {
1264			/* skip duplicate data, if any */
1265			ret = data_pkt_adjust(&s->tcb, mb[i], hlen,
1266				&seq, &plen);
1267		}
1268
1269		j = i + 1;
1270		if (ret != 0) {
1271			rp[k] = mb[i];
1272			rc[k] = -ret;
1273			k++;
1274			continue;
1275		}
1276
1277		/* group sequential packets together. */
1278		for (tlen = plen; j != num; tlen += plen, j++) {
1279
1280			hlen = PKT_L234_HLEN(mb[j]);
1281			plen = mb[j]->pkt_len - hlen;
1282
1283			/* not consecutive packet */
1284			if (plen == 0 || seq + tlen != si[j].seq)
1285				break;
1286
1287			/* check SEQ/ACK */
1288			ts = rx_tms_opt(&s->tcb, mb[j]);
1289			ret = rx_check_seqack(&s->tcb, si[j].seq, si[j].ack,
1290				plen, ts);
1291
1292			/* account for segment received */
1293			ack_info_update(tack, &si[j], ret != 0, plen, ts);
1294
1295			if (ret != 0) {
1296				rp[k] = mb[j];
1297				rc[k] = -ret;
1298				k++;
1299				break;
1300			}
1301			rte_pktmbuf_adj(mb[j], hlen);
1302		}
1303
1304		n = j - i;
1305		j += (ret != 0);
1306
1307		/* account for OFO data */
1308		if (seq != s->tcb.rcv.nxt)
1309			tack->segs.ofo += n;
1310
1311		/* enqueue packets */
1312		t = rx_data_enqueue(s, seq, tlen, mb + i, n);
1313
1314		/* if we are out of space in stream recv buffer. */
1315		for (; t != n; t++) {
1316			rp[k] = mb[i + t];
1317			rc[k] = -ENOBUFS;
1318			k++;
1319		}
1320	}
1321
1322	return num - k;
1323}
1324
1325static inline void
1326start_fast_retransmit(struct tle_tcp_stream *s)
1327{
1328	struct tcb *tcb;
1329
1330	tcb = &s->tcb;
1331
1332	/* RFC 6582 3.2.2 */
1333	tcb->snd.rcvr = tcb->snd.nxt;
1334	tcb->snd.fastack = 1;
1335
1336	/* RFC 5681 3.2.2 */
1337	rto_ssthresh_update(tcb);
1338
1339	/* RFC 5681 3.2.3 */
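	/*
	 * retransmission restarts from SND.UNA; cwnd is set to ssthresh
	 * plus 3 SMSS to account for the three segments that triggered
	 * the duplicate ACKs and have already left the network.
	 */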
1340	tcp_txq_rst_nxt_head(s);
1341	tcb->snd.nxt = tcb->snd.una;
1342	tcb->snd.cwnd = tcb->snd.ssthresh + 3 * tcb->snd.mss;
1343}
1344
1345static inline void
1346stop_fast_retransmit(struct tle_tcp_stream *s)
1347{
1348	struct tcb *tcb;
1349	uint32_t n;
1350
1351	tcb = &s->tcb;
1352	n = tcb->snd.nxt - tcb->snd.una;
1353	tcb->snd.cwnd = RTE_MIN(tcb->snd.ssthresh,
1354		RTE_MAX(n, tcb->snd.mss) + tcb->snd.mss);
1355	tcb->snd.fastack = 0;
1356}
1357
1358static inline int
1359in_fast_retransmit(struct tle_tcp_stream *s, uint32_t ack_len, uint32_t ack_num,
1360	uint32_t dup_num)
1361{
1362	uint32_t n;
1363	struct tcb *tcb;
1364
1365	tcb = &s->tcb;
1366
1367	/* RFC 6582 3.2.3 partial ACK */
1368	if (ack_len != 0) {
1369
1370		n = ack_num * tcb->snd.mss;
1371		if (ack_len >= n)
1372			tcb->snd.cwnd -= ack_len - n;
1373		else
1374			tcb->snd.cwnd -= ack_len % tcb->snd.mss;
1375
1376		/*
1377		 * For the first partial ACK that arrives
1378		 * during fast recovery, also reset the
1379		 * retransmit timer.
1380		 */
1381		if (tcb->snd.fastack == 1)
1382			timer_reset(s);
1383
1384		tcb->snd.fastack += ack_num;
1385		return 1;
1386
1387	/* RFC 5681 3.2.4 */
1388	} else if (dup_num > 3) {
1389		s->tcb.snd.cwnd += (dup_num - 3) * tcb->snd.mss;
1390		return 1;
1391	}
1392
1393	return 0;
1394}
1395
1396static inline int
1397process_ack(struct tle_tcp_stream *s, uint32_t acked,
1398	const struct dack_info *tack)
1399{
1400	int32_t send;
1401
1402	send = 0;
1403
1404	/* normal mode */
1405	if (s->tcb.snd.fastack == 0) {
1406
1407		send = 1;
1408
1409		/* RFC 6582 3.2.2 switch to fast retransmit mode */
1410		if (tack->dup3.seg != 0 && s->tcb.snd.una != s->tcb.snd.nxt &&
1411				s->tcb.snd.una >= s->tcb.snd.rcvr) {
1412
1413			start_fast_retransmit(s);
1414			in_fast_retransmit(s,
1415				tack->ack - tack->dup3.ack,
1416				tack->segs.ack - tack->dup3.seg - 1,
1417				tack->segs.dup);
1418
1419		/* remain in normal mode */
1420		} else if (acked != 0) {
1421			ack_cwnd_update(&s->tcb, acked, tack);
1422			timer_stop(s);
1423		}
1424
1425	/* fast retransmit mode */
1426	} else {
1427
1428		/* remain in fast retransmit mode */
1429		if (s->tcb.snd.una < s->tcb.snd.rcvr) {
1430
1431			send = in_fast_retransmit(s, acked, tack->segs.ack,
1432				tack->segs.dup);
1433		} else {
1434			/* RFC 6582 3.2.3 full ACK */
1435			stop_fast_retransmit(s);
1436			timer_stop(s);
1437
1438			/* if we have another series of dup ACKs */
1439			if (tack->dup3.seg != 0 &&
1440					s->tcb.snd.una != s->tcb.snd.nxt &&
1441					tcp_seq_leq((uint32_t)s->tcb.snd.rcvr,
1442					tack->dup3.ack)) {
1443
1444				/* restart fast retransmit again. */
1445				start_fast_retransmit(s);
1446				send = in_fast_retransmit(s,
1447					tack->ack - tack->dup3.ack,
1448					tack->segs.ack - tack->dup3.seg - 1,
1449					tack->segs.dup);
1450			}
1451		}
1452	}
1453
1454	return send;
1455}
1456
1457/*
1458 * our FIN was acked, stop rto timer, change stream state,
1459 * and possibly close the stream.
1460 */
1461static inline void
1462rx_ackfin(struct tle_tcp_stream *s)
1463{
1464	uint32_t state;
1465
1466	s->tcb.snd.una = s->tcb.snd.fss;
1467	empty_mbuf_ring(s->tx.q);
1468
1469	state = s->tcb.state;
1470	if (state == TCP_ST_LAST_ACK)
1471		stream_term(s);
1472	else if (state == TCP_ST_FIN_WAIT_1) {
1473		timer_stop(s);
1474		s->tcb.state = TCP_ST_FIN_WAIT_2;
1475	} else if (state == TCP_ST_CLOSING) {
1476		s->tcb.state = TCP_ST_TIME_WAIT;
1477		s->tcb.snd.rto = TCP_RTO_2MSL;
1478		timer_reset(s);
1479	}
1480}
1481
1482static inline void
1483rx_process_ack(struct tle_tcp_stream *s, uint32_t ts,
1484	const struct dack_info *tack)
1485{
1486	int32_t send;
1487	uint32_t n;
1488
1489	s->tcb.rcv.dupack = tack->segs.dup;
1490
1491	n = rx_ackdata(s, tack->ack);
1492	send = process_ack(s, n, tack);
1493
1494	/* try to send more data. */
1495	if ((n != 0 || send != 0) && tcp_txq_nxt_cnt(s) != 0)
1496		txs_enqueue(s->s.ctx, s);
1497
1498	/* restart RTO timer. */
1499	if (s->tcb.snd.nxt != s->tcb.snd.una)
1500		timer_start(s);
1501
1502	/* update RTO: if a fresh TS echo is present, calculate RTT */
1503	if (tack->ts.ecr != 0)
1504		rto_estimate(&s->tcb, ts - tack->ts.ecr);
1505}
1506
1507/*
1508 * process <SYN,ACK>
1509 * returns negative value on failure, or zero on success.
1510 */
1511static inline int
1512rx_synack(struct tle_tcp_stream *s, uint32_t ts, uint32_t state,
1513	const union seg_info *si, struct rte_mbuf *mb,
1514	struct resp_info *rsp)
1515{
1516	struct syn_opts so;
1517	struct tcp_hdr *th;
1518
1519	if (state != TCP_ST_SYN_SENT)
1520		return -EINVAL;
1521
1522	/* invalid SEG.ACK */
1523	if (si->ack != (uint32_t)s->tcb.snd.nxt) {
1524		rsp->flags = TCP_FLAG_RST;
1525		return 0;
1526	}
1527
1528	th = rte_pktmbuf_mtod_offset(mb, struct tcp_hdr *,
1529		mb->l2_len + mb->l3_len);
1530	get_syn_opts(&so, (uintptr_t)(th + 1), mb->l4_len - sizeof(*th));
1531
1532	s->tcb.so = so;
1533
1534	s->tcb.snd.una = s->tcb.snd.nxt;
1535	s->tcb.snd.mss = so.mss;
1536	s->tcb.snd.wnd = si->wnd << so.wscale;
1537	s->tcb.snd.wu.wl1 = si->seq;
1538	s->tcb.snd.wu.wl2 = si->ack;
1539	s->tcb.snd.wscale = so.wscale;
1540
1541	/* setup congestion variables */
1542	s->tcb.snd.cwnd = initial_cwnd(s->tcb.snd.mss);
1543	s->tcb.snd.ssthresh = s->tcb.snd.wnd;
1544
1545	s->tcb.rcv.ts = so.ts.val;
1546	s->tcb.rcv.irs = si->seq;
1547	s->tcb.rcv.nxt = si->seq + 1;
1548
1549	/* calculate initial rto */
1550	rto_estimate(&s->tcb, ts - s->tcb.snd.ts);
1551
1552	rsp->flags |= TCP_FLAG_ACK;
1553
1554	timer_stop(s);
1555	s->tcb.state = TCP_ST_ESTABLISHED;
1556	rte_smp_wmb();
1557
1558	if (s->tx.ev != NULL)
1559		tle_event_raise(s->tx.ev);
1560	else if (s->tx.cb.func != NULL)
1561		s->tx.cb.func(s->tx.cb.data, &s->s);
1562
1563	return 0;
1564}
1565
1566static inline uint32_t
1567rx_stream(struct tle_tcp_stream *s, uint32_t ts,
1568	const union pkt_info *pi, const union seg_info si[],
1569	struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1570	uint32_t num)
1571{
1572	uint32_t i, k, n, state;
1573	int32_t ret;
1574	struct resp_info rsp;
1575	struct dack_info tack;
1576
1577	k = 0;
1578	rsp.flags = 0;
1579
1580	state = s->tcb.state;
1581
1582	/*
1583	 * first check for the states/flags where we don't
1584	 * expect groups of packets.
1585	 */
1586
1587	/* process RST */
1588	if ((pi->tf.flags & TCP_FLAG_RST) != 0) {
1589		for (i = 0;
1590				i != num &&
1591				rx_rst(s, state, pi->tf.flags, &si[i]);
1592				i++)
1593			;
1594		i = 0;
1595
1596	/* RFC 793: if the ACK bit is off drop the segment and return */
1597	} else if ((pi->tf.flags & TCP_FLAG_ACK) == 0) {
1598		i = 0;
1603
1604	/* process <SYN,ACK> */
1605	} else if ((pi->tf.flags & TCP_FLAG_SYN) != 0) {
1606		ret = 0;
1607		for (i = 0; i != num; i++) {
1608			ret = rx_synack(s, ts, state, &si[i], mb[i], &rsp);
1609			if (ret == 0)
1610				break;
1611
1612			rc[k] = -ret;
1613			rp[k] = mb[i];
1614			k++;
1615		}
1616
1617	/* process FIN */
1618	} else if ((pi->tf.flags & TCP_FLAG_FIN) != 0) {
1619		ret = 0;
1620		for (i = 0; i != num; i++) {
1621			ret = rx_fin(s, state, &si[i], mb[i], &rsp);
1622			if (ret >= 0)
1623				break;
1624
1625			rc[k] = -ret;
1626			rp[k] = mb[i];
1627			k++;
1628		}
1629		i += (ret > 0);
1630
1631	/* normal data/ack packets */
1632	} else if (state >= TCP_ST_ESTABLISHED && state <= TCP_ST_LAST_ACK) {
1633
1634		/* process incoming data packets. */
1635		dack_info_init(&tack, &s->tcb);
1636		n = rx_data_ack(s, &tack, si, mb, rp, rc, num);
1637
1638		/* follow up actions based on aggregated information */
1639
1640		/* update SND.WND */
1641		ack_window_update(&s->tcb, &tack);
1642
1643		/*
1644		 * fast-path: all data & FIN was already sent out
1645		 * and now is acknowledged.
1646		 */
1647		if (s->tcb.snd.fss == s->tcb.snd.nxt &&
1648				tack.ack == (uint32_t) s->tcb.snd.nxt)
1649			rx_ackfin(s);
1650		else
1651			rx_process_ack(s, ts, &tack);
1652
1653		/*
1654		 * send an immediate ACK if either:
1655		 * - received segment with invalid seq/ack number
1656		 * - received segment with OFO data
1657		 * - received segment with in-order (INO) data and no TX is scheduled
1658		 *   for that stream.
1659		 */
1660		if (tack.segs.badseq != 0 || tack.segs.ofo != 0 ||
1661				(tack.segs.data != 0 &&
1662				rte_atomic32_read(&s->tx.arm) == 0))
1663			rsp.flags |= TCP_FLAG_ACK;
1664
1665		rx_ofo_fin(s, &rsp);
1666
1667		k += num - n;
1668		i = num;
1669
1670	/* unhandled state, drop all packets. */
1671	} else
1672		i = 0;
1673
1674	/* we have a response packet to send. */
1675	if (rsp.flags == TCP_FLAG_RST) {
1676		send_rst(s, si[i].ack);
1677		stream_term(s);
1678	} else if (rsp.flags != 0) {
1679		send_ack(s, ts, rsp.flags);
1680
1681		/* start the timer for FIN packet */
1682		if ((rsp.flags & TCP_FLAG_FIN) != 0)
1683			timer_reset(s);
1684	}
1685
1686	/* unprocessed packets */
1687	for (; i != num; i++, k++) {
1688		rc[k] = EINVAL;
1689		rp[k] = mb[i];
1690	}
1691
1692	return num - k;
1693}
1694
1695static inline uint32_t
1696rx_new_stream(struct tle_tcp_stream *s, uint32_t ts,
1697	const union pkt_info *pi, const union seg_info si[],
1698	struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1699	uint32_t num)
1700{
1701	uint32_t i;
1702
1703	if (rwl_acquire(&s->rx.use) > 0) {
1704		i = rx_stream(s, ts, pi, si, mb, rp, rc, num);
1705		rwl_release(&s->rx.use);
1706		return i;
1707	}
1708
1709	for (i = 0; i != num; i++) {
1710		rc[i] = ENOENT;
1711		rp[i] = mb[i];
1712	}
1713	return 0;
1714}
1715
1716static inline uint32_t
1717rx_postsyn(struct tle_dev *dev, struct stbl *st, uint32_t type, uint32_t ts,
1718	const union pkt_info pi[], const union seg_info si[],
1719	struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1720	uint32_t num)
1721{
1722	struct tle_tcp_stream *cs, *s;
1723	uint32_t i, k, n, state;
1724	int32_t ret;
1725
1726	s = rx_obtain_stream(dev, st, &pi[0], type);
1727	if (s == NULL) {
1728		for (i = 0; i != num; i++) {
1729			rc[i] = ENOENT;
1730			rp[i] = mb[i];
1731		}
1732		return 0;
1733	}
1734
1735	k = 0;
1736	state = s->tcb.state;
1737
1738	if (state == TCP_ST_LISTEN) {
1739
1740		/* one connection per flow */
1741		cs = NULL;
1742		ret = -EINVAL;
1743		for (i = 0; i != num; i++) {
1744
1745			ret = rx_ack_listen(s, st, pi, &si[i], ts, mb[i], &cs);
1746
1747			/* valid packet encountered */
1748			if (ret >= 0)
1749				break;
1750
1751			/* invalid packet, keep trying to find a proper one */
1752			rc[k] = -ret;
1753			rp[k] = mb[i];
1754			k++;
1755		}
1756
1757		/* packet is valid, but we are out of streams to serve it */
1758		if (ret > 0) {
1759			for (; i != num; i++, k++) {
1760				rc[k] = ret;
1761				rp[k] = mb[i];
1762			}
1763		/* new stream is accepted */
1764		} else if (ret == 0) {
1765
1766			/* inform listen stream about new connections */
1767			if (s->rx.ev != NULL)
1768				tle_event_raise(s->rx.ev);
1769			else if (s->rx.cb.func != NULL &&
1770					rte_ring_count(s->rx.q) == 1)
1771				s->rx.cb.func(s->rx.cb.data, &s->s);
1772
1773			/* if there is no data, drop current packet */
1774			if (PKT_L4_PLEN(mb[i]) == 0) {
1775				rc[k] = ENODATA;
1776				rp[k++] = mb[i++];
1777			}
1778
1779			/*  process remaining packets for that stream */
1780			if (num != i) {
1781				n = rx_new_stream(cs, ts, pi + i, si + i,
1782					mb + i, rp + k, rc + k, num - i);
1783				k += num - n - i;
1784			}
1785		}
1786
1787	} else {
1788		i = rx_stream(s, ts, pi, si, mb, rp, rc, num);
1789		k = num - i;
1790	}
1791
1792	rwl_release(&s->rx.use);
1793	return num - k;
1794}
1795
1796
1797static inline uint32_t
1798rx_syn(struct tle_dev *dev, uint32_t type, uint32_t ts,
1799	const union pkt_info pi[], const union seg_info si[],
1800	struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1801	uint32_t num)
1802{
1803	struct tle_tcp_stream *s;
1804	uint32_t i, k;
1805	int32_t ret;
1806
1807	s = rx_obtain_listen_stream(dev, &pi[0], type);
1808	if (s == NULL) {
1809		for (i = 0; i != num; i++) {
1810			rc[i] = ENOENT;
1811			rp[i] = mb[i];
1812		}
1813		return 0;
1814	}
1815
1816	k = 0;
1817	for (i = 0; i != num; i++) {
1818
1819		/* check that this remote is allowed to connect */
1820		if (rx_check_stream(s, &pi[i]) != 0)
1821			ret = -ENOENT;
1822		else
1823			/* syncookie: reply with <SYN,ACK> */
1824			ret = sync_ack(s, &pi[i], &si[i], ts, mb[i]);
1825
1826		if (ret != 0) {
1827			rc[k] = -ret;
1828			rp[k] = mb[i];
1829			k++;
1830		}
1831	}
1832
1833	rwl_release(&s->rx.use);
1834	return num - k;
1835}
1836
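/*
 * Illustrative BE (back-end) usage sketch, not a prescribed flow;
 * port, queue, pkt, rp and rc below are placeholder names:
 *
 *	n = rte_eth_rx_burst(port, queue, pkt, RTE_DIM(pkt));
 *	k = tle_tcp_rx_bulk(dev, pkt, rp, rc, n);
 *
 * k packets were consumed by the library; the remaining n - k are handed
 * back via rp[] with matching error codes in rc[], and it is up to the
 * caller to free or otherwise handle them.
 */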
1837uint16_t
1838tle_tcp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[],
1839	struct rte_mbuf *rp[], int32_t rc[], uint16_t num)
1840{
1841	struct stbl *st;
1842	uint32_t i, j, k, n, t, ts;
1843	uint64_t csf;
1844	union pkt_info pi[num];
1845	union seg_info si[num];
1846	union {
1847		uint8_t t[TLE_VNUM];
1848		uint32_t raw;
1849	} stu;
1850
1851	ts = tcp_get_tms();
1852	st = CTX_TCP_STLB(dev->ctx);
1853
1854	stu.raw = 0;
1855
1856	/* extract packet info and check the L3/L4 csums */
1857	for (i = 0; i != num; i++) {
1858
1859		get_pkt_info(pkt[i], &pi[i], &si[i]);
1860
1861		t = pi[i].tf.type;
1862		csf = dev->rx.ol_flags[t] &
1863			(PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD);
1864
1865		/* check csums in SW */
1866		if (pi[i].csf == 0 && csf != 0 && check_pkt_csum(pkt[i], csf,
1867				pi[i].tf.type, IPPROTO_TCP) != 0)
1868			pi[i].csf = csf;
1869
1870		stu.t[t] = 1;
1871	}
1872
1873	if (stu.t[TLE_V4] != 0)
1874		stbl_lock(st, TLE_V4);
1875	if (stu.t[TLE_V6] != 0)
1876		stbl_lock(st, TLE_V6);
1877
1878	k = 0;
1879	for (i = 0; i != num; i += j) {
1880
1881		t = pi[i].tf.type;
1882
1883		/* basic checks for incoming packet */
1884		if (t >= TLE_VNUM || pi[i].csf != 0 || dev->dp[t] == NULL) {
1885			rc[k] = EINVAL;
1886			rp[k] = pkt[i];
1887			j = 1;
1888			k++;
1889		/* process input SYN packets */
1890		} else if (pi[i].tf.flags == TCP_FLAG_SYN) {
1891			j = pkt_info_bulk_syneq(pi + i, num - i);
1892			n = rx_syn(dev, t, ts, pi + i, si + i, pkt + i,
1893				rp + k, rc + k, j);
1894			k += j - n;
1895		} else {
1896			j = pkt_info_bulk_eq(pi + i, num - i);
1897			n = rx_postsyn(dev, st, t, ts, pi + i, si + i, pkt + i,
1898				rp + k, rc + k, j);
1899			k += j - n;
1900		}
1901	}
1902
1903	if (stu.t[TLE_V4] != 0)
1904		stbl_unlock(st, TLE_V4);
1905	if (stu.t[TLE_V6] != 0)
1906		stbl_unlock(st, TLE_V6);
1907
1908	return num - k;
1909}
1910
1911uint16_t
1912tle_tcp_stream_accept(struct tle_stream *ts, struct tle_stream *rs[],
1913	uint32_t num)
1914{
1915	uint32_t n;
1916	struct tle_tcp_stream *s;
1917
1918	s = TCP_STREAM(ts);
1919	n = rte_ring_mc_dequeue_burst(s->rx.q, (void **)rs, num);
1920	if (n == 0)
1921		return 0;
1922
1923	/*
1924	 * if we still have packets to read,
1925	 * then rearm stream RX event.
1926	 */
1927	if (n == num && rte_ring_count(s->rx.q) != 0) {
1928		if (rwl_try_acquire(&s->rx.use) > 0 && s->rx.ev != NULL)
1929			tle_event_raise(s->rx.ev);
1930		rwl_release(&s->rx.use);
1931	}
1932
1933	return n;
1934}
1935
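/*
 * Illustrative BE usage sketch (placeholder names, not a prescribed flow):
 *
 *	n = tle_tcp_tx_bulk(dev, pkt, RTE_DIM(pkt));
 *	k = rte_eth_tx_burst(port, queue, pkt, n);
 *
 * packets not accepted by the NIC (k < n) remain the caller's
 * responsibility to retry or free.
 */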
1936uint16_t
1937tle_tcp_tx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], uint16_t num)
1938{
1939	uint32_t i, j, k, n;
1940	struct tle_drb *drb[num];
1941	struct tle_tcp_stream *s;
1942
1943	/* extract packets from device TX queue. */
1944
1945	k = num;
1946	n = tle_dring_sc_dequeue(&dev->tx.dr, (const void **)(uintptr_t)pkt,
1947		num, drb, &k);
1948
1949	if (n == 0)
1950		return 0;
1951
1952	/* free empty drbs and notify related streams. */
1953
1954	for (i = 0; i != k; i = j) {
1955		s = drb[i]->udata;
1956		for (j = i + 1; j != k && s == drb[j]->udata; j++)
1957			;
1958		stream_drb_free(s, drb + i, j - i);
1959	}
1960
1961	return n;
1962}
1963
1964static inline void
1965stream_fill_pkt_info(const struct tle_tcp_stream *s, union pkt_info *pi)
1966{
1967	if (s->s.type == TLE_V4)
1968		pi->addr4 = s->s.ipv4.addr;
1969	else
1970		pi->addr6 = &s->s.ipv6.addr;
1971
1972	pi->port = s->s.port;
1973	pi->tf.type = s->s.type;
1974}
1975
1976static int
1977stream_fill_addr(struct tle_tcp_stream *s, const struct sockaddr *addr)
1978{
1979	const struct sockaddr_in *in4;
1980	const struct sockaddr_in6 *in6;
1981	const struct tle_dev_param *prm;
1982	int32_t rc;
1983
1984	rc = 0;
1985	s->s.pmsk.raw = UINT32_MAX;
1986
1987	/* setup L4 src port and L3 src address fields. */
1988	if (s->s.type == TLE_V4) {
1989		in4 = (const struct sockaddr_in *)addr;
1990		if (in4->sin_addr.s_addr == INADDR_ANY || in4->sin_port == 0)
1991			return -EINVAL;
1992
1993		s->s.port.src = in4->sin_port;
1994		s->s.ipv4.addr.src = in4->sin_addr.s_addr;
1995		s->s.ipv4.mask.src = INADDR_NONE;
1996		s->s.ipv4.mask.dst = INADDR_NONE;
1997
1998	} else if (s->s.type == TLE_V6) {
1999		in6 = (const struct sockaddr_in6 *)addr;
2000		if (memcmp(&in6->sin6_addr, &tle_ipv6_any,
2001				sizeof(tle_ipv6_any)) == 0 ||
2002				in6->sin6_port == 0)
2003			return -EINVAL;
2004
2005		s->s.port.src = in6->sin6_port;
2006		rte_memcpy(&s->s.ipv6.addr.src, &in6->sin6_addr,
2007			sizeof(s->s.ipv6.addr.src));
2008		rte_memcpy(&s->s.ipv6.mask.src, &tle_ipv6_none,
2009			sizeof(s->s.ipv6.mask.src));
2010		rte_memcpy(&s->s.ipv6.mask.dst, &tle_ipv6_none,
2011			sizeof(s->s.ipv6.mask.dst));
2012	}
2013
2014	/* setup the destination device. */
2015	rc = stream_fill_dest(s);
2016	if (rc != 0)
2017		return rc;
2018
2019	/* setup L3 dst address from device param */
2020	prm = &s->tx.dst.dev->prm;
2021	if (s->s.type == TLE_V4) {
2022		if (s->s.ipv4.addr.dst == INADDR_ANY)
2023			s->s.ipv4.addr.dst = prm->local_addr4.s_addr;
2024	} else if (memcmp(&s->s.ipv6.addr.dst, &tle_ipv6_any,
2025			sizeof(tle_ipv6_any)) == 0)
2026		memcpy(&s->s.ipv6.addr.dst, &prm->local_addr6,
2027			sizeof(s->s.ipv6.addr.dst));
2028
2029	return rc;
2030}
2031
2032static inline int
2033tx_syn(struct tle_tcp_stream *s, const struct sockaddr *addr)
2034{
2035	int32_t rc;
2036	uint32_t tms, seq;
2037	union pkt_info pi;
2038	struct stbl *st;
2039	struct stbl_entry *se;
2040
2041	/* fill stream address */
2042	rc = stream_fill_addr(s, addr);
2043	if (rc != 0)
2044		return rc;
2045
2046	/* fill pkt info to generate seq.*/
2047	stream_fill_pkt_info(s, &pi);
2048
2049	tms = tcp_get_tms();
2050	s->tcb.so.ts.val = tms;
2051	s->tcb.so.ts.ecr = 0;
2052	s->tcb.so.wscale = TCP_WSCALE_DEFAULT;
2053	s->tcb.so.mss = calc_smss(s->tx.dst.mtu, &s->tx.dst);
2054
2055	/* note that rcv.nxt is 0 here for sync_gen_seq.*/
2056	seq = sync_gen_seq(&pi, s->tcb.rcv.nxt, tms, s->tcb.so.mss);
2057	s->tcb.snd.iss = seq;
2058	s->tcb.snd.rcvr = seq;
2059	s->tcb.snd.una = seq;
2060	s->tcb.snd.nxt = seq + 1;
2061	s->tcb.snd.rto = TCP_RTO_DEFAULT;
2062	s->tcb.snd.ts = tms;
2063
2064	s->tcb.rcv.mss = s->tcb.so.mss;
2065	s->tcb.rcv.wscale = TCP_WSCALE_DEFAULT;
2066	s->tcb.rcv.wnd = calc_rx_wnd(s, s->tcb.rcv.wscale);
2067	s->tcb.rcv.ts = 0;
2068
2069	/* add the stream in stream table */
2070	st = CTX_TCP_STLB(s->s.ctx);
2071	se = stbl_add_stream_lock(st, s);
2072	if (se == NULL)
2073		return -ENOBUFS;
2074	s->ste = se;
2075
2076	/* put stream into the to-send queue */
2077	txs_enqueue(s->s.ctx, s);
2078
2079	return 0;
2080}
2081
2082int
2083tle_tcp_stream_connect(struct tle_stream *ts, const struct sockaddr *addr)
2084{
2085	struct tle_tcp_stream *s;
2086	uint32_t type;
2087	int32_t rc;
2088
2089	if (ts == NULL || addr == NULL)
2090		return -EINVAL;
2091
2092	s = TCP_STREAM(ts);
2093	type = s->s.type;
2094	if (type >= TLE_VNUM)
2095		return -EINVAL;
2096
2097	if (rwl_try_acquire(&s->tx.use) > 0) {
2098		rc = rte_atomic16_cmpset(&s->tcb.state, TCP_ST_CLOSED,
2099			TCP_ST_SYN_SENT);
2100		rc = (rc == 0) ? -EDEADLK : 0;
2101	} else
2102		rc = -EINVAL;
2103
2104	if (rc != 0) {
2105		rwl_release(&s->tx.use);
2106		return rc;
2107	}
2108
2109	/* fill stream, prepare and transmit syn pkt */
2110	s->tcb.uop |= TCP_OP_CONNECT;
2111	rc = tx_syn(s, addr);
2112	rwl_release(&s->tx.use);
2113
2114	/* error happened, do a cleanup */
2115	if (rc != 0)
2116		tle_tcp_stream_close(ts);
2117
2118	return rc;
2119}
2120
2121uint16_t
2122tle_tcp_stream_recv(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
2123{
2124	uint32_t n;
2125	struct tle_tcp_stream *s;
2126
2127	s = TCP_STREAM(ts);
2128	n = rte_ring_mc_dequeue_burst(s->rx.q, (void **)pkt, num);
2129	if (n == 0)
2130		return 0;
2131
2132	/*
2133	 * if we still have packets to read,
2134	 * then rearm stream RX event.
2135	 */
2136	if (n == num && rte_ring_count(s->rx.q) != 0) {
2137		if (rwl_try_acquire(&s->rx.use) > 0 && s->rx.ev != NULL)
2138			tle_event_raise(s->rx.ev);
2139		rwl_release(&s->rx.use);
2140	}
2141
2142	return n;
2143}
2144
2145uint16_t
2146tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
2147{
2148	uint32_t i, j, mss, n, state, type;
2149	uint64_t ol_flags;
2150	struct tle_tcp_stream *s;
2151	struct tle_dev *dev;
2152
2153	s = TCP_STREAM(ts);
2154
2155	/* mark stream as not closable. */
2156	if (rwl_acquire(&s->tx.use) < 0) {
2157		rte_errno = EAGAIN;
2158		return 0;
2159	}
2160
2161	state = s->tcb.state;
2162	if (state != TCP_ST_ESTABLISHED && state != TCP_ST_CLOSE_WAIT) {
2163		rte_errno = ENOTCONN;
2164		n = 0;
2165	} else {
2166		mss = s->tcb.snd.mss;
2167		dev = s->tx.dst.dev;
2168		type = s->s.type;
2169		ol_flags = dev->tx.ol_flags[type];
2170
2171		/* prepare and check for TX */
2172		for (i = 0; i != num; i++) {
2173
2174			/* !!! need to be modified !!! */
2175			if (pkt[i]->pkt_len > mss ||
2176					pkt[i]->nb_segs > TCP_MAX_PKT_SEG) {
2177				rte_errno = EBADMSG;
2178				break;
2179			} else if (tcp_fill_mbuf(pkt[i], s, &s->tx.dst,
2180					ol_flags, s->s.port, 0, TCP_FLAG_ACK,
2181					0, 0) != 0)
2182				break;
2183		}
2184
2185		/* queue packets for further transmission. */
2186		n = rte_ring_mp_enqueue_burst(s->tx.q, (void **)pkt, i);
2187
2188		/* notify BE about more data to send */
2189		if (n != 0)
2190			txs_enqueue(s->s.ctx, s);
2191
2192		/*
2193		 * for unsent, but already modified packets:
2194		 * remove pkt l2/l3/l4 headers, restore ol_flags
2195		 */
2196		if (n != i) {
2197			ol_flags = ~dev->tx.ol_flags[type];
2198			for (j = n; j != i; j++) {
2199				rte_pktmbuf_adj(pkt[j], pkt[j]->l2_len +
2200					pkt[j]->l3_len + pkt[j]->l4_len);
2201				pkt[j]->ol_flags &= ol_flags;
2202			}
2203		/* if possible, rearm stream write event. */
2204		} else if (rte_ring_free_count(s->tx.q) != 0 &&
2205				s->tx.ev != NULL)
2206			tle_event_raise(s->tx.ev);
2207	}
2208
2209	rwl_release(&s->tx.use);
2210	return n;
2211}
2212
2213/* send data and FIN (if needed) */
2214static inline void
2215tx_data_fin(struct tle_tcp_stream *s, uint32_t tms, uint32_t state)
2216{
2217	/* try to send some data */
2218	tx_nxt_data(s, tms);
2219
2220	/* we also have to send a FIN */
2221	if (state != TCP_ST_ESTABLISHED &&
2222			state != TCP_ST_CLOSE_WAIT &&
2223			tcp_txq_nxt_cnt(s) == 0 &&
2224			s->tcb.snd.fss != s->tcb.snd.nxt) {
2225		s->tcb.snd.fss = ++s->tcb.snd.nxt;
2226		send_ack(s, tms, TCP_FLAG_FIN | TCP_FLAG_ACK);
2227	}
2228}
2229
2230static inline void
2231tx_stream(struct tle_tcp_stream *s, uint32_t tms)
2232{
2233	uint32_t state;
2234
2235	state = s->tcb.state;
2236
2237	if (state == TCP_ST_SYN_SENT) {
2238		/* send the SYN, start the rto timer */
2239		send_ack(s, tms, TCP_FLAG_SYN);
2240		timer_start(s);
2241
2242	} else if (state >= TCP_ST_ESTABLISHED && state <= TCP_ST_LAST_ACK) {
2243
2244		tx_data_fin(s, tms, state);
2245
2246		/* start RTO timer. */
2247		if (s->tcb.snd.nxt != s->tcb.snd.una)
2248			timer_start(s);
2249	}
2250}
2251
2252static inline void
2253rto_stream(struct tle_tcp_stream *s, uint32_t tms)
2254{
2255	uint32_t state;
2256
2257	state = s->tcb.state;
2258
2259	TCP_LOG(DEBUG, "%s(%p, tms=%u): state=%u, "
2260		"retx=%u, retm=%u, "
2261		"rto=%u, snd.ts=%u, tmo=%u, "
2262		"snd.nxt=%lu, snd.una=%lu, flight_size=%lu, "
2263		"snd.rcvr=%lu, snd.fastack=%u, "
2264		"wnd=%u, cwnd=%u, ssthresh=%u, "
2265		"bytes sent=%lu, pkt remain=%u;\n",
2266		__func__, s, tms, s->tcb.state,
2267		s->tcb.snd.nb_retx, s->tcb.snd.nb_retm,
2268		s->tcb.snd.rto, s->tcb.snd.ts, tms - s->tcb.snd.ts,
2269		s->tcb.snd.nxt, s->tcb.snd.una, s->tcb.snd.nxt - s->tcb.snd.una,
2270		s->tcb.snd.rcvr, s->tcb.snd.fastack,
2271		s->tcb.snd.wnd, s->tcb.snd.cwnd, s->tcb.snd.ssthresh,
2272		s->tcb.snd.nxt - s->tcb.snd.iss, tcp_txq_nxt_cnt(s));
2273
2274	if (s->tcb.snd.nb_retx < s->tcb.snd.nb_retm) {
2275
2276		if (state >= TCP_ST_ESTABLISHED && state <= TCP_ST_LAST_ACK) {
2277
2278			/* update SND.CWND and SND.SSTHRESH */
2279			rto_cwnd_update(&s->tcb);
2280
2281			/* RFC 6582 3.2.4 */
2282			s->tcb.snd.rcvr = s->tcb.snd.nxt;
2283			s->tcb.snd.fastack = 0;
2284
2285			/* restart from last acked data */
2286			tcp_txq_rst_nxt_head(s);
2287			s->tcb.snd.nxt = s->tcb.snd.una;
2288
2289			tx_data_fin(s, tms, state);
2290
2291		} else if (state == TCP_ST_SYN_SENT) {
2292			/* resending SYN */
2293			s->tcb.so.ts.val = tms;
2294			send_ack(s, tms, TCP_FLAG_SYN);
2295
2296		} else if (state == TCP_ST_TIME_WAIT) {
2297			stream_term(s);
2298		}
2299
2300		/* RFC6298:5.5 back off the timer */
2301		s->tcb.snd.rto = rto_roundup(2 * s->tcb.snd.rto);
2302		s->tcb.snd.nb_retx++;
2303		timer_restart(s);
2304
2305	} else {
2306		send_rst(s, s->tcb.snd.una);
2307		stream_term(s);
2308	}
2309}
2310
2311int
2312tle_tcp_process(struct tle_ctx *ctx, uint32_t num)
2313{
2314	uint32_t i, k, tms;
2315	struct sdr *dr;
2316	struct tle_timer_wheel *tw;
2317	struct tle_stream *p;
2318	struct tle_tcp_stream *s, *rs[num];
2319
2320	/* process streams with RTO expired */
2321
2322	tw = CTX_TCP_TMWHL(ctx);
2323	tms = tcp_get_tms();
2324	tle_timer_expire(tw, tms);
2325
2326	k = tle_timer_get_expired_bulk(tw, (void **)rs, RTE_DIM(rs));
2327
2328	for (i = 0; i != k; i++) {
2329
2330		s = rs[i];
2331		s->timer.handle = NULL;
2332		if (rwl_try_acquire(&s->tx.use) > 0)
2333			rto_stream(s, tms);
2334		rwl_release(&s->tx.use);
2335	}
2336
2337	/* process streams from to-send queue */
2338
2339	k = txs_dequeue_bulk(ctx, rs, RTE_DIM(rs));
2340
2341	for (i = 0; i != k; i++) {
2342
2343		s = rs[i];
2344		if (rwl_try_acquire(&s->tx.use) > 0 &&
2345				rte_atomic32_read(&s->tx.arm) > 0) {
2346			rte_atomic32_set(&s->tx.arm, 0);
2347			tx_stream(s, tms);
2348		}
2349		rwl_release(&s->tx.use);
2350	}
2351
2352	/* collect streams to close from the death row */
2353
2354	dr = CTX_TCP_SDR(ctx);
2355	for (k = 0, p = STAILQ_FIRST(&dr->be);
2356			k != num && p != NULL;
2357			k++, p = STAILQ_NEXT(p, link))
2358		rs[k] = TCP_STREAM(p);
2359
2360	if (p == NULL)
2361		STAILQ_INIT(&dr->be);
2362	else
2363		STAILQ_FIRST(&dr->be) = p;
2364
2365	/* cleanup closed streams */
2366	for (i = 0; i != k; i++) {
2367		s = rs[i];
2368		tcp_stream_down(s);
2369		tcp_stream_reset(ctx, s);
2370	}
2371
2372	return 0;
2373}
2374