tcp_rxtx.c revision 5740a1da
1/*
2 * Copyright (c) 2016-2017  Intel Corporation.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#include <rte_errno.h>
17#include <rte_ethdev.h>
18#include <rte_ip.h>
19#include <rte_ip_frag.h>
20#include <rte_tcp.h>
21
22#include "tcp_stream.h"
23#include "tcp_timer.h"
24#include "stream_table.h"
25#include "syncookie.h"
26#include "misc.h"
27#include "tcp_ctl.h"
28#include "tcp_rxq.h"
29#include "tcp_txq.h"
30#include "tcp_tx_seg.h"
31
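/* limit on the number of mbuf segments per TCP packet (0x20 == 32). */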
32#define	TCP_MAX_PKT_SEG	0x20
33
34/*
 35 * checks if input TCP ports and IP addresses match the given stream.
36 * returns zero on success.
37 */
38static inline int
39rx_check_stream(const struct tle_tcp_stream *s, const union pkt_info *pi)
40{
41	int32_t rc;
42
43	if (pi->tf.type == TLE_V4)
44		rc = (pi->port.raw & s->s.pmsk.raw) != s->s.port.raw ||
45			(pi->addr4.raw & s->s.ipv4.mask.raw) !=
46			s->s.ipv4.addr.raw;
47	else
48		rc = (pi->port.raw & s->s.pmsk.raw) != s->s.port.raw ||
49			ymm_mask_cmp(&pi->addr6->raw, &s->s.ipv6.addr.raw,
50			&s->s.ipv6.mask.raw) != 0;
51
52	return rc;
53}
54
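/*
 * lookup the stream listening on the packet's destination port for the
 * given device and grab a usage reference on it.
 * Returns NULL if no such stream exists or it is not in LISTEN state.
 */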
55static inline struct tle_tcp_stream *
56rx_obtain_listen_stream(const struct tle_dev *dev, const union pkt_info *pi,
57	uint32_t type)
58{
59	struct tle_tcp_stream *s;
60
61	s = (struct tle_tcp_stream *)dev->dp[type]->streams[pi->port.dst];
62	if (s == NULL || tcp_stream_acquire(s) < 0)
63		return NULL;
64
65	/* check that we have a proper stream. */
66	if (s->tcb.state != TCP_ST_LISTEN) {
67		tcp_stream_release(s);
68		s = NULL;
69	}
70
71	return s;
72}
73
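/*
 * find an active stream for the incoming flow in the stream table.
 * For a pure ACK without a matching entry, fall back to the listen stream
 * (final ACK of a syncookie based 3-way handshake).
 */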
74static inline struct tle_tcp_stream *
75rx_obtain_stream(const struct tle_dev *dev, struct stbl *st,
76	const union pkt_info *pi, uint32_t type)
77{
78	struct tle_tcp_stream *s;
79
80	s = stbl_find_data(st, pi);
81	if (s == NULL) {
82		if (pi->tf.flags == TCP_FLAG_ACK)
83			return rx_obtain_listen_stream(dev, pi, type);
84		return NULL;
85	}
86
87	if (tcp_stream_acquire(s) < 0)
88		return NULL;
89	/* check that we have a proper stream. */
90	else if (s->tcb.state == TCP_ST_CLOSED) {
91		tcp_stream_release(s);
92		s = NULL;
93	}
94
95	return s;
96}
97
98/*
99 * Consider 2 pkt_info *equal* if their:
100 * - types (IPv4/IPv6)
101 * - TCP flags
102 * - checksum flags
103 * - TCP src and dst ports
104 * - IP src and dst addresses
105 * are equal.
106 */
107static inline int
108pkt_info_bulk_eq(const union pkt_info pi[], uint32_t num)
109{
110	uint32_t i;
111
112	i = 1;
113
114	if (pi[0].tf.type == TLE_V4) {
115		while (i != num && xmm_cmp(&pi[0].raw, &pi[i].raw) == 0)
116			i++;
117
118	} else if (pi[0].tf.type == TLE_V6) {
119		while (i != num &&
120				pi[0].raw.u64[0] == pi[i].raw.u64[0] &&
121				ymm_cmp(&pi[0].addr6->raw,
122				&pi[i].addr6->raw) == 0)
123			i++;
124	}
125
126	return i;
127}
128
129static inline int
130pkt_info_bulk_syneq(const union pkt_info pi[], uint32_t num)
131{
132	uint32_t i;
133
134	i = 1;
135
136	if (pi[0].tf.type == TLE_V4) {
137		while (i != num && pi[0].tf.raw == pi[i].tf.raw &&
138				pi[0].port.dst == pi[i].port.dst &&
139				pi[0].addr4.dst == pi[i].addr4.dst)
140			i++;
141
142	} else if (pi[0].tf.type == TLE_V6) {
143		while (i != num && pi[0].tf.raw == pi[i].tf.raw &&
144				pi[0].port.dst == pi[i].port.dst &&
145				xmm_cmp(&pi[0].addr6->dst,
146				&pi[i].addr6->dst) == 0)
147			i++;
148	}
149
150	return i;
151}
152
153static inline void
154stream_drb_free(struct tle_tcp_stream *s, struct tle_drb *drbs[],
155	uint32_t nb_drb)
156{
157	_rte_ring_enqueue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
158}
159
160static inline uint32_t
161stream_drb_alloc(struct tle_tcp_stream *s, struct tle_drb *drbs[],
162	uint32_t nb_drb)
163{
164	return _rte_ring_dequeue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
165}
166
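/*
 * reserve a block of <num> IPv4 packet IDs from the per-device counter.
 * The atomic path is used for multi-threaded contexts (st == 0),
 * the plain read/update path for single-threaded (ST) ones.
 */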
167static inline uint32_t
168get_ip_pid(struct tle_dev *dev, uint32_t num, uint32_t type, uint32_t st)
169{
170	uint32_t pid;
171	rte_atomic32_t *pa;
172
173	pa = &dev->tx.packet_id[type];
174
175	if (st == 0) {
176		pid = rte_atomic32_add_return(pa, num);
177		return pid - num;
178	} else {
179		pid = rte_atomic32_read(pa);
180		rte_atomic32_set(pa, pid + num);
181		return pid;
182	}
183}
184
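/*
 * fill the TCP header for an outgoing segment.
 * Note that the stream stores L4 ports as seen on RX (src == peer side),
 * hence the swap when writing them into the header.
 */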
185static inline void
186fill_tcph(struct tcp_hdr *l4h, const struct tcb *tcb, union l4_ports port,
187	uint32_t seq, uint8_t hlen, uint8_t flags)
188{
189	uint16_t wnd;
190
191	l4h->src_port = port.dst;
192	l4h->dst_port = port.src;
193
194	wnd = (flags & TCP_FLAG_SYN) ?
195		RTE_MIN(tcb->rcv.wnd, (uint32_t)UINT16_MAX) :
196		tcb->rcv.wnd >> tcb->rcv.wscale;
197
198	/* ??? use sse shuffle to hton all remaining 16 bytes at once. ??? */
199	l4h->sent_seq = rte_cpu_to_be_32(seq);
200	l4h->recv_ack = rte_cpu_to_be_32(tcb->rcv.nxt);
201	l4h->data_off = hlen / TCP_DATA_ALIGN << TCP_DATA_OFFSET;
202	l4h->tcp_flags = flags;
203	l4h->rx_win = rte_cpu_to_be_16(wnd);
204	l4h->cksum = 0;
205	l4h->tcp_urp = 0;
206
207	if (flags & TCP_FLAG_SYN)
208		fill_syn_opts(l4h + 1, &tcb->so);
209	else if ((flags & TCP_FLAG_RST) == 0 && tcb->so.ts.raw != 0)
210		fill_tms_opts(l4h + 1, tcb->snd.ts, tcb->rcv.ts);
211}
212
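/*
 * prepend L2/L3/L4 headers to the mbuf and fill them in.
 * Depending on the TX offload flags, either prepare a pseudo-header
 * checksum for the HW, or (when swcsm != 0) calculate the full TCP
 * (and IPv4 header) checksum in SW.
 */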
213static inline int
214tcp_fill_mbuf(struct rte_mbuf *m, const struct tle_tcp_stream *s,
215	const struct tle_dest *dst, uint64_t ol_flags,
216	union l4_ports port, uint32_t seq, uint32_t flags,
217	uint32_t pid, uint32_t swcsm)
218{
219	uint32_t l4, len, plen;
220	struct tcp_hdr *l4h;
221	char *l2h;
222
223	len = dst->l2_len + dst->l3_len;
224	plen = m->pkt_len;
225
226	if (flags & TCP_FLAG_SYN)
227		l4 = sizeof(*l4h) + TCP_TX_OPT_LEN_MAX;
228	else if ((flags & TCP_FLAG_RST) == 0 && s->tcb.rcv.ts != 0)
229		l4 = sizeof(*l4h) + TCP_TX_OPT_LEN_TMS;
230	else
231		l4 = sizeof(*l4h);
232
233	/* adjust mbuf to put L2/L3/L4 headers into it. */
234	l2h = rte_pktmbuf_prepend(m, len + l4);
235	if (l2h == NULL)
236		return -EINVAL;
237
238	/* copy L2/L3 header */
239	rte_memcpy(l2h, dst->hdr, len);
240
241	/* setup TCP header & options */
242	l4h = (struct tcp_hdr *)(l2h + len);
243	fill_tcph(l4h, &s->tcb, port, seq, l4, flags);
244
245	/* setup mbuf TX offload related fields. */
246	m->tx_offload = _mbuf_tx_offload(dst->l2_len, dst->l3_len, l4, 0, 0, 0);
247	m->ol_flags |= ol_flags;
248
249	/* update proto specific fields. */
250
251	if (s->s.type == TLE_V4) {
252		struct ipv4_hdr *l3h;
253		l3h = (struct ipv4_hdr *)(l2h + dst->l2_len);
254		l3h->packet_id = rte_cpu_to_be_16(pid);
255		l3h->total_length = rte_cpu_to_be_16(plen + dst->l3_len + l4);
256
257		if ((ol_flags & PKT_TX_TCP_CKSUM) != 0)
258			l4h->cksum = _ipv4x_phdr_cksum(l3h, m->l3_len,
259				ol_flags);
260		else if (swcsm != 0)
261			l4h->cksum = _ipv4_udptcp_mbuf_cksum(m, len, l3h);
262
263		if ((ol_flags & PKT_TX_IP_CKSUM) == 0 && swcsm != 0)
264			l3h->hdr_checksum = _ipv4x_cksum(l3h, m->l3_len);
265	} else {
266		struct ipv6_hdr *l3h;
267		l3h = (struct ipv6_hdr *)(l2h + dst->l2_len);
268		l3h->payload_len = rte_cpu_to_be_16(plen + l4);
269		if ((ol_flags & PKT_TX_TCP_CKSUM) != 0)
270			l4h->cksum = rte_ipv6_phdr_cksum(l3h, ol_flags);
271		else if (swcsm != 0)
272			l4h->cksum = _ipv6_udptcp_mbuf_cksum(m, len, l3h);
273	}
274
275	return 0;
276}
277
278/*
279 * This function is supposed to be used only for data packets.
280 * Assumes that L2/L3/L4 headers and mbuf fields already setup properly.
281 *  - updates tcp SEG.SEQ, SEG.ACK, TS.VAL, TS.ECR.
282 *  - if no HW cksum offloads are enabled, calculates TCP checksum.
283 */
284static inline void
285tcp_update_mbuf(struct rte_mbuf *m, uint32_t type, const struct tcb *tcb,
286	uint32_t seq, uint32_t pid)
287{
288	struct tcp_hdr *l4h;
289	uint32_t len;
290
291	len = m->l2_len + m->l3_len;
292	l4h = rte_pktmbuf_mtod_offset(m, struct tcp_hdr *, len);
293
294	l4h->sent_seq = rte_cpu_to_be_32(seq);
295	l4h->recv_ack = rte_cpu_to_be_32(tcb->rcv.nxt);
296
297	if (tcb->so.ts.raw != 0)
298		fill_tms_opts(l4h + 1, tcb->snd.ts, tcb->rcv.ts);
299
300	if (type == TLE_V4) {
301		struct ipv4_hdr *l3h;
302		l3h = rte_pktmbuf_mtod_offset(m, struct ipv4_hdr *, m->l2_len);
303		l3h->hdr_checksum = 0;
304		l3h->packet_id = rte_cpu_to_be_16(pid);
305		if ((m->ol_flags & PKT_TX_IP_CKSUM) == 0)
306			l3h->hdr_checksum = _ipv4x_cksum(l3h, m->l3_len);
307	}
308
309	/* have to calculate TCP checksum in SW */
310	if ((m->ol_flags & PKT_TX_TCP_CKSUM) == 0) {
311
312		l4h->cksum = 0;
313
314		if (type == TLE_V4) {
315			struct ipv4_hdr *l3h;
316			l3h = rte_pktmbuf_mtod_offset(m, struct ipv4_hdr *,
317				m->l2_len);
318			l4h->cksum = _ipv4_udptcp_mbuf_cksum(m, len, l3h);
319
320		} else {
321			struct ipv6_hdr *l3h;
322			l3h = rte_pktmbuf_mtod_offset(m, struct ipv6_hdr *,
323				m->l2_len);
324			l4h->cksum = _ipv6_udptcp_mbuf_cksum(m, len, l3h);
325		}
326	}
327}
328
329/* Send data packets that need to be ACK-ed by peer */
330static inline uint32_t
331tx_data_pkts(struct tle_tcp_stream *s, struct rte_mbuf *const m[], uint32_t num)
332{
333	uint32_t bsz, i, nb, nbm;
334	struct tle_dev *dev;
335	struct tle_drb *drb[num];
336
337	/* calculate how many drbs are needed.*/
338	bsz = s->tx.drb.nb_elem;
339	nbm = (num + bsz - 1) / bsz;
340
341	/* allocate drbs, adjust number of packets. */
342	nb = stream_drb_alloc(s, drb, nbm);
343
344	/* drb ring is empty. */
345	if (nb == 0)
346		return 0;
347
348	else if (nb != nbm)
349		num = nb * bsz;
350
351	dev = s->tx.dst.dev;
352
353	/* enqueue pkts for TX. */
354	nbm = nb;
355	i = tle_dring_mp_enqueue(&dev->tx.dr, (const void * const*)m,
356		num, drb, &nb);
357
358	/* free unused drbs. */
359	if (nb != 0)
360		stream_drb_free(s, drb + nbm - nb, nb);
361
362	return i;
363}
364
365static inline uint32_t
366tx_data_bulk(struct tle_tcp_stream *s, union seqlen *sl, struct rte_mbuf *mi[],
367	uint32_t num)
368{
369	uint32_t fail, i, k, n, mss, pid, plen, sz, tn, type;
370	struct tle_dev *dev;
371	struct rte_mbuf *mb;
372	struct rte_mbuf *mo[MAX_PKT_BURST + TCP_MAX_PKT_SEG];
373
374	mss = s->tcb.snd.mss;
375	type = s->s.type;
376
377	dev = s->tx.dst.dev;
378	pid = get_ip_pid(dev, num, type, (s->flags & TLE_CTX_FLAG_ST) != 0);
379
380	k = 0;
381	tn = 0;
382	fail = 0;
383	for (i = 0; i != num && sl->len != 0 && fail == 0; i++) {
384
385		mb = mi[i];
386		sz = RTE_MIN(sl->len, mss);
387		plen = PKT_L4_PLEN(mb);
388
389		/* fast path, no need to use indirect mbufs. */
390		if (plen <= sz) {
391
392			/* update pkt TCP header */
393			tcp_update_mbuf(mb, type, &s->tcb, sl->seq, pid + i);
394
395			/* keep mbuf till ACK is received. */
396			rte_pktmbuf_refcnt_update(mb, 1);
397			sl->len -= plen;
398			sl->seq += plen;
399			mo[k++] = mb;
400		/* remaining snd.wnd is less than MSS, send nothing */
401		} else if (sz < mss)
402			break;
403		/* packet indirection needed */
404		else
405			RTE_VERIFY(0);
406
407		if (k >= MAX_PKT_BURST) {
408			n = tx_data_pkts(s, mo, k);
409			fail = k - n;
410			tn += n;
411			k = 0;
412		}
413	}
414
415	if (k != 0) {
416		n = tx_data_pkts(s, mo, k);
417		fail = k - n;
418		tn += n;
419	}
420
421	if (fail != 0) {
422		sz = tcp_mbuf_seq_free(mo + n, fail);
423		sl->seq -= sz;
424		sl->len += sz;
425	}
426
427	return tn;
428}
429
430/*
431 * gets data from stream send buffer, updates it and
432 * queues it into TX device queue.
433 * Note that this function is not MT safe.
434 */
435static inline uint32_t
436tx_nxt_data(struct tle_tcp_stream *s, uint32_t tms)
437{
438	uint32_t n, num, tn, wnd;
439	struct rte_mbuf **mi;
440	union seqlen sl;
441
442	tn = 0;
443	wnd = s->tcb.snd.wnd - (uint32_t)(s->tcb.snd.nxt - s->tcb.snd.una);
444	sl.seq = s->tcb.snd.nxt;
445	sl.len = RTE_MIN(wnd, s->tcb.snd.cwnd);
446
447	if (sl.len == 0)
448		return tn;
449
450	/* update send timestamp */
451	s->tcb.snd.ts = tms;
452
453	do {
454		/* get group of packets */
455		mi = tcp_txq_get_nxt_objs(s, &num);
456
457		/* stream send buffer is empty */
458		if (num == 0)
459			break;
460
461		/* queue data packets for TX */
462		n = tx_data_bulk(s, &sl, mi, num);
463		tn += n;
464
465		/* update consumer head */
466		tcp_txq_set_nxt_head(s, n);
467	} while (n == num);
468
469	s->tcb.snd.nxt += sl.seq - (uint32_t)s->tcb.snd.nxt;
470	return tn;
471}
472
473static inline void
474free_una_data(struct tle_tcp_stream *s, uint32_t len)
475{
476	uint32_t i, num, plen;
477	struct rte_mbuf **mi;
478
479	plen = 0;
480
481	do {
482		/* get group of packets */
483		mi = tcp_txq_get_una_objs(s, &num);
484
485		if (num == 0)
486			break;
487
488		/* free acked data */
489		for (i = 0; i != num && plen != len; i++) {
490			uint32_t next_pkt_len = PKT_L4_PLEN(mi[i]);
491			if (plen + next_pkt_len > len) {
492				/* keep SND.UNA at the start of the packet */
493				len = plen;
494				break;
495			} else {
496				plen += next_pkt_len;
497			}
498			rte_pktmbuf_free(mi[i]);
499		}
500
501		/* update consumer tail */
502		tcp_txq_set_una_tail(s, i);
503	} while (plen < len);
504
505	s->tcb.snd.una += len;
506
507	/*
508	 * that could happen in case of retransmit,
509	 * adjust SND.NXT with SND.UNA.
510	 */
511	if (s->tcb.snd.una > s->tcb.snd.nxt) {
512		tcp_txq_rst_nxt_head(s);
513		s->tcb.snd.nxt = s->tcb.snd.una;
514	}
515}
516
517static inline uint16_t
518calc_smss(uint16_t mss, const struct tle_dest *dst)
519{
520	uint16_t n;
521
522	n = dst->mtu - dst->l2_len - dst->l3_len - TCP_TX_HDR_DACK;
523	mss = RTE_MIN(n, mss);
524	return mss;
525}
526
527/*
528 * RFC 6928 2
529 * min (10*MSS, max (2*MSS, 14600))
530 *
531 * or using user provided initial congestion window (icw)
532 * min (10*MSS, max (2*MSS, icw))
533 */
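/*
 * e.g. with SMSS == 1460 and the RFC 6928 default of 14600:
 * min(10 * 1460, max(2 * 1460, 14600)) == 14600 bytes, i.e. 10 full segments.
 */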
534static inline uint32_t
535initial_cwnd(uint32_t smss, uint32_t icw)
536{
537	return RTE_MIN(10 * smss, RTE_MAX(2 * smss, icw));
538}
539
540/*
541 * queue a standalone packet to the particular output device.
542 * It assumes that:
543 * - L2/L3/L4 headers are already set;
544 * - the packet fits into one segment.
545 */
546static inline int
547send_pkt(struct tle_tcp_stream *s, struct tle_dev *dev, struct rte_mbuf *m)
548{
549	uint32_t n, nb;
550	struct tle_drb *drb;
551
552	if (stream_drb_alloc(s, &drb, 1) == 0)
553		return -ENOBUFS;
554
555	/* enqueue pkt for TX. */
556	nb = 1;
557	n = tle_dring_mp_enqueue(&dev->tx.dr, (const void * const*)&m, 1,
558		&drb, &nb);
559
560	/* free unused drbs. */
561	if (nb != 0)
562		stream_drb_free(s, &drb, 1);
563
564	return (n == 1) ? 0 : -ENOBUFS;
565}
566
567static inline int
568send_ctrl_pkt(struct tle_tcp_stream *s, struct rte_mbuf *m, uint32_t seq,
569	uint32_t flags)
570{
571	const struct tle_dest *dst;
572	uint32_t pid, type;
573	int32_t rc;
574
575	dst = &s->tx.dst;
576	type = s->s.type;
577	pid = get_ip_pid(dst->dev, 1, type, (s->flags & TLE_CTX_FLAG_ST) != 0);
578
579	rc = tcp_fill_mbuf(m, s, dst, 0, s->s.port, seq, flags, pid, 1);
580	if (rc == 0)
581		rc = send_pkt(s, dst->dev, m);
582
583	return rc;
584}
585
586static inline int
587send_rst(struct tle_tcp_stream *s, uint32_t seq)
588{
589	struct rte_mbuf *m;
590	int32_t rc;
591
592	m = rte_pktmbuf_alloc(s->tx.dst.head_mp);
593	if (m == NULL)
594		return -ENOMEM;
595
596	rc = send_ctrl_pkt(s, m, seq, TCP_FLAG_RST);
597	if (rc != 0)
598		rte_pktmbuf_free(m);
599
600	return rc;
601}
602
603static inline int
604send_ack(struct tle_tcp_stream *s, uint32_t tms, uint32_t flags)
605{
606	struct rte_mbuf *m;
607	uint32_t seq;
608	int32_t rc;
609
610	m = rte_pktmbuf_alloc(s->tx.dst.head_mp);
611	if (m == NULL)
612		return -ENOMEM;
613
614	seq = s->tcb.snd.nxt - ((flags & (TCP_FLAG_FIN | TCP_FLAG_SYN)) != 0);
615	s->tcb.snd.ts = tms;
616
617	rc = send_ctrl_pkt(s, m, seq, flags);
618	if (rc != 0) {
619		rte_pktmbuf_free(m);
620		return rc;
621	}
622
623	s->tcb.snd.ack = s->tcb.rcv.nxt;
624	return 0;
625}
626
627
628static int
629sync_ack(struct tle_tcp_stream *s, const union pkt_info *pi,
630	const union seg_info *si, uint32_t ts, struct rte_mbuf *m)
631{
632	uint16_t len;
633	int32_t rc;
634	uint32_t pid, seq, type;
635	struct tle_dev *dev;
636	const void *da;
637	struct tle_dest dst;
638	const struct tcp_hdr *th;
639
640	type = s->s.type;
641
642	/* get destination information. */
643	if (type == TLE_V4)
644		da = &pi->addr4.src;
645	else
646		da = &pi->addr6->src;
647
648	rc = stream_get_dest(&s->s, da, &dst);
649	if (rc < 0)
650		return rc;
651
652	th = rte_pktmbuf_mtod_offset(m, const struct tcp_hdr *,
653		m->l2_len + m->l3_len);
654	get_syn_opts(&s->tcb.so, (uintptr_t)(th + 1), m->l4_len - sizeof(*th));
655
656	s->tcb.rcv.nxt = si->seq + 1;
657	seq = sync_gen_seq(pi, s->tcb.rcv.nxt, ts, s->tcb.so.mss,
658				s->s.ctx->prm.hash_alg,
659				&s->s.ctx->prm.secret_key);
660	s->tcb.so.ts.ecr = s->tcb.so.ts.val;
661	s->tcb.so.ts.val = sync_gen_ts(ts, s->tcb.so.wscale);
662	s->tcb.so.wscale = (s->tcb.so.wscale == TCP_WSCALE_NONE) ?
663		TCP_WSCALE_NONE : TCP_WSCALE_DEFAULT;
664	s->tcb.so.mss = calc_smss(dst.mtu, &dst);
665
666	/* reset mbuf's data contents. */
667	len = m->l2_len + m->l3_len + m->l4_len;
668	m->tx_offload = 0;
669	if (rte_pktmbuf_adj(m, len) == NULL)
670		return -EINVAL;
671
672	dev = dst.dev;
673	pid = get_ip_pid(dev, 1, type, (s->flags & TLE_CTX_FLAG_ST) != 0);
674
675	rc = tcp_fill_mbuf(m, s, &dst, 0, pi->port, seq,
676		TCP_FLAG_SYN | TCP_FLAG_ACK, pid, 1);
677	if (rc == 0)
678		rc = send_pkt(s, dev, m);
679
680	return rc;
681}
682
683/*
684 * RFC 793:
685 * There are four cases for the acceptability test for an incoming segment:
686 * Segment Receive  Test
687 * Length  Window
688 * ------- -------  -------------------------------------------
689 *    0       0     SEG.SEQ = RCV.NXT
690 *    0      >0     RCV.NXT =< SEG.SEQ < RCV.NXT+RCV.WND
691 *   >0       0     not acceptable
692 *   >0      >0     RCV.NXT =< SEG.SEQ < RCV.NXT+RCV.WND
693 *                  or RCV.NXT =< SEG.SEQ+SEG.LEN-1 < RCV.NXT+RCV.WND
694 */
695static inline int
696check_seqn(const struct tcb *tcb, uint32_t seqn, uint32_t len)
697{
698	uint32_t n;
699
700	n = seqn + len;
701	if (seqn - tcb->rcv.nxt >= tcb->rcv.wnd &&
702			n - tcb->rcv.nxt > tcb->rcv.wnd)
703		return -ERANGE;
704
705	return 0;
706}
707
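/*
 * extract the timestamp option from a received segment
 * (only when timestamps were negotiated for this connection).
 */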
708static inline union tsopt
709rx_tms_opt(const struct tcb *tcb, const struct rte_mbuf *mb)
710{
711	union tsopt ts;
712	uintptr_t opt;
713	const struct tcp_hdr *th;
714
715	if (tcb->so.ts.val != 0) {
716		opt = rte_pktmbuf_mtod_offset(mb, uintptr_t,
717			mb->l2_len + mb->l3_len + sizeof(*th));
718		ts = get_tms_opts(opt, mb->l4_len - sizeof(*th));
719	} else
720		ts.raw = 0;
721
722	return ts;
723}
724
725/*
726 * PAWS and sequence check.
727 * RFC 1323 4.2.1
728 */
729static inline int
730rx_check_seq(struct tcb *tcb, uint32_t seq, uint32_t len, const union tsopt ts)
731{
732	int32_t rc;
733
734	/* RFC 1323 4.2.1 R2 */
735	rc = check_seqn(tcb, seq, len);
736	if (rc < 0)
737		return rc;
738
739	if (ts.raw != 0) {
740
741		/* RFC 1323 4.2.1 R1 */
742		if (tcp_seq_lt(ts.val, tcb->rcv.ts))
743			return -ERANGE;
744
745		/* RFC 1323 4.2.1 R3 */
746		if (tcp_seq_leq(seq, tcb->snd.ack) &&
747				tcp_seq_lt(tcb->snd.ack, seq + len))
748			tcb->rcv.ts = ts.val;
749	}
750
751	return rc;
752}
753
754static inline int
755rx_check_ack(const struct tcb *tcb, uint32_t ack)
756{
757	uint32_t max;
758
759	max = (uint32_t)RTE_MAX(tcb->snd.nxt, tcb->snd.rcvr);
760
761	if (tcp_seq_leq(tcb->snd.una, ack) && tcp_seq_leq(ack, max))
762		return 0;
763
764	return -ERANGE;
765}
766
767static inline int
768rx_check_seqack(struct tcb *tcb, uint32_t seq, uint32_t ack, uint32_t len,
769	const union tsopt ts)
770{
771	int32_t rc;
772
773	rc = rx_check_seq(tcb, seq, len, ts);
774	rc |= rx_check_ack(tcb, ack);
775	return rc;
776}
777
778static inline int
779restore_syn_opt(union seg_info *si, union tsopt *to,
780	const union pkt_info *pi, uint32_t ts, const struct rte_mbuf *mb,
781	uint32_t hash_alg, rte_xmm_t *secret_key)
782{
783	int32_t rc;
784	uint32_t len;
785	const struct tcp_hdr *th;
786
787	/* check that ACK, etc fields are what we expected. */
788	rc = sync_check_ack(pi, si->seq, si->ack - 1, ts,
789				hash_alg,
790				secret_key);
791	if (rc < 0)
792		return rc;
793
794	si->mss = rc;
795
796	th = rte_pktmbuf_mtod_offset(mb, const struct tcp_hdr *,
797		mb->l2_len + mb->l3_len);
798	len = mb->l4_len - sizeof(*th);
799	to[0] = get_tms_opts((uintptr_t)(th + 1), len);
800	return 0;
801}
802
803static inline void
804stream_term(struct tle_tcp_stream *s)
805{
806	struct sdr *dr;
807
808	s->tcb.state = TCP_ST_CLOSED;
809	rte_smp_wmb();
810
811	timer_stop(s);
812
813	/* close() was already invoked, schedule final cleanup */
814	if ((s->tcb.uop & TCP_OP_CLOSE) != 0) {
815
816		dr = CTX_TCP_SDR(s->s.ctx);
817		STAILQ_INSERT_TAIL(&dr->be, &s->s, link);
818
819	/* notify user that the stream needs to be closed */
820	} else if (s->err.ev != NULL)
821		tle_event_raise(s->err.ev);
822	else if (s->err.cb.func != NULL)
823		s->err.cb.func(s->err.cb.data, &s->s);
824}
825
826static inline int
827stream_fill_dest(struct tle_tcp_stream *s)
828{
829	int32_t rc;
830	uint32_t type;
831	const void *da;
832
833	type = s->s.type;
834	if (type == TLE_V4)
835		da = &s->s.ipv4.addr.src;
836	else
837		da = &s->s.ipv6.addr.src;
838
839	rc = stream_get_dest(&s->s, da, &s->tx.dst);
840	return (rc < 0) ? rc : 0;
841}
842
843/*
844 * helper function, prepares a new accept stream.
845 */
846static inline int
847accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st,
848	struct tle_tcp_stream *cs, const union tsopt *to,
849	uint32_t tms, const union pkt_info *pi, const union seg_info *si)
850{
851	int32_t rc;
852	uint32_t rtt;
853
854	/* some TX still pending for that stream. */
855	if (TCP_STREAM_TX_PENDING(cs))
856		return -EAGAIN;
857
858	/* setup L4 ports and L3 addresses fields. */
859	cs->s.port.raw = pi->port.raw;
860	cs->s.pmsk.raw = UINT32_MAX;
861
862	if (pi->tf.type == TLE_V4) {
863		cs->s.ipv4.addr = pi->addr4;
864		cs->s.ipv4.mask.src = INADDR_NONE;
865		cs->s.ipv4.mask.dst = INADDR_NONE;
866	} else if (pi->tf.type == TLE_V6) {
867		cs->s.ipv6.addr = *pi->addr6;
868		rte_memcpy(&cs->s.ipv6.mask.src, &tle_ipv6_none,
869			sizeof(cs->s.ipv6.mask.src));
870		rte_memcpy(&cs->s.ipv6.mask.dst, &tle_ipv6_none,
871			sizeof(cs->s.ipv6.mask.dst));
872	}
873
874	/* setup TCB */
875	sync_fill_tcb(&cs->tcb, si, to);
876	cs->tcb.rcv.wnd = calc_rx_wnd(cs, cs->tcb.rcv.wscale);
877
878	/*
879	 * estimate the rto
880	 * for now rtt is calculated based on the tcp TMS option,
881	 * later add real-time one
882	 */
883	if (cs->tcb.so.ts.ecr) {
884		rtt = tms - cs->tcb.so.ts.ecr;
885		rto_estimate(&cs->tcb, rtt);
886	} else
887		cs->tcb.snd.rto = TCP_RTO_DEFAULT;
888
889	/* copy streams type & flags. */
890	cs->s.type = ps->s.type;
891	cs->flags = ps->flags;
892
893	/* retrieve and cache destination information. */
894	rc = stream_fill_dest(cs);
895	if (rc != 0)
896		return rc;
897
898	/* update snd.mss with SMSS value */
899	cs->tcb.snd.mss = calc_smss(cs->tcb.snd.mss, &cs->tx.dst);
900
901	/* setup congestion variables */
902	cs->tcb.snd.cwnd = initial_cwnd(cs->tcb.snd.mss, ps->tcb.snd.cwnd);
903	cs->tcb.snd.ssthresh = cs->tcb.snd.wnd;
904	cs->tcb.snd.rto_tw = ps->tcb.snd.rto_tw;
905
906	cs->tcb.state = TCP_ST_ESTABLISHED;
907
908	/* add stream to the table */
909	cs->ste = stbl_add_stream(st, pi, cs);
910	if (cs->ste == NULL)
911		return -ENOBUFS;
912
913	cs->tcb.uop |= TCP_OP_ACCEPT;
914	tcp_stream_up(cs);
915	return 0;
916}
917
918
919/*
920 * ACK for new connection request arrived.
921 * Check that the packet meets all conditions and try to open a new stream.
922 * returns:
923 * < 0  - invalid packet
924 * == 0 - packet is valid and new stream was opened for it.
925 * > 0  - packet is valid, but failed to open new stream.
926 */
927static inline int
928rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st,
929	const union pkt_info *pi, union seg_info *si,
930	uint32_t tms, struct rte_mbuf *mb, struct tle_tcp_stream **csp)
931{
932	int32_t rc;
933	struct tle_ctx *ctx;
934	struct tle_stream *ts;
935	struct tle_tcp_stream *cs;
936	union tsopt to;
937
938	*csp = NULL;
939
940	if (pi->tf.flags != TCP_FLAG_ACK || rx_check_stream(s, pi) != 0)
941		return -EINVAL;
942
943	ctx = s->s.ctx;
944	rc = restore_syn_opt(si, &to, pi, tms, mb, ctx->prm.hash_alg,
945				&ctx->prm.secret_key);
946	if (rc < 0)
947		return rc;
948
949	/* allocate new stream */
950	ts = get_stream(ctx);
951	cs = TCP_STREAM(ts);
952	if (ts == NULL)
953		return ENFILE;
954
955	/* prepare stream to handle new connection */
956	if (accept_prep_stream(s, st, cs, &to, tms, pi, si) == 0) {
957
958		/* put new stream in the accept queue */
959		if (_rte_ring_enqueue_burst(s->rx.q,
960				(void * const *)&ts, 1) == 1) {
961			*csp = cs;
962			return 0;
963		}
964
965		/* cleanup on failure */
966		tcp_stream_down(cs);
967		stbl_del_stream(st, cs->ste, cs, 0);
968		cs->ste = NULL;
969	}
970
971	tcp_stream_reset(ctx, cs);
972	return ENOBUFS;
973}
974
975static inline int
976data_pkt_adjust(const struct tcb *tcb, struct rte_mbuf *mb, uint32_t hlen,
977	uint32_t *seqn, uint32_t *plen)
978{
979	uint32_t len, n, seq;
980
981	seq = *seqn;
982	len = *plen;
983
984	rte_pktmbuf_adj(mb, hlen);
985	if (len == 0)
986		return -ENODATA;
987	/* cut off the start of the packet */
988	else if (tcp_seq_lt(seq, tcb->rcv.nxt)) {
989		n = tcb->rcv.nxt - seq;
990		if (n >= len)
991			return -ENODATA;
992
993		rte_pktmbuf_adj(mb, n);
994		*seqn = seq + n;
995		*plen = len - n;
996	}
997
998	return 0;
999}
1000
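/*
 * process newly acknowledged data: advance SND.UNA, free the ACK-ed mbufs
 * from the TX queue and, if the send buffer has free space afterwards,
 * raise the stream TX event (or invoke the TX callback if the buffer
 * was completely full before).
 * Returns the number of newly acknowledged bytes.
 */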
1001static inline uint32_t
1002rx_ackdata(struct tle_tcp_stream *s, uint32_t ack)
1003{
1004	uint32_t k, n;
1005
1006	n = ack - (uint32_t)s->tcb.snd.una;
1007
1008	/* some more data was acked. */
1009	if (n != 0) {
1010
1011		/* advance SND.UNA and free related packets. */
1012		k = rte_ring_free_count(s->tx.q);
1013		free_una_data(s, n);
1014
1015		/* mark the stream as available for writing */
1016		if (rte_ring_free_count(s->tx.q) != 0) {
1017			if (s->tx.ev != NULL)
1018				tle_event_raise(s->tx.ev);
1019			else if (k == 0 && s->tx.cb.func != NULL)
1020				s->tx.cb.func(s->tx.cb.data, &s->s);
1021		}
1022	}
1023
1024	return n;
1025}
1026
1027static void
1028stream_timewait(struct tle_tcp_stream *s, uint32_t rto)
1029{
1030	if (rto != 0) {
1031		s->tcb.state = TCP_ST_TIME_WAIT;
1032		s->tcb.snd.rto = rto;
1033		timer_reset(s);
1034	} else
1035		stream_term(s);
1036}
1037
1038static void
1039rx_fin_state(struct tle_tcp_stream *s, struct resp_info *rsp)
1040{
1041	uint32_t state;
1042	int32_t ackfin;
1043
1044	s->tcb.rcv.nxt += 1;
1045
1046	ackfin = (s->tcb.snd.una == s->tcb.snd.fss);
1047	state = s->tcb.state;
1048
1049	if (state == TCP_ST_ESTABLISHED) {
1050		s->tcb.state = TCP_ST_CLOSE_WAIT;
1051		/* raise err.ev & err.cb */
1052		if (s->err.ev != NULL)
1053			tle_event_raise(s->err.ev);
1054		else if (s->err.cb.func != NULL)
1055			s->err.cb.func(s->err.cb.data, &s->s);
1056	} else if (state == TCP_ST_FIN_WAIT_1 || state == TCP_ST_CLOSING) {
1057		rsp->flags |= TCP_FLAG_ACK;
1058		if (ackfin != 0)
1059			stream_timewait(s, s->tcb.snd.rto_tw);
1060		else
1061			s->tcb.state = TCP_ST_CLOSING;
1062	} else if (state == TCP_ST_FIN_WAIT_2) {
1063		rsp->flags |= TCP_FLAG_ACK;
1064		stream_timewait(s, s->tcb.snd.rto_tw);
1065	} else if (state == TCP_ST_LAST_ACK && ackfin != 0) {
1066		stream_term(s);
1067	}
1068}
1069
1070/*
1071 * FIN process for ESTABLISHED state
1072 * returns:
1073 * < 0 - error occurred
1074 * 0   - FIN was processed OK, and mbuf can be freed/reused.
1075 * > 0 - FIN was processed OK and mbuf can't be freed/reused.
1076 */
1077static inline int
1078rx_fin(struct tle_tcp_stream *s, uint32_t state,
1079	const union seg_info *si, struct rte_mbuf *mb,
1080	struct resp_info *rsp)
1081{
1082	uint32_t hlen, plen, seq;
1083	int32_t ret;
1084	union tsopt ts;
1085
1086	hlen = PKT_L234_HLEN(mb);
1087	plen = mb->pkt_len - hlen;
1088	seq = si->seq;
1089
1090	ts = rx_tms_opt(&s->tcb, mb);
1091	ret = rx_check_seqack(&s->tcb, seq, si->ack, plen, ts);
1092	if (ret != 0)
1093		return ret;
1094
1095	if (state < TCP_ST_ESTABLISHED)
1096		return -EINVAL;
1097
1098	if (plen != 0) {
1099
1100		ret = data_pkt_adjust(&s->tcb, mb, hlen, &seq, &plen);
1101		if (ret != 0)
1102			return ret;
1103		if (rx_data_enqueue(s, seq, plen, &mb, 1) != 1)
1104			return -ENOBUFS;
1105	}
1106
1107	/*
1108	 * fast-path: all data & FIN was already sent out
1109	 * and now is acknowledged.
1110	 */
1111	if (s->tcb.snd.fss == s->tcb.snd.nxt &&
1112			si->ack == (uint32_t)s->tcb.snd.nxt) {
1113		s->tcb.snd.una = s->tcb.snd.fss;
1114		empty_tq(s);
1115	/* conventional ACK processing */
1116	} else
1117		rx_ackdata(s, si->ack);
1118
1119	/* some fragments still missing */
1120	if (seq + plen != s->tcb.rcv.nxt) {
1121		s->tcb.rcv.frs.seq = seq + plen;
1122		s->tcb.rcv.frs.on = 1;
1123	} else
1124		rx_fin_state(s, rsp);
1125
1126	return plen;
1127}
1128
1129static inline int
1130rx_rst(struct tle_tcp_stream *s, uint32_t state, uint32_t flags,
1131	const union seg_info *si)
1132{
1133	int32_t rc;
1134
1135	/*
1136	 * RFC 793: In all states except SYN-SENT, all reset (RST) segments
1137	 * are validated by checking their SEQ-fields.
1138	 * A reset is valid if its sequence number is in the window.
1139	 * In the SYN-SENT state (a RST received in response to an initial SYN),
1140	 * the RST is acceptable if the ACK field acknowledges the SYN.
1141	 */
1142	if (state == TCP_ST_SYN_SENT) {
1143		rc = ((flags & TCP_FLAG_ACK) == 0 ||
1144				si->ack != s->tcb.snd.nxt) ?
1145			-ERANGE : 0;
1146	}
1147
1148	else
1149		rc = check_seqn(&s->tcb, si->seq, 0);
1150
1151	if (rc == 0)
1152		stream_term(s);
1153
1154	return rc;
1155}
1156
1157/*
1158 * check whether we have a FIN that was received out-of-order.
1159 * if yes, try to process it now.
1160 */
1161static inline void
1162rx_ofo_fin(struct tle_tcp_stream *s, struct resp_info *rsp)
1163{
1164	if (s->tcb.rcv.frs.on != 0 && s->tcb.rcv.nxt == s->tcb.rcv.frs.seq)
1165		rx_fin_state(s, rsp);
1166}
1167
1168static inline void
1169dack_info_init(struct dack_info *tack, const struct tcb *tcb)
1170{
1171	static const struct dack_info zero_dack;
1172
1173	tack[0] = zero_dack;
1174	tack->ack = tcb->snd.una;
1175	tack->segs.dup = tcb->rcv.dupack;
1176	tack->wu.raw = tcb->snd.wu.raw;
1177	tack->wnd = tcb->snd.wnd >> tcb->snd.wscale;
1178}
1179
1180static inline void
1181ack_window_update(struct tcb *tcb, const struct dack_info *tack)
1182{
1183	tcb->snd.wu.raw = tack->wu.raw;
1184	tcb->snd.wnd = tack->wnd << tcb->snd.wscale;
1185}
1186
1187static inline void
1188ack_cwnd_update(struct tcb *tcb, uint32_t acked, const struct dack_info *tack)
1189{
1190	uint32_t n;
1191
1192	n = tack->segs.ack * tcb->snd.mss;
1193
1194	/* slow start phase, RFC 5681 3.1 (2)  */
1195	if (tcb->snd.cwnd < tcb->snd.ssthresh)
1196		tcb->snd.cwnd += RTE_MIN(acked, n);
1197	/* congestion avoidance phase, RFC 5681 3.1 (3) */
1198	else
1199		tcb->snd.cwnd += RTE_MAX(1U, n * tcb->snd.mss / tcb->snd.cwnd);
1200}
1201
1202static inline void
1203rto_ssthresh_update(struct tcb *tcb)
1204{
1205	uint32_t k, n;
1206
1207	/* RFC 5681 3.1 (4)  */
1208	n = (tcb->snd.nxt - tcb->snd.una) / 2;
1209	k = 2 * tcb->snd.mss;
1210	tcb->snd.ssthresh = RTE_MAX(n, k);
1211}
1212
1213static inline void
1214rto_cwnd_update(struct tcb *tcb)
1215{
1216
1217	if (tcb->snd.nb_retx == 0)
1218		rto_ssthresh_update(tcb);
1219
1220	/*
1221	 * RFC 5681 3.1: upon a timeout cwnd MUST be set to
1222	 * no more than 1 full-sized segment.
1223	 */
1224	tcb->snd.cwnd = tcb->snd.mss;
1225}
1226
1227static inline void
1228ack_info_update(struct dack_info *tack, const union seg_info *si,
1229	int32_t badseq, uint32_t dlen, const union tsopt ts)
1230{
1231	if (badseq != 0) {
1232		tack->segs.badseq++;
1233		return;
1234	}
1235
1236	/* segment with incoming data */
1237	tack->segs.data += (dlen != 0);
1238
1239	/* segment with newly acked data */
1240	if (tcp_seq_lt(tack->ack, si->ack)) {
1241		tack->segs.dup = 0;
1242		tack->segs.ack++;
1243		tack->ack = si->ack;
1244		tack->ts = ts;
1245
1246	/*
1247	 * RFC 5681: An acknowledgment is considered a "duplicate" when:
1248	 * (a) the receiver of the ACK has outstanding data
1249	 * (b) the incoming acknowledgment carries no data
1250	 * (c) the SYN and FIN bits are both off
1251	 * (d) the acknowledgment number is equal to the TCP.UNA
1252	 * (e) the advertised window in the incoming acknowledgment equals the
1253	 * advertised window in the last incoming acknowledgment.
1254	 *
1255	 * Here we only have to check for (b), (d) and (e).
1256	 * (a) will be checked later for the whole bulk of packets,
1257	 * (c) should never happen here.
1258	 */
1259	} else if (dlen == 0 && si->wnd == tack->wnd && ++tack->segs.dup == 3) {
1260		tack->dup3.seg = tack->segs.ack + 1;
1261		tack->dup3.ack = tack->ack;
1262	}
1263
1264	/*
1265	 * RFC 793:
1266	 * If SND.UNA < SEG.ACK =< SND.NXT, the send window should be
1267	 * updated.  If (SND.WL1 < SEG.SEQ or (SND.WL1 = SEG.SEQ and
1268	 * SND.WL2 =< SEG.ACK)), set SND.WND <- SEG.WND, set
1269	 * SND.WL1 <- SEG.SEQ, and set SND.WL2 <- SEG.ACK.
1270	 */
1271	if (tcp_seq_lt(tack->wu.wl1, si->seq) ||
1272			(si->seq == tack->wu.wl1 &&
1273			tcp_seq_leq(tack->wu.wl2, si->ack))) {
1274
1275		tack->wu.wl1 = si->seq;
1276		tack->wu.wl2 = si->ack;
1277		tack->wnd = si->wnd;
1278	}
1279}
1280
1281static inline uint32_t
1282rx_data_ack(struct tle_tcp_stream *s, struct dack_info *tack,
1283	const union seg_info si[], struct rte_mbuf *mb[], struct rte_mbuf *rp[],
1284	int32_t rc[], uint32_t num)
1285{
1286	uint32_t i, j, k, n, t;
1287	uint32_t hlen, plen, seq, tlen;
1288	int32_t ret;
1289	union tsopt ts;
1290
1291	k = 0;
1292	for (i = 0; i != num; i = j) {
1293
1294		hlen = PKT_L234_HLEN(mb[i]);
1295		plen = mb[i]->pkt_len - hlen;
1296		seq = si[i].seq;
1297
1298		ts = rx_tms_opt(&s->tcb, mb[i]);
1299		ret = rx_check_seqack(&s->tcb, seq, si[i].ack, plen, ts);
1300
1301		/* account segment received */
1302		ack_info_update(tack, &si[i], ret != 0, plen, ts);
1303
1304		if (ret == 0) {
1305			/* skip duplicate data, if any */
1306			ret = data_pkt_adjust(&s->tcb, mb[i], hlen,
1307				&seq, &plen);
1308		}
1309
1310		j = i + 1;
1311		if (ret != 0) {
1312			rp[k] = mb[i];
1313			rc[k] = -ret;
1314			k++;
1315			continue;
1316		}
1317
1318		/* group sequential packets together. */
1319		for (tlen = plen; j != num; tlen += plen, j++) {
1320
1321			hlen = PKT_L234_HLEN(mb[j]);
1322			plen = mb[j]->pkt_len - hlen;
1323
1324			/* not consecutive packet */
1325			if (plen == 0 || seq + tlen != si[j].seq)
1326				break;
1327
1328			/* check SEQ/ACK */
1329			ts = rx_tms_opt(&s->tcb, mb[j]);
1330			ret = rx_check_seqack(&s->tcb, si[j].seq, si[j].ack,
1331				plen, ts);
1332
1333			if (ret != 0)
1334				break;
1335
1336			/* account for segment received */
1337			ack_info_update(tack, &si[j], ret != 0, plen, ts);
1338
1339			rte_pktmbuf_adj(mb[j], hlen);
1340		}
1341
1342		n = j - i;
1343
1344		/* account for OFO data */
1345		if (seq != s->tcb.rcv.nxt)
1346			tack->segs.ofo += n;
1347
1348		/* enqueue packets */
1349		t = rx_data_enqueue(s, seq, tlen, mb + i, n);
1350
1351		/* if we are out of space in stream recv buffer. */
1352		for (; t != n; t++) {
1353			rp[k] = mb[i + t];
1354			rc[k] = -ENOBUFS;
1355			k++;
1356		}
1357	}
1358
1359	return num - k;
1360}
1361
1362static inline void
1363start_fast_retransmit(struct tle_tcp_stream *s)
1364{
1365	struct tcb *tcb;
1366
1367	tcb = &s->tcb;
1368
1369	/* RFC 6582 3.2.2 */
1370	tcb->snd.rcvr = tcb->snd.nxt;
1371	tcb->snd.fastack = 1;
1372
1373	/* RFC 5681 3.2.2 */
1374	rto_ssthresh_update(tcb);
1375
1376	/* RFC 5681 3.2.3 */
1377	tcp_txq_rst_nxt_head(s);
1378	tcb->snd.nxt = tcb->snd.una;
1379	tcb->snd.cwnd = tcb->snd.ssthresh + 3 * tcb->snd.mss;
1380}
1381
1382static inline void
1383stop_fast_retransmit(struct tle_tcp_stream *s)
1384{
1385	struct tcb *tcb;
1386	uint32_t n;
1387
1388	tcb = &s->tcb;
1389	n = tcb->snd.nxt - tcb->snd.una;
1390	tcb->snd.cwnd = RTE_MIN(tcb->snd.ssthresh,
1391		RTE_MAX(n, tcb->snd.mss) + tcb->snd.mss);
1392	tcb->snd.fastack = 0;
1393}
1394
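/*
 * fast recovery processing: deflate cwnd on a partial ACK (RFC 6582 3.2.3)
 * or inflate it on additional duplicate ACKs (RFC 5681 3.2.4).
 * Returns non-zero if a (re)transmission attempt should follow.
 */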
1395static inline int
1396in_fast_retransmit(struct tle_tcp_stream *s, uint32_t ack_len, uint32_t ack_num,
1397	uint32_t dup_num)
1398{
1399	uint32_t n;
1400	struct tcb *tcb;
1401
1402	tcb = &s->tcb;
1403
1404	/* RFC 6582 3.2.3 partial ACK */
1405	if (ack_len != 0) {
1406
1407		n = ack_num * tcb->snd.mss;
1408		if (ack_len >= n)
1409			tcb->snd.cwnd -= ack_len - n;
1410		else
1411			tcb->snd.cwnd -= ack_len % tcb->snd.mss;
1412
1413		/*
1414		 * For the first partial ACK that arrives
1415		 * during fast recovery, also reset the
1416		 * retransmit timer.
1417		 */
1418		if (tcb->snd.fastack == 1)
1419			timer_reset(s);
1420
1421		tcb->snd.fastack += ack_num;
1422		return 1;
1423
1424	/* RFC 5681 3.2.4 */
1425	} else if (dup_num > 3) {
1426		s->tcb.snd.cwnd += (dup_num - 3) * tcb->snd.mss;
1427		return 1;
1428	}
1429
1430	return 0;
1431}
1432
1433static inline int
1434process_ack(struct tle_tcp_stream *s, uint32_t acked,
1435	const struct dack_info *tack)
1436{
1437	int32_t send;
1438
1439	send = 0;
1440
1441	/* normal mode */
1442	if (s->tcb.snd.fastack == 0) {
1443
1444		send = 1;
1445
1446		/* RFC 6582 3.2.2 switch to fast retransmit mode */
1447		if (tack->dup3.seg != 0 && s->tcb.snd.una != s->tcb.snd.nxt &&
1448				s->tcb.snd.una >= s->tcb.snd.rcvr) {
1449
1450			start_fast_retransmit(s);
1451			in_fast_retransmit(s,
1452				tack->ack - tack->dup3.ack,
1453				tack->segs.ack - tack->dup3.seg - 1,
1454				tack->segs.dup);
1455
1456		/* remain in normal mode */
1457		} else if (acked != 0) {
1458			ack_cwnd_update(&s->tcb, acked, tack);
1459			timer_stop(s);
1460		}
1461
1462	/* fast retransmit mode */
1463	} else {
1464
1465		/* remain in fast retransmit mode */
1466		if (s->tcb.snd.una < s->tcb.snd.rcvr) {
1467
1468			send = in_fast_retransmit(s, acked, tack->segs.ack,
1469				tack->segs.dup);
1470		} else {
1471			/* RFC 6582 3.2.3 full ACK */
1472			stop_fast_retransmit(s);
1473			timer_stop(s);
1474
1475			/* if we have another series of dup ACKs */
1476			if (tack->dup3.seg != 0 &&
1477					s->tcb.snd.una != s->tcb.snd.nxt &&
1478					tcp_seq_leq((uint32_t)s->tcb.snd.rcvr,
1479					tack->dup3.ack)) {
1480
1481				/* restart fast retransmit again. */
1482				start_fast_retransmit(s);
1483				send = in_fast_retransmit(s,
1484					tack->ack - tack->dup3.ack,
1485					tack->segs.ack - tack->dup3.seg - 1,
1486					tack->segs.dup);
1487			}
1488		}
1489	}
1490
1491	return send;
1492}
1493
1494/*
1495 * our FIN was acked, stop rto timer, change stream state,
1496 * and possibly close the stream.
1497 */
1498static inline void
1499rx_ackfin(struct tle_tcp_stream *s)
1500{
1501	uint32_t state;
1502
1503	s->tcb.snd.una = s->tcb.snd.fss;
1504	empty_tq(s);
1505
1506	state = s->tcb.state;
1507	if (state == TCP_ST_LAST_ACK)
1508		stream_term(s);
1509	else if (state == TCP_ST_FIN_WAIT_1) {
1510		timer_stop(s);
1511		s->tcb.state = TCP_ST_FIN_WAIT_2;
1512	} else if (state == TCP_ST_CLOSING) {
1513		stream_timewait(s, s->tcb.snd.rto_tw);
1514	}
1515}
1516
1517static inline void
1518rx_process_ack(struct tle_tcp_stream *s, uint32_t ts,
1519	const struct dack_info *tack)
1520{
1521	int32_t send;
1522	uint32_t n;
1523
1524	s->tcb.rcv.dupack = tack->segs.dup;
1525
1526	n = rx_ackdata(s, tack->ack);
1527	send = process_ack(s, n, tack);
1528
1529	/* try to send more data. */
1530	if ((n != 0 || send != 0) && tcp_txq_nxt_cnt(s) != 0)
1531		txs_enqueue(s->s.ctx, s);
1532
1533	/* restart RTO timer. */
1534	if (s->tcb.snd.nxt != s->tcb.snd.una)
1535		timer_start(s);
1536
1537	/* update rto, if fresh packet is here then calculate rtt */
1538	if (tack->ts.ecr != 0)
1539		rto_estimate(&s->tcb, ts - tack->ts.ecr);
1540}
1541
1542/*
1543 * process <SYN,ACK>
1544 * returns negative value on failure, or zero on success.
1545 */
1546static inline int
1547rx_synack(struct tle_tcp_stream *s, uint32_t ts, uint32_t state,
1548	const union seg_info *si, struct rte_mbuf *mb,
1549	struct resp_info *rsp)
1550{
1551	struct syn_opts so;
1552	struct tcp_hdr *th;
1553
1554	if (state != TCP_ST_SYN_SENT)
1555		return -EINVAL;
1556
1557	/* invalid SEG.ACK */
1558	if (si->ack != (uint32_t)s->tcb.snd.nxt) {
1559		rsp->flags = TCP_FLAG_RST;
1560		return 0;
1561	}
1562
1563	th = rte_pktmbuf_mtod_offset(mb, struct tcp_hdr *,
1564		mb->l2_len + mb->l3_len);
1565	get_syn_opts(&so, (uintptr_t)(th + 1), mb->l4_len - sizeof(*th));
1566
1567	s->tcb.so = so;
1568
1569	s->tcb.snd.una = s->tcb.snd.nxt;
1570	s->tcb.snd.mss = calc_smss(so.mss, &s->tx.dst);
1571	s->tcb.snd.wnd = si->wnd << so.wscale;
1572	s->tcb.snd.wu.wl1 = si->seq;
1573	s->tcb.snd.wu.wl2 = si->ack;
1574	s->tcb.snd.wscale = so.wscale;
1575
1576	/* setup congestion variables */
1577	s->tcb.snd.cwnd = initial_cwnd(s->tcb.snd.mss, s->tcb.snd.cwnd);
1578	s->tcb.snd.ssthresh = s->tcb.snd.wnd;
1579
1580	s->tcb.rcv.ts = so.ts.val;
1581	s->tcb.rcv.irs = si->seq;
1582	s->tcb.rcv.nxt = si->seq + 1;
1583
1584	/* if peer doesn't support WSCALE opt, recalculate RCV.WND */
1585	s->tcb.rcv.wscale = (so.wscale == TCP_WSCALE_NONE) ?
1586		TCP_WSCALE_NONE : TCP_WSCALE_DEFAULT;
1587	s->tcb.rcv.wnd = calc_rx_wnd(s, s->tcb.rcv.wscale);
1588
1589	/* calculate initial rto */
1590	rto_estimate(&s->tcb, ts - s->tcb.snd.ts);
1591
1592	rsp->flags |= TCP_FLAG_ACK;
1593
1594	timer_stop(s);
1595	s->tcb.state = TCP_ST_ESTABLISHED;
1596	rte_smp_wmb();
1597
1598	if (s->tx.ev != NULL)
1599		tle_event_raise(s->tx.ev);
1600	else if (s->tx.cb.func != NULL)
1601		s->tx.cb.func(s->tx.cb.data, &s->s);
1602
1603	return 0;
1604}
1605
1606static inline uint32_t
1607rx_stream(struct tle_tcp_stream *s, uint32_t ts,
1608	const union pkt_info *pi, const union seg_info si[],
1609	struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1610	uint32_t num)
1611{
1612	uint32_t i, k, n, state;
1613	int32_t ret;
1614	struct resp_info rsp;
1615	struct dack_info tack;
1616
1617	k = 0;
1618	rsp.flags = 0;
1619
1620	state = s->tcb.state;
1621
1622	/*
1623	 * first check for the states/flags where we don't
1624	 * expect groups of packets.
1625	 */
1626
1627	/* process RST */
1628	if ((pi->tf.flags & TCP_FLAG_RST) != 0) {
1629		for (i = 0;
1630				i != num &&
1631				rx_rst(s, state, pi->tf.flags, &si[i]);
1632				i++)
1633			;
1634		i = 0;
1635
1636	/* RFC 793: if the ACK bit is off drop the segment and return */
1637	} else if ((pi->tf.flags & TCP_FLAG_ACK) == 0) {
1638		i = 0;
1643
1644	/* process <SYN,ACK> */
1645	} else if ((pi->tf.flags & TCP_FLAG_SYN) != 0) {
1646		for (i = 0; i != num; i++) {
1647			ret = rx_synack(s, ts, state, &si[i], mb[i], &rsp);
1648			if (ret == 0)
1649				break;
1650
1651			rc[k] = -ret;
1652			rp[k] = mb[i];
1653			k++;
1654		}
1655
1656	/* process FIN */
1657	} else if ((pi->tf.flags & TCP_FLAG_FIN) != 0) {
1658		ret = 0;
1659		for (i = 0; i != num; i++) {
1660			ret = rx_fin(s, state, &si[i], mb[i], &rsp);
1661			if (ret >= 0)
1662				break;
1663
1664			rc[k] = -ret;
1665			rp[k] = mb[i];
1666			k++;
1667		}
1668		i += (ret > 0);
1669
1670	/* normal data/ack packets */
1671	} else if (state >= TCP_ST_ESTABLISHED && state <= TCP_ST_LAST_ACK) {
1672
1673		/* process incoming data packets. */
1674		dack_info_init(&tack, &s->tcb);
1675		n = rx_data_ack(s, &tack, si, mb, rp, rc, num);
1676
1677		/* follow up actions based on aggregated information */
1678
1679		/* update SND.WND */
1680		ack_window_update(&s->tcb, &tack);
1681
1682		/*
1683		 * fast-path: all data & FIN was already sent out
1684		 * and now is acknowledged.
1685		 */
1686		if (s->tcb.snd.fss == s->tcb.snd.nxt &&
1687				tack.ack == (uint32_t)s->tcb.snd.nxt)
1688			rx_ackfin(s);
1689		else
1690			rx_process_ack(s, ts, &tack);
1691
1692		/*
1693		 * send an immediate ACK if either:
1694		 * - received segment with invalid seq/ack number
1695		 * - received segment with OFO data
1696		 * - received segment with in-order data and no TX is scheduled
1697		 *   for that stream.
1698		 */
1699		if (tack.segs.badseq != 0 || tack.segs.ofo != 0 ||
1700				(tack.segs.data != 0 &&
1701				rte_atomic32_read(&s->tx.arm) == 0))
1702			rsp.flags |= TCP_FLAG_ACK;
1703
1704		rx_ofo_fin(s, &rsp);
1705
1706		k += num - n;
1707		i = num;
1708
1709	/* unhandled state, drop all packets. */
1710	} else
1711		i = 0;
1712
1713	/* we have a response packet to send. */
1714	if (rsp.flags == TCP_FLAG_RST) {
1715		send_rst(s, si[i].ack);
1716		stream_term(s);
1717	} else if (rsp.flags != 0) {
1718		send_ack(s, ts, rsp.flags);
1719
1720		/* start the timer for FIN packet */
1721		if ((rsp.flags & TCP_FLAG_FIN) != 0)
1722			timer_reset(s);
1723	}
1724
1725	/* unprocessed packets */
1726	for (; i != num; i++, k++) {
1727		rc[k] = ENODATA;
1728		rp[k] = mb[i];
1729	}
1730
1731	return num - k;
1732}
1733
1734static inline uint32_t
1735rx_new_stream(struct tle_tcp_stream *s, uint32_t ts,
1736	const union pkt_info *pi, const union seg_info si[],
1737	struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1738	uint32_t num)
1739{
1740	uint32_t i;
1741
1742	if (tcp_stream_acquire(s) > 0) {
1743		i = rx_stream(s, ts, pi, si, mb, rp, rc, num);
1744		tcp_stream_release(s);
1745		return i;
1746	}
1747
1748	for (i = 0; i != num; i++) {
1749		rc[i] = ENOENT;
1750		rp[i] = mb[i];
1751	}
1752	return 0;
1753}
1754
1755static inline uint32_t
1756rx_postsyn(struct tle_dev *dev, struct stbl *st, uint32_t type, uint32_t ts,
1757	const union pkt_info pi[], union seg_info si[],
1758	struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1759	uint32_t num)
1760{
1761	struct tle_tcp_stream *cs, *s;
1762	uint32_t i, k, n, state;
1763	int32_t ret;
1764
1765	s = rx_obtain_stream(dev, st, &pi[0], type);
1766	if (s == NULL) {
1767		for (i = 0; i != num; i++) {
1768			rc[i] = ENOENT;
1769			rp[i] = mb[i];
1770		}
1771		return 0;
1772	}
1773
1774	k = 0;
1775	state = s->tcb.state;
1776
1777	if (state == TCP_ST_LISTEN) {
1778
1779		/* one connection per flow */
1780		cs = NULL;
1781		ret = -EINVAL;
1782		for (i = 0; i != num; i++) {
1783
1784			ret = rx_ack_listen(s, st, pi, &si[i], ts, mb[i], &cs);
1785
1786			/* valid packet encountered */
1787			if (ret >= 0)
1788				break;
1789
1790			/* invalid packet, keep trying to find a proper one */
1791			rc[k] = -ret;
1792			rp[k] = mb[i];
1793			k++;
1794		}
1795
1796		/* packet is valid, but we are out of streams to serve it */
1797		if (ret > 0) {
1798			for (; i != num; i++, k++) {
1799				rc[k] = ret;
1800				rp[k] = mb[i];
1801			}
1802		/* new stream is accepted */
1803		} else if (ret == 0) {
1804
1805			/* inform listen stream about new connections */
1806			if (s->rx.ev != NULL)
1807				tle_event_raise(s->rx.ev);
1808			else if (s->rx.cb.func != NULL &&
1809					rte_ring_count(s->rx.q) == 1)
1810				s->rx.cb.func(s->rx.cb.data, &s->s);
1811
1812			/* if there is no data, drop current packet */
1813			if (PKT_L4_PLEN(mb[i]) == 0) {
1814				rc[k] = ENODATA;
1815				rp[k++] = mb[i++];
1816			}
1817
1818			/*  process remaining packets for that stream */
1819			if (num != i) {
1820				n = rx_new_stream(cs, ts, pi + i, si + i,
1821					mb + i, rp + k, rc + k, num - i);
1822				k += num - n - i;
1823			}
1824		}
1825
1826	} else {
1827		i = rx_stream(s, ts, pi, si, mb, rp, rc, num);
1828		k = num - i;
1829	}
1830
1831	tcp_stream_release(s);
1832	return num - k;
1833}
1834
1835
1836static inline uint32_t
1837rx_syn(struct tle_dev *dev, uint32_t type, uint32_t ts,
1838	const union pkt_info pi[], const union seg_info si[],
1839	struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1840	uint32_t num)
1841{
1842	struct tle_tcp_stream *s;
1843	uint32_t i, k;
1844	int32_t ret;
1845
1846	s = rx_obtain_listen_stream(dev, &pi[0], type);
1847	if (s == NULL) {
1848		for (i = 0; i != num; i++) {
1849			rc[i] = ENOENT;
1850			rp[i] = mb[i];
1851		}
1852		return 0;
1853	}
1854
1855	k = 0;
1856	for (i = 0; i != num; i++) {
1857
1858		/* check that this remote is allowed to connect */
1859		if (rx_check_stream(s, &pi[i]) != 0)
1860			ret = -ENOENT;
1861		else
1862			/* syncookie: reply with <SYN,ACK> */
1863			ret = sync_ack(s, &pi[i], &si[i], ts, mb[i]);
1864
1865		if (ret != 0) {
1866			rc[k] = -ret;
1867			rp[k] = mb[i];
1868			k++;
1869		}
1870	}
1871
1872	tcp_stream_release(s);
1873	return num - k;
1874}
1875
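/*
 * receive a burst of TCP packets for the given device: verify checksums,
 * group packets by flow and dispatch them either to the syncookie SYN
 * handler or to per-stream post-SYN processing.
 * Returns the number of packets consumed by the stack; the rest are
 * handed back via rp[] with error codes in rc[].
 *
 * Illustrative usage sketch (the caller's variable names below are
 * assumptions, not part of the API):
 *
 *	n = rte_eth_rx_burst(port, queue, pkt, MAX_PKT_BURST);
 *	k = tle_tcp_rx_bulk(dev, pkt, rp, rc, n);
 *	(the 'n - k' rejected packets are returned in rp[] with error
 *	codes in rc[] and have to be freed or re-examined by the caller)
 */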
1876uint16_t
1877tle_tcp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[],
1878	struct rte_mbuf *rp[], int32_t rc[], uint16_t num)
1879{
1880	struct stbl *st;
1881	struct tle_ctx *ctx;
1882	uint32_t i, j, k, mt, n, t, ts;
1883	uint64_t csf;
1884	union pkt_info pi[num];
1885	union seg_info si[num];
1886	union {
1887		uint8_t t[TLE_VNUM];
1888		uint32_t raw;
1889	} stu;
1890
1891	ctx = dev->ctx;
1892	ts = tcp_get_tms(ctx->cycles_ms_shift);
1893	st = CTX_TCP_STLB(ctx);
1894	mt = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0);
1895
1896	stu.raw = 0;
1897
1898	/* extract packet info and check the L3/L4 csums */
1899	for (i = 0; i != num; i++) {
1900
1901		get_pkt_info(pkt[i], &pi[i], &si[i]);
1902
1903		t = pi[i].tf.type;
1904		csf = dev->rx.ol_flags[t] &
1905			(PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD);
1906
1907		/* check csums in SW */
1908		if (pi[i].csf == 0 && csf != 0 && check_pkt_csum(pkt[i], csf,
1909				pi[i].tf.type, IPPROTO_TCP) != 0)
1910			pi[i].csf = csf;
1911
1912		stu.t[t] = mt;
1913	}
1914
1915	if (stu.t[TLE_V4] != 0)
1916		stbl_lock(st, TLE_V4);
1917	if (stu.t[TLE_V6] != 0)
1918		stbl_lock(st, TLE_V6);
1919
1920	k = 0;
1921	for (i = 0; i != num; i += j) {
1922
1923		t = pi[i].tf.type;
1924
1925		/* basic checks for incoming packet */
1926		if (t >= TLE_VNUM || pi[i].csf != 0 || dev->dp[t] == NULL) {
1927			rc[k] = EINVAL;
1928			rp[k] = pkt[i];
1929			j = 1;
1930			k++;
1931		/* process input SYN packets */
1932		} else if (pi[i].tf.flags == TCP_FLAG_SYN) {
1933			j = pkt_info_bulk_syneq(pi + i, num - i);
1934			n = rx_syn(dev, t, ts, pi + i, si + i, pkt + i,
1935				rp + k, rc + k, j);
1936			k += j - n;
1937		} else {
1938			j = pkt_info_bulk_eq(pi + i, num - i);
1939			n = rx_postsyn(dev, st, t, ts, pi + i, si + i, pkt + i,
1940				rp + k, rc + k, j);
1941			k += j - n;
1942		}
1943	}
1944
1945	if (stu.t[TLE_V4] != 0)
1946		stbl_unlock(st, TLE_V4);
1947	if (stu.t[TLE_V6] != 0)
1948		stbl_unlock(st, TLE_V6);
1949
1950	return num - k;
1951}
1952
1953uint16_t
1954tle_tcp_stream_accept(struct tle_stream *ts, struct tle_stream *rs[],
1955	uint32_t num)
1956{
1957	uint32_t n;
1958	struct tle_tcp_stream *s;
1959
1960	s = TCP_STREAM(ts);
1961	n = _rte_ring_dequeue_burst(s->rx.q, (void **)rs, num);
1962	if (n == 0)
1963		return 0;
1964
1965	/*
1966	 * if we still have packets to read,
1967	 * then rearm stream RX event.
1968	 */
1969	if (n == num && rte_ring_count(s->rx.q) != 0) {
1970		if (tcp_stream_try_acquire(s) > 0 && s->rx.ev != NULL)
1971			tle_event_raise(s->rx.ev);
1972		tcp_stream_release(s);
1973	}
1974
1975	return n;
1976}
1977
1978uint16_t
1979tle_tcp_tx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], uint16_t num)
1980{
1981	uint32_t i, j, k, n;
1982	struct tle_drb *drb[num];
1983	struct tle_tcp_stream *s;
1984
1985	/* extract packets from device TX queue. */
1986
1987	k = num;
1988	n = tle_dring_sc_dequeue(&dev->tx.dr, (const void **)(uintptr_t)pkt,
1989		num, drb, &k);
1990
1991	if (n == 0)
1992		return 0;
1993
1994	/* free empty drbs and notify related streams. */
1995
1996	for (i = 0; i != k; i = j) {
1997		s = drb[i]->udata;
1998		for (j = i + 1; j != k && s == drb[j]->udata; j++)
1999			;
2000		stream_drb_free(s, drb + i, j - i);
2001	}
2002
2003	return n;
2004}
2005
2006static inline void
2007stream_fill_pkt_info(const struct tle_tcp_stream *s, union pkt_info *pi)
2008{
2009	if (s->s.type == TLE_V4)
2010		pi->addr4 = s->s.ipv4.addr;
2011	else
2012		pi->addr6 = &s->s.ipv6.addr;
2013
2014	pi->port = s->s.port;
2015	pi->tf.type = s->s.type;
2016}
2017
2018static int
2019stream_fill_addr(struct tle_tcp_stream *s, const struct sockaddr *addr)
2020{
2021	const struct sockaddr_in *in4;
2022	const struct sockaddr_in6 *in6;
2023	const struct tle_dev_param *prm;
2024	int32_t rc;
2025
2026	rc = 0;
2027	s->s.pmsk.raw = UINT32_MAX;
2028
2029	/* setup L4 src ports and src address fields. */
2030	if (s->s.type == TLE_V4) {
2031		in4 = (const struct sockaddr_in *)addr;
2032		if (in4->sin_addr.s_addr == INADDR_ANY || in4->sin_port == 0)
2033			return -EINVAL;
2034
2035		s->s.port.src = in4->sin_port;
2036		s->s.ipv4.addr.src = in4->sin_addr.s_addr;
2037		s->s.ipv4.mask.src = INADDR_NONE;
2038		s->s.ipv4.mask.dst = INADDR_NONE;
2039
2040	} else if (s->s.type == TLE_V6) {
2041		in6 = (const struct sockaddr_in6 *)addr;
2042		if (memcmp(&in6->sin6_addr, &tle_ipv6_any,
2043				sizeof(tle_ipv6_any)) == 0 ||
2044				in6->sin6_port == 0)
2045			return -EINVAL;
2046
2047		s->s.port.src = in6->sin6_port;
2048		rte_memcpy(&s->s.ipv6.addr.src, &in6->sin6_addr,
2049			sizeof(s->s.ipv6.addr.src));
2050		rte_memcpy(&s->s.ipv6.mask.src, &tle_ipv6_none,
2051			sizeof(s->s.ipv6.mask.src));
2052		rte_memcpy(&s->s.ipv6.mask.dst, &tle_ipv6_none,
2053			sizeof(s->s.ipv6.mask.dst));
2054	}
2055
2056	/* setup the destination device. */
2057	rc = stream_fill_dest(s);
2058	if (rc != 0)
2059		return rc;
2060
2061	/* setup L4 dst address from device param */
2062	prm = &s->tx.dst.dev->prm;
2063	if (s->s.type == TLE_V4) {
2064		if (s->s.ipv4.addr.dst == INADDR_ANY)
2065			s->s.ipv4.addr.dst = prm->local_addr4.s_addr;
2066	} else if (memcmp(&s->s.ipv6.addr.dst, &tle_ipv6_any,
2067			sizeof(tle_ipv6_any)) == 0)
2068		memcpy(&s->s.ipv6.addr.dst, &prm->local_addr6,
2069			sizeof(s->s.ipv6.addr.dst));
2070
2071	return rc;
2072}
2073
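/*
 * active open: fill local/remote addresses, generate the initial send
 * sequence via the syncookie hash, insert the stream into the stream
 * table and schedule the SYN for transmission.
 */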
2074static inline int
2075tx_syn(struct tle_tcp_stream *s, const struct sockaddr *addr)
2076{
2077	int32_t rc;
2078	uint32_t tms, seq;
2079	union pkt_info pi;
2080	struct stbl *st;
2081	struct stbl_entry *se;
2082
2083	/* fill stream address */
2084	rc = stream_fill_addr(s, addr);
2085	if (rc != 0)
2086		return rc;
2087
2088	/* fill pkt info to generate seq.*/
2089	stream_fill_pkt_info(s, &pi);
2090
2091	tms = tcp_get_tms(s->s.ctx->cycles_ms_shift);
2092	s->tcb.so.ts.val = tms;
2093	s->tcb.so.ts.ecr = 0;
2094	s->tcb.so.wscale = TCP_WSCALE_DEFAULT;
2095	s->tcb.so.mss = calc_smss(s->tx.dst.mtu, &s->tx.dst);
2096
2097	/* note that rcv.nxt is 0 here for sync_gen_seq.*/
2098	seq = sync_gen_seq(&pi, s->tcb.rcv.nxt, tms, s->tcb.so.mss,
2099				s->s.ctx->prm.hash_alg,
2100				&s->s.ctx->prm.secret_key);
2101	s->tcb.snd.iss = seq;
2102	s->tcb.snd.rcvr = seq;
2103	s->tcb.snd.una = seq;
2104	s->tcb.snd.nxt = seq + 1;
2105	s->tcb.snd.rto = TCP_RTO_DEFAULT;
2106	s->tcb.snd.ts = tms;
2107
2108	s->tcb.rcv.mss = s->tcb.so.mss;
2109	s->tcb.rcv.wscale = TCP_WSCALE_DEFAULT;
2110	s->tcb.rcv.wnd = calc_rx_wnd(s, s->tcb.rcv.wscale);
2111	s->tcb.rcv.ts = 0;
2112
2113	/* add the stream in stream table */
2114	st = CTX_TCP_STLB(s->s.ctx);
2115	se = stbl_add_stream_lock(st, s);
2116	if (se == NULL)
2117		return -ENOBUFS;
2118	s->ste = se;
2119
2120	/* put stream into the to-send queue */
2121	txs_enqueue(s->s.ctx, s);
2122
2123	return 0;
2124}
2125
2126int
2127tle_tcp_stream_connect(struct tle_stream *ts, const struct sockaddr *addr)
2128{
2129	struct tle_tcp_stream *s;
2130	uint32_t type;
2131	int32_t rc;
2132
2133	if (ts == NULL || addr == NULL)
2134		return -EINVAL;
2135
2136	s = TCP_STREAM(ts);
2137	type = s->s.type;
2138	if (type >= TLE_VNUM)
2139		return -EINVAL;
2140
2141	if (tcp_stream_try_acquire(s) > 0) {
2142		rc = rte_atomic16_cmpset(&s->tcb.state, TCP_ST_CLOSED,
2143			TCP_ST_SYN_SENT);
2144		rc = (rc == 0) ? -EDEADLK : 0;
2145	} else
2146		rc = -EINVAL;
2147
2148	if (rc != 0) {
2149		tcp_stream_release(s);
2150		return rc;
2151	}
2152
2153	/* fill stream, prepare and transmit syn pkt */
2154	s->tcb.uop |= TCP_OP_CONNECT;
2155	rc = tx_syn(s, addr);
2156	tcp_stream_release(s);
2157
2158	/* error happened, do a cleanup */
2159	if (rc != 0)
2160		tle_tcp_stream_close(ts);
2161
2162	return rc;
2163}
2164
2165uint16_t
2166tle_tcp_stream_recv(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
2167{
2168	uint32_t n;
2169	struct tle_tcp_stream *s;
2170
2171	s = TCP_STREAM(ts);
2172	n = _rte_ring_mcs_dequeue_burst(s->rx.q, (void **)pkt, num);
2173	if (n == 0)
2174		return 0;
2175
2176	/*
2177	 * if we still have packets to read,
2178	 * then rearm stream RX event.
2179	 */
2180	if (n == num && rte_ring_count(s->rx.q) != 0) {
2181		if (tcp_stream_try_acquire(s) > 0 && s->rx.ev != NULL)
2182			tle_event_raise(s->rx.ev);
2183		tcp_stream_release(s);
2184	}
2185
2186	return n;
2187}
2188
2189ssize_t
2190tle_tcp_stream_readv(struct tle_stream *ts, const struct iovec *iov,
2191	int iovcnt)
2192{
2193	int32_t i;
2194	uint32_t mn, n, tn;
2195	size_t sz;
2196	struct tle_tcp_stream *s;
2197	struct iovec iv;
2198	struct rxq_objs mo[2];
2199
2200	s = TCP_STREAM(ts);
2201
2202	/* get group of packets */
2203	mn = tcp_rxq_get_objs(s, mo);
2204	if (mn == 0)
2205		return 0;
2206
2207	sz = 0;
2208	n = 0;
2209	for (i = 0; i != iovcnt; i++) {
2210		iv = iov[i];
2211		sz += iv.iov_len;
2212		n += _mbus_to_iovec(&iv, mo[0].mb + n, mo[0].num - n);
2213		if (iv.iov_len != 0) {
2214			sz -= iv.iov_len;
2215			break;
2216		}
2217	}
2218
2219	tn = n;
2220
2221	if (i != iovcnt && mn != 1) {
2222		n = 0;
2223		do {
2224			sz += iv.iov_len;
2225			n += _mbus_to_iovec(&iv, mo[1].mb + n, mo[1].num - n);
2226			if (iv.iov_len != 0) {
2227				sz -= iv.iov_len;
2228				break;
2229			}
2230			if (i + 1 != iovcnt)
2231				iv = iov[i + 1];
2232		} while (++i != iovcnt);
2233		tn += n;
2234	}
2235
2236	tcp_rxq_consume(s, tn);
2237
2238	/*
2239	 * if we still have packets to read,
2240	 * then rearm stream RX event.
2241	 */
2242	if (i == iovcnt && rte_ring_count(s->rx.q) != 0) {
2243		if (tcp_stream_try_acquire(s) > 0 && s->rx.ev != NULL)
2244			tle_event_raise(s->rx.ev);
2245		tcp_stream_release(s);
2246	}
2247
2248	return sz;
2249}
2250
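/*
 * Build L2/L3/L4 headers for each segment produced by tcp_segmentation()
 * and enqueue the whole group onto the stream TX queue in one bulk
 * operation; on any failure all segments are freed.
 */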
2251static inline int32_t
2252tx_segments(struct tle_tcp_stream *s, uint64_t ol_flags,
2253	struct rte_mbuf *segs[], uint32_t num)
2254{
2255	uint32_t i;
2256	int32_t rc;
2257
2258	for (i = 0; i != num; i++) {
2259		/* Build L2/L3/L4 header */
2260		rc = tcp_fill_mbuf(segs[i], s, &s->tx.dst, ol_flags, s->s.port,
2261			0, TCP_FLAG_ACK, 0, 0);
2262		if (rc != 0) {
2263			free_mbufs(segs, num);
2264			break;
2265		}
2266	}
2267
2268	if (i == num) {
2269		/* queue packets for further transmission. */
2270		rc = _rte_ring_enqueue_bulk(s->tx.q, (void **)segs, num);
2271		if (rc != 0)
2272			free_mbufs(segs, num);
2273	}
2274
2275	return rc;
2276}
2277
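/*
 * Enqueue user packets for transmission over an established (or
 * CLOSE_WAIT) stream. Packets that fit into SND.MSS get their headers
 * filled and are placed on the TX queue as-is; a packet larger than
 * SND.MSS (or with too many segments) is split via tcp_segmentation()
 * and queued through tx_segments(). Returns the number of packets
 * accepted; rte_errno is set on error. Accepted data is reported to the
 * backend via txs_enqueue() and the TX event is re-armed if there is
 * still room in the TX queue.
 */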
2278uint16_t
2279tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
2280{
2281	uint32_t i, j, k, mss, n, state;
2282	int32_t rc;
2283	uint64_t ol_flags;
2284	struct tle_tcp_stream *s;
2285	struct rte_mbuf *segs[TCP_MAX_PKT_SEG];
2286
2287	s = TCP_STREAM(ts);
2288
2289	/* mark stream as not closable. */
2290	if (tcp_stream_acquire(s) < 0) {
2291		rte_errno = EAGAIN;
2292		return 0;
2293	}
2294
2295	state = s->tcb.state;
2296	if (state != TCP_ST_ESTABLISHED && state != TCP_ST_CLOSE_WAIT) {
2297		rte_errno = ENOTCONN;
2298		tcp_stream_release(s);
2299		return 0;
2300	}
2301
2302	mss = s->tcb.snd.mss;
2303	ol_flags = s->tx.dst.ol_flags;
2304
2305	k = 0;
2306	rc = 0;
2307	while (k != num) {
2308		/* prepare and check for TX */
2309		for (i = k; i != num; i++) {
2310			if (pkt[i]->pkt_len > mss ||
2311					pkt[i]->nb_segs > TCP_MAX_PKT_SEG)
2312				break;
2313			rc = tcp_fill_mbuf(pkt[i], s, &s->tx.dst, ol_flags,
2314				s->s.port, 0, TCP_FLAG_ACK, 0, 0);
2315			if (rc != 0)
2316				break;
2317		}
2318
2319		if (i != k) {
2320			/* queue packets for further transmission. */
2321			n = _rte_ring_enqueue_burst(s->tx.q,
2322				(void **)pkt + k, (i - k));
2323			k += n;
2324
2325			/*
2326			 * for unsent, but already modified packets:
2327			 * remove pkt l2/l3 headers, restore ol_flags
2328			 */
2329			if (i != k) {
2330				ol_flags = ~s->tx.dst.ol_flags;
2331				for (j = k; j != i; j++) {
2332					rte_pktmbuf_adj(pkt[j], pkt[j]->l2_len +
2333						pkt[j]->l3_len +
2334						pkt[j]->l4_len);
2335					pkt[j]->ol_flags &= ol_flags;
2336				}
2337				break;
2338			}
2339		}
2340
2341		if (rc != 0) {
2342			rte_errno = -rc;
2343			break;
2344
2345		/* segment large packet and enqueue for sending */
2346		} else if (i != num) {
2347			/* segment the packet. */
2348			rc = tcp_segmentation(pkt[i], segs, RTE_DIM(segs),
2349				&s->tx.dst, mss);
2350			if (rc < 0) {
2351				rte_errno = -rc;
2352				break;
2353			}
2354
2355			rc = tx_segments(s, ol_flags, segs, rc);
2356			if (rc == 0) {
2357				/* free the large mbuf */
2358				rte_pktmbuf_free(pkt[i]);
2359				/* set the mbuf as consumed */
2360				k++;
2361			} else
2362				/* no space left in tx queue */
2363				break;
2364		}
2365	}
2366
2367	/* notify BE about more data to send */
2368	if (k != 0)
2369		txs_enqueue(s->s.ctx, s);
2370	/* if possible, re-arm stream write event. */
2371	if (rte_ring_free_count(s->tx.q) != 0 && s->tx.ev != NULL)
2372		tle_event_raise(s->tx.ev);
2373
2374	tcp_stream_release(s);
2375
2376	return k;
2377}
2378
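/*
 * Gather-write variant of tle_tcp_stream_send(): copy data from the iovec
 * array into mbufs allocated from <mp> (each holding at most SND.MSS or
 * mbuf data-room bytes, whichever is smaller), fill the TCP headers and
 * enqueue them onto the stream TX queue.
 * Returns the number of bytes accepted, or -1 with rte_errno set.
 */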
2379ssize_t
2380tle_tcp_stream_writev(struct tle_stream *ts, struct rte_mempool *mp,
2381	const struct iovec *iov, int iovcnt)
2382{
2383	int32_t i, rc;
2384	uint32_t j, k, n, num, slen, state;
2385	uint64_t ol_flags;
2386	size_t sz, tsz;
2387	struct tle_tcp_stream *s;
2388	struct iovec iv;
2389	struct rte_mbuf *mb[2 * MAX_PKT_BURST];
2390
2391	s = TCP_STREAM(ts);
2392
2393	/* mark stream as not closable. */
2394	if (tcp_stream_acquire(s) < 0) {
2395		rte_errno = EAGAIN;
2396		return -1;
2397	}
2398
2399	state = s->tcb.state;
2400	if (state != TCP_ST_ESTABLISHED && state != TCP_ST_CLOSE_WAIT) {
2401		rte_errno = ENOTCONN;
2402		tcp_stream_release(s);
2403		return -1;
2404	}
2405
2406	/* figure out how many mbufs we need */
2407	tsz = 0;
2408	for (i = 0; i != iovcnt; i++)
2409		tsz += iov[i].iov_len;
2410
2411	slen = rte_pktmbuf_data_room_size(mp);
2412	slen = RTE_MIN(slen, s->tcb.snd.mss);
2413
2414	num = (tsz + slen - 1) / slen;
2415	n = rte_ring_free_count(s->tx.q);
2416	num = RTE_MIN(num, n);
2417	n = RTE_MIN(num, RTE_DIM(mb));
2418
2419	/* allocate mbufs */
2420	if (rte_pktmbuf_alloc_bulk(mp, mb, n) != 0) {
2421		rte_errno = ENOMEM;
2422		tcp_stream_release(s);
2423		return -1;
2424	}
2425
2426	/* copy data into the mbufs */
2427	k = 0;
2428	sz = 0;
2429	for (i = 0; i != iovcnt; i++) {
2430		iv = iov[i];
2431		sz += iv.iov_len;
2432		k += _iovec_to_mbsegs(&iv, slen, mb + k, n - k);
2433		if (iv.iov_len != 0) {
2434			sz -= iv.iov_len;
2435			break;
2436		}
2437	}
2438
2439	/* account for a partially filled last segment */
2440	k += (k != n && mb[k]->data_len != 0);
2441
2442	/* fill pkt headers */
2443	ol_flags = s->tx.dst.ol_flags;
2444
2445	for (j = 0; j != k; j++) {
2446		rc = tcp_fill_mbuf(mb[j], s, &s->tx.dst, ol_flags,
2447			s->s.port, 0, TCP_FLAG_ACK, 0, 0);
2448		if (rc != 0)
2449			break;
2450	}
2451
2452	/* if no error encountered, then enqueue pkts for transmission */
2453	if (k == j)
2454		k = _rte_ring_enqueue_burst(s->tx.q, (void **)mb, j);
2455	else
2456		k = 0;
2457
2458	if (k != j) {
2459
2460		/* free pkts that were not enqueued */
2461		free_mbufs(mb + k, j - k);
2462
2463		/* our last segment can be partially filled */
2464		sz += slen - sz % slen;
2465		sz -= (j - k) * slen;
2466
2467		/* report an error */
2468		if (rc != 0) {
2469			rte_errno = -rc;
2470			sz = -1;
2471		}
2472	}
2473
2474	if (k != 0) {
2475
2476		/* notify BE about more data to send */
2477		txs_enqueue(s->s.ctx, s);
2478
2479		/* if possible, re-arm stream write event. */
2480		if (rte_ring_free_count(s->tx.q) != 0 && s->tx.ev != NULL)
2481			tle_event_raise(s->tx.ev);
2482	}
2483
2484	tcp_stream_release(s);
2485	return sz;
2486}
2487
2488/* send data and FIN (if needed) */
2489static inline void
2490tx_data_fin(struct tle_tcp_stream *s, uint32_t tms, uint32_t state)
2491{
2492	/* try to send some data */
2493	tx_nxt_data(s, tms);
2494
2495	/* we also have to send a FIN */
2496	if (state != TCP_ST_ESTABLISHED &&
2497			state != TCP_ST_CLOSE_WAIT &&
2498			tcp_txq_nxt_cnt(s) == 0 &&
2499			s->tcb.snd.fss != s->tcb.snd.nxt) {
2500		s->tcb.snd.fss = ++s->tcb.snd.nxt;
2501		send_ack(s, tms, TCP_FLAG_FIN | TCP_FLAG_ACK);
2502	}
2503}
2504
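/*
 * Transmit whatever the stream currently has to send: the SYN for a
 * stream in SYN_SENT state, or pending data (plus FIN, when appropriate)
 * for a stream between ESTABLISHED and LAST_ACK. The RTO timer is started
 * whenever unacknowledged data is left in flight.
 */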
2505static inline void
2506tx_stream(struct tle_tcp_stream *s, uint32_t tms)
2507{
2508	uint32_t state;
2509
2510	state = s->tcb.state;
2511
2512	if (state == TCP_ST_SYN_SENT) {
2513		/* send the SYN, start the rto timer */
2514		send_ack(s, tms, TCP_FLAG_SYN);
2515		timer_start(s);
2516
2517	} else if (state >= TCP_ST_ESTABLISHED && state <= TCP_ST_LAST_ACK) {
2518
2519		tx_data_fin(s, tms, state);
2520
2521		/* start RTO timer. */
2522		if (s->tcb.snd.nxt != s->tcb.snd.una)
2523			timer_start(s);
2524	}
2525}
2526
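/*
 * Retransmission timeout handler. While the retransmit limit is not yet
 * reached: for streams between ESTABLISHED and LAST_ACK shrink the
 * congestion window, rewind snd.nxt to snd.una and resend data/FIN;
 * for SYN_SENT resend the SYN; for TIME_WAIT terminate the stream.
 * The RTO is then backed off (RFC 6298 5.5) and the timer restarted.
 * Once the limit is exceeded, an RST is sent and the stream terminated.
 */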
2527static inline void
2528rto_stream(struct tle_tcp_stream *s, uint32_t tms)
2529{
2530	uint32_t state;
2531
2532	state = s->tcb.state;
2533
2534	TCP_LOG(DEBUG, "%s(%p, tms=%u): state=%u, "
2535		"retx=%u, retm=%u, "
2536		"rto=%u, snd.ts=%u, tmo=%u, "
2537		"snd.nxt=%lu, snd.una=%lu, flight_size=%lu, "
2538		"snd.rcvr=%lu, snd.fastack=%u, "
2539		"wnd=%u, cwnd=%u, ssthresh=%u, "
2540		"bytes sent=%lu, pkt remain=%u;\n",
2541		__func__, s, tms, s->tcb.state,
2542		s->tcb.snd.nb_retx, s->tcb.snd.nb_retm,
2543		s->tcb.snd.rto, s->tcb.snd.ts, tms - s->tcb.snd.ts,
2544		s->tcb.snd.nxt, s->tcb.snd.una, s->tcb.snd.nxt - s->tcb.snd.una,
2545		s->tcb.snd.rcvr, s->tcb.snd.fastack,
2546		s->tcb.snd.wnd, s->tcb.snd.cwnd, s->tcb.snd.ssthresh,
2547		s->tcb.snd.nxt - s->tcb.snd.iss, tcp_txq_nxt_cnt(s));
2548
2549	if (s->tcb.snd.nb_retx < s->tcb.snd.nb_retm) {
2550
2551		if (state >= TCP_ST_ESTABLISHED && state <= TCP_ST_LAST_ACK) {
2552
2553			/* update SND.CWND and SND.SSTHRESH */
2554			rto_cwnd_update(&s->tcb);
2555
2556			/* RFC 6582 3.2.4 */
2557			s->tcb.snd.rcvr = s->tcb.snd.nxt;
2558			s->tcb.snd.fastack = 0;
2559
2560			/* restart from last acked data */
2561			tcp_txq_rst_nxt_head(s);
2562			s->tcb.snd.nxt = s->tcb.snd.una;
2563
2564			tx_data_fin(s, tms, state);
2565
2566		} else if (state == TCP_ST_SYN_SENT) {
2567			/* resending SYN */
2568			s->tcb.so.ts.val = tms;
2569
2570			/* According to RFC 6928 2:
2571			 * To reduce the chance for spurious SYN or SYN/ACK
2572			 * retransmission, it is RECOMMENDED that
2573			 * implementations refrain from resetting the initial
2574			 * window to 1 segment, unless there have been more
2575			 * than one SYN or SYN/ACK retransmissions or true loss
2576			 * detection has been made.
2577			 */
2578			if (s->tcb.snd.nb_retx != 0)
2579				s->tcb.snd.cwnd = s->tcb.snd.mss;
2580
2581			send_ack(s, tms, TCP_FLAG_SYN);
2582
2583		} else if (state == TCP_ST_TIME_WAIT) {
2584			stream_term(s);
2585		}
2586
2587		/* RFC6298:5.5 back off the timer */
2588		s->tcb.snd.rto = rto_roundup(2 * s->tcb.snd.rto);
2589		s->tcb.snd.nb_retx++;
2590		timer_restart(s);
2591
2592	} else {
2593		send_rst(s, s->tcb.snd.una);
2594		stream_term(s);
2595	}
2596}
2597
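/*
 * Backend housekeeping for the context, expected to be invoked
 * periodically by the application:
 * 1) expire the timer wheel and run RTO handling for affected streams;
 * 2) transmit for streams waiting on the to-send queue;
 * 3) take closed streams off the death-row list and reset them.
 * <num> bounds how many streams are handled at each step per call.
 */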
2598int
2599tle_tcp_process(struct tle_ctx *ctx, uint32_t num)
2600{
2601	uint32_t i, k, tms;
2602	struct sdr *dr;
2603	struct tle_timer_wheel *tw;
2604	struct tle_stream *p;
2605	struct tle_tcp_stream *s, *rs[num];
2606
2607	/* process streams with RTO expired */
2608
2609	tw = CTX_TCP_TMWHL(ctx);
2610	tms = tcp_get_tms(ctx->cycles_ms_shift);
2611	tle_timer_expire(tw, tms);
2612
2613	k = tle_timer_get_expired_bulk(tw, (void **)rs, RTE_DIM(rs));
2614
2615	for (i = 0; i != k; i++) {
2616
2617		s = rs[i];
2618		s->timer.handle = NULL;
2619		if (tcp_stream_try_acquire(s) > 0)
2620			rto_stream(s, tms);
2621		tcp_stream_release(s);
2622	}
2623
2624	/* process streams from to-send queue */
2625
2626	k = txs_dequeue_bulk(ctx, rs, RTE_DIM(rs));
2627
2628	for (i = 0; i != k; i++) {
2629
2630		s = rs[i];
2631		rte_atomic32_set(&s->tx.arm, 0);
2632
2633		if (tcp_stream_try_acquire(s) > 0)
2634			tx_stream(s, tms);
2635		else
2636			txs_enqueue(s->s.ctx, s);
2637		tcp_stream_release(s);
2638	}
2639
2640	/* collect streams to close from the death row */
2641
2642	dr = CTX_TCP_SDR(ctx);
2643	for (k = 0, p = STAILQ_FIRST(&dr->be);
2644			k != num && p != NULL;
2645			k++, p = STAILQ_NEXT(p, link))
2646		rs[k] = TCP_STREAM(p);
2647
2648	if (p == NULL)
2649		STAILQ_INIT(&dr->be);
2650	else
2651		STAILQ_FIRST(&dr->be) = p;
2652
2653	/* cleanup closed streams */
2654	for (i = 0; i != k; i++) {
2655		s = rs[i];
2656		tcp_stream_down(s);
2657		tcp_stream_reset(ctx, s);
2658	}
2659
2660	return 0;
2661}
2662