tcp_rxtx.c revision 2c33ba7b
1/*
2 * Copyright (c) 2016  Intel Corporation.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#include <rte_errno.h>
17#include <rte_ethdev.h>
18#include <rte_ip.h>
19#include <rte_ip_frag.h>
20#include <rte_tcp.h>
21
22#include "tcp_stream.h"
23#include "tcp_timer.h"
24#include "stream_table.h"
25#include "syncookie.h"
26#include "misc.h"
27#include "tcp_ctl.h"
28#include "tcp_rxq.h"
29#include "tcp_txq.h"
30#include "tcp_tx_seg.h"
31
32#define	TCP_MAX_PKT_SEG	0x20
33
34/*
35 * checks if the input TCP ports and IP addresses match the given stream.
36 * returns zero on success.
37 */
38static inline int
39rx_check_stream(const struct tle_tcp_stream *s, const union pkt_info *pi)
40{
41	int32_t rc;
42
43	if (pi->tf.type == TLE_V4)
44		rc = (pi->port.raw & s->s.pmsk.raw) != s->s.port.raw ||
45			(pi->addr4.raw & s->s.ipv4.mask.raw) !=
46			s->s.ipv4.addr.raw;
47	else
48		rc = (pi->port.raw & s->s.pmsk.raw) != s->s.port.raw ||
49			ymm_mask_cmp(&pi->addr6->raw, &s->s.ipv6.addr.raw,
50			&s->s.ipv6.mask.raw) != 0;
51
52	return rc;
53}
54
55static inline struct tle_tcp_stream *
56rx_obtain_listen_stream(const struct tle_dev *dev, const union pkt_info *pi,
57	uint32_t type)
58{
59	struct tle_tcp_stream *s;
60
61	s = (struct tle_tcp_stream *)dev->dp[type]->streams[pi->port.dst];
62	if (s == NULL || rwl_acquire(&s->rx.use) < 0)
63		return NULL;
64
65	/* check that we have a proper stream. */
66	if (s->tcb.state != TCP_ST_LISTEN) {
67		rwl_release(&s->rx.use);
68		s = NULL;
69	}
70
71	return s;
72}
73
74static inline struct tle_tcp_stream *
75rx_obtain_stream(const struct tle_dev *dev, struct stbl *st,
76	const union pkt_info *pi, uint32_t type)
77{
78	struct tle_tcp_stream *s;
79
80	s = stbl_find_data(st, pi);
81	if (s == NULL) {
82		if (pi->tf.flags == TCP_FLAG_ACK)
83			return rx_obtain_listen_stream(dev, pi, type);
84		return NULL;
85	}
86
87	if (stbl_data_pkt(s) || rwl_acquire(&s->rx.use) < 0)
88		return NULL;
89	/* check that we have a proper stream. */
90	else if (s->tcb.state == TCP_ST_CLOSED) {
91		rwl_release(&s->rx.use);
92		s = NULL;
93	}
94
95	return s;
96}
97
98/*
99 * Consider 2 pkt_info *equal* if their:
100 * - types (IPv4/IPv6)
101 * - TCP flags
102 * - checksum flags
103 * - TCP src and dst ports
104 * - IP src and dst addresses
105 * are equal.
106 */
107static inline int
108pkt_info_bulk_eq(const union pkt_info pi[], uint32_t num)
109{
110	uint32_t i;
111
112	i = 1;
113
114	if (pi[0].tf.type == TLE_V4) {
115		while (i != num && xmm_cmp(&pi[0].raw, &pi[i].raw) == 0)
116			i++;
117
118	} else if (pi[0].tf.type == TLE_V6) {
119		while (i != num &&
120				pi[0].raw.u64[0] == pi[i].raw.u64[0] &&
121				ymm_cmp(&pi[0].addr6->raw,
122				&pi[i].addr6->raw) == 0)
123			i++;
124	}
125
126	return i;
127}
128
129static inline int
130pkt_info_bulk_syneq(const union pkt_info pi[], uint32_t num)
131{
132	uint32_t i;
133
134	i = 1;
135
136	if (pi[0].tf.type == TLE_V4) {
137		while (i != num && pi[0].tf.raw == pi[i].tf.raw &&
138				pi[0].port.dst == pi[i].port.dst &&
139				pi[0].addr4.dst == pi[i].addr4.dst)
140			i++;
141
142	} else if (pi[0].tf.type == TLE_V6) {
143		while (i != num && pi[0].tf.raw == pi[i].tf.raw &&
144				pi[0].port.dst == pi[i].port.dst &&
145				xmm_cmp(&pi[0].addr6->dst,
146				&pi[i].addr6->dst) == 0)
147			i++;
148	}
149
150	return i;
151}
152
153static inline void
154stream_drb_free(struct tle_tcp_stream *s, struct tle_drb *drbs[],
155	uint32_t nb_drb)
156{
157	_rte_ring_enqueue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
158}
159
160static inline uint32_t
161stream_drb_alloc(struct tle_tcp_stream *s, struct tle_drb *drbs[],
162	uint32_t nb_drb)
163{
164	return _rte_ring_dequeue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
165}
166
167static inline void
168fill_tcph(struct tcp_hdr *l4h, const struct tcb *tcb, union l4_ports port,
169	uint32_t seq, uint8_t hlen, uint8_t flags)
170{
171	uint16_t wnd;
172
173	l4h->src_port = port.dst;
174	l4h->dst_port = port.src;
175
176	wnd = (flags & TCP_FLAG_SYN) ?
177		RTE_MIN(tcb->rcv.wnd, (uint32_t)UINT16_MAX) :
178		tcb->rcv.wnd >> tcb->rcv.wscale;
179
180	/* ??? use sse shuffle to hton all remaining 16 bytes at once. ??? */
181	l4h->sent_seq = rte_cpu_to_be_32(seq);
182	l4h->recv_ack = rte_cpu_to_be_32(tcb->rcv.nxt);
183	l4h->data_off = hlen / TCP_DATA_ALIGN << TCP_DATA_OFFSET;
184	l4h->tcp_flags = flags;
185	l4h->rx_win = rte_cpu_to_be_16(wnd);
186	l4h->cksum = 0;
187	l4h->tcp_urp = 0;
188
189	if (flags & TCP_FLAG_SYN)
190		fill_syn_opts(l4h + 1, &tcb->so);
191	else if ((flags & TCP_FLAG_RST) == 0 && tcb->so.ts.raw != 0)
192		fill_tms_opts(l4h + 1, tcb->snd.ts, tcb->rcv.ts);
193}
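
/*
 * Illustrative note for fill_tcph() above, assuming TCP_DATA_ALIGN == 4 and
 * TCP_DATA_OFFSET == 4 (the usual 32-bit-word granularity of the TCP data
 * offset field; both macros are defined outside this file): with the
 * timestamp option the header is hlen = 20 + 12 = 32 bytes, so
 * data_off = (32 / 4) << 4 = 0x80, i.e. a data offset of 8 words.
 */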
194
195static inline int
196tcp_fill_mbuf(struct rte_mbuf *m, const struct tle_tcp_stream *s,
197	const struct tle_dest *dst, uint64_t ol_flags,
198	union l4_ports port, uint32_t seq, uint32_t flags,
199	uint32_t pid, uint32_t swcsm)
200{
201	uint32_t l4, len, plen;
202	struct tcp_hdr *l4h;
203	char *l2h;
204
205	len = dst->l2_len + dst->l3_len;
206	plen = m->pkt_len;
207
208	if (flags & TCP_FLAG_SYN)
209		l4 = sizeof(*l4h) + TCP_TX_OPT_LEN_MAX;
210	else if ((flags & TCP_FLAG_RST) == 0 && s->tcb.rcv.ts != 0)
211		l4 = sizeof(*l4h) + TCP_TX_OPT_LEN_TMS;
212	else
213		l4 = sizeof(*l4h);
214
215	/* adjust mbuf to put L2/L3/L4 headers into it. */
216	l2h = rte_pktmbuf_prepend(m, len + l4);
217	if (l2h == NULL)
218		return -EINVAL;
219
220	/* copy L2/L3 header */
221	rte_memcpy(l2h, dst->hdr, len);
222
223	/* setup TCP header & options */
224	l4h = (struct tcp_hdr *)(l2h + len);
225	fill_tcph(l4h, &s->tcb, port, seq, l4, flags);
226
227	/* setup mbuf TX offload related fields. */
228	m->tx_offload = _mbuf_tx_offload(dst->l2_len, dst->l3_len, l4, 0, 0, 0);
229	m->ol_flags |= ol_flags;
230
231	/* update proto specific fields. */
232
233	if (s->s.type == TLE_V4) {
234		struct ipv4_hdr *l3h;
235		l3h = (struct ipv4_hdr *)(l2h + dst->l2_len);
236		l3h->packet_id = rte_cpu_to_be_16(pid);
237		l3h->total_length = rte_cpu_to_be_16(plen + dst->l3_len + l4);
238
239		if ((ol_flags & PKT_TX_TCP_CKSUM) != 0)
240			l4h->cksum = _ipv4x_phdr_cksum(l3h, m->l3_len,
241				ol_flags);
242		else if (swcsm != 0)
243			l4h->cksum = _ipv4_udptcp_mbuf_cksum(m, len, l3h);
244
245		if ((ol_flags & PKT_TX_IP_CKSUM) == 0 && swcsm != 0)
246			l3h->hdr_checksum = _ipv4x_cksum(l3h, m->l3_len);
247	} else {
248		struct ipv6_hdr *l3h;
249		l3h = (struct ipv6_hdr *)(l2h + dst->l2_len);
250		l3h->payload_len = rte_cpu_to_be_16(plen + l4);
251		if ((ol_flags & PKT_TX_TCP_CKSUM) != 0)
252			l4h->cksum = rte_ipv6_phdr_cksum(l3h, ol_flags);
253		else if (swcsm != 0)
254			l4h->cksum = _ipv6_udptcp_mbuf_cksum(m, len, l3h);
255	}
256
257	return 0;
258}
259
260/*
261 * This function is supposed to be used only for data packets.
262 * Assumes that L2/L3/L4 headers and mbuf fields are already set up properly.
263 *  - updates tcp SEG.SEQ, SEG.ACK, TS.VAL, TS.ECR.
264 *  - if no HW cksum offloads are enabled, calculates TCP checksum.
265 */
266static inline void
267tcp_update_mbuf(struct rte_mbuf *m, uint32_t type, const struct tcb *tcb,
268	uint32_t seq, uint32_t pid)
269{
270	struct tcp_hdr *l4h;
271	uint32_t len;
272
273	len = m->l2_len + m->l3_len;
274	l4h = rte_pktmbuf_mtod_offset(m, struct tcp_hdr *, len);
275
276	l4h->sent_seq = rte_cpu_to_be_32(seq);
277	l4h->recv_ack = rte_cpu_to_be_32(tcb->rcv.nxt);
278
279	if (tcb->so.ts.raw != 0)
280		fill_tms_opts(l4h + 1, tcb->snd.ts, tcb->rcv.ts);
281
282	if (type == TLE_V4) {
283		struct ipv4_hdr *l3h;
284		l3h = rte_pktmbuf_mtod_offset(m, struct ipv4_hdr *, m->l2_len);
285		l3h->hdr_checksum = 0;
286		l3h->packet_id = rte_cpu_to_be_16(pid);
287		if ((m->ol_flags & PKT_TX_IP_CKSUM) == 0)
288			l3h->hdr_checksum = _ipv4x_cksum(l3h, m->l3_len);
289	}
290
291	/* have to calculate TCP checksum in SW */
292	if ((m->ol_flags & PKT_TX_TCP_CKSUM) == 0) {
293
294		l4h->cksum = 0;
295
296		if (type == TLE_V4) {
297			struct ipv4_hdr *l3h;
298			l3h = rte_pktmbuf_mtod_offset(m, struct ipv4_hdr *,
299				m->l2_len);
300			l4h->cksum = _ipv4_udptcp_mbuf_cksum(m, len, l3h);
301
302		} else {
303			struct ipv6_hdr *l3h;
304			l3h = rte_pktmbuf_mtod_offset(m, struct ipv6_hdr *,
305				m->l2_len);
306			l4h->cksum = _ipv6_udptcp_mbuf_cksum(m, len, l3h);
307		}
308	}
309}
310
311/* Send data packets that need to be ACK-ed by peer */
312static inline uint32_t
313tx_data_pkts(struct tle_tcp_stream *s, struct rte_mbuf *const m[], uint32_t num)
314{
315	uint32_t bsz, i, nb, nbm;
316	struct tle_dev *dev;
317	struct tle_drb *drb[num];
318
319	/* calculate how many drbs are needed.*/
320	bsz = s->tx.drb.nb_elem;
321	nbm = (num + bsz - 1) / bsz;
322
323	/* allocate drbs, adjust number of packets. */
324	nb = stream_drb_alloc(s, drb, nbm);
325
326	/* drb ring is empty. */
327	if (nb == 0)
328		return 0;
329
330	else if (nb != nbm)
331		num = nb * bsz;
332
333	dev = s->tx.dst.dev;
334
335	/* enqueue pkts for TX. */
336	nbm = nb;
337	i = tle_dring_mp_enqueue(&dev->tx.dr, (const void * const*)m,
338		num, drb, &nb);
339
340	/* free unused drbs. */
341	if (nb != 0)
342		stream_drb_free(s, drb + nbm - nb, nb);
343
344	return i;
345}
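
/*
 * Illustrative example of the drb accounting in tx_data_pkts() above
 * (numbers are made up): with bsz = s->tx.drb.nb_elem = 16 and num = 50
 * packets, nbm = (50 + 15) / 16 = 4 drbs are requested; if only nb = 3
 * drbs can be allocated, num is clamped to 3 * 16 = 48 packets before
 * the dring enqueue, and any drbs left unused by the enqueue are
 * returned to the stream's drb ring.
 */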
346
347static inline uint32_t
348tx_data_bulk(struct tle_tcp_stream *s, union seqlen *sl, struct rte_mbuf *mi[],
349	uint32_t num)
350{
351	uint32_t fail, i, k, n, mss, pid, plen, sz, tn, type;
352	struct tle_dev *dev;
353	struct rte_mbuf *mb;
354	struct rte_mbuf *mo[MAX_PKT_BURST + TCP_MAX_PKT_SEG];
355
356	mss = s->tcb.snd.mss;
357	type = s->s.type;
358
359	dev = s->tx.dst.dev;
360	pid = rte_atomic32_add_return(&dev->tx.packet_id[type], num) - num;
361
362	k = 0;
363	tn = 0;
364	fail = 0;
365	for (i = 0; i != num && sl->len != 0 && fail == 0; i++) {
366
367		mb = mi[i];
368		sz = RTE_MIN(sl->len, mss);
369		plen = PKT_L4_PLEN(mb);
370
371		/* fast path, no need to use indirect mbufs. */
372		if (plen <= sz) {
373
374			/* update pkt TCP header */
375			tcp_update_mbuf(mb, type, &s->tcb, sl->seq, pid + i);
376
377			/* keep mbuf till ACK is received. */
378			rte_pktmbuf_refcnt_update(mb, 1);
379			sl->len -= plen;
380			sl->seq += plen;
381			mo[k++] = mb;
382		/* remaining snd.wnd is less than MSS, send nothing */
383		} else if (sz < mss)
384			break;
385		/* packet indirection needed */
386		else
387			RTE_VERIFY(0);
388
389		if (k >= MAX_PKT_BURST) {
390			n = tx_data_pkts(s, mo, k);
391			fail = k - n;
392			tn += n;
393			k = 0;
394		}
395	}
396
397	if (k != 0) {
398		n = tx_data_pkts(s, mo, k);
399		fail = k - n;
400		tn += n;
401	}
402
403	if (fail != 0) {
404		sz = tcp_mbuf_seq_free(mo + n, fail);
405		sl->seq -= sz;
406		sl->len += sz;
407	}
408
409	return tn;
410}
411
412/*
413 * gets data from the stream send buffer, updates it and
414 * queues it into the TX device queue.
415 * Note that this function is not MT safe.
416 */
417static inline uint32_t
418tx_nxt_data(struct tle_tcp_stream *s, uint32_t tms)
419{
420	uint32_t n, num, tn, wnd;
421	struct rte_mbuf **mi;
422	union seqlen sl;
423
424	tn = 0;
425	wnd = s->tcb.snd.wnd - (uint32_t)(s->tcb.snd.nxt - s->tcb.snd.una);
426	sl.seq = s->tcb.snd.nxt;
427	sl.len = RTE_MIN(wnd, s->tcb.snd.cwnd);
428
429	if (sl.len == 0)
430		return tn;
431
432	/* update send timestamp */
433	s->tcb.snd.ts = tms;
434
435	do {
436		/* get group of packets */
437		mi = tcp_txq_get_nxt_objs(s, &num);
438
439		/* stream send buffer is empty */
440		if (num == 0)
441			break;
442
443		/* queue data packets for TX */
444		n = tx_data_bulk(s, &sl, mi, num);
445		tn += n;
446
447		/* update consumer head */
448		tcp_txq_set_nxt_head(s, n);
449	} while (n == num);
450
451	s->tcb.snd.nxt += sl.seq - (uint32_t)s->tcb.snd.nxt;
452	return tn;
453}
454
455static inline void
456free_una_data(struct tle_tcp_stream *s, uint32_t len)
457{
458	uint32_t i, n, num, plen;
459	struct rte_mbuf **mi;
460
461	n = 0;
462	plen = 0;
463
464	do {
465		/* get group of packets */
466		mi = tcp_txq_get_una_objs(s, &num);
467
468		if (num == 0)
469			break;
470
471		/* free acked data */
472		for (i = 0; i != num && n != len; i++, n = plen) {
473			plen += PKT_L4_PLEN(mi[i]);
474			if (plen > len) {
475				/* keep SND.UNA at the start of the packet */
476				len -= RTE_MIN(len, plen - len);
477				break;
478			}
479			rte_pktmbuf_free(mi[i]);
480		}
481
482		/* update consumer tail */
483		tcp_txq_set_una_tail(s, i);
484	} while (plen < len);
485
486	s->tcb.snd.una += len;
487
488	/*
489	 * this could happen in case of retransmit;
490	 * adjust SND.NXT to match SND.UNA.
491	 */
492	if (s->tcb.snd.una > s->tcb.snd.nxt) {
493		tcp_txq_rst_nxt_head(s);
494		s->tcb.snd.nxt = s->tcb.snd.una;
495	}
496}
497
498static inline uint16_t
499calc_smss(uint16_t mss, const struct tle_dest *dst)
500{
501	uint16_t n;
502
503	n = dst->mtu - dst->l2_len - dst->l3_len - TCP_TX_HDR_DACK;
504	mss = RTE_MIN(n, mss);
505	return mss;
506}
507
508/*
509 * RFC 5681 3.1
510 * If SMSS > 2190 bytes:
511 *     IW = 2 * SMSS bytes and MUST NOT be more than 2 segments
512 * If (SMSS > 1095 bytes) and (SMSS <= 2190 bytes):
513 *     IW = 3 * SMSS bytes and MUST NOT be more than 3 segments
514 * If SMSS <= 1095 bytes:
515 *     IW = 4 * SMSS bytes and MUST NOT be more than 4 segments
516 */
517static inline uint32_t
518initial_cwnd(uint16_t smss)
519{
520	if (smss > 2190)
521		return 2 * smss;
522	else if (smss > 1095)
523		return 3 * smss;
524	return 4 * smss;
525}
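
/*
 * Worked example for initial_cwnd() (values per RFC 5681 3.1):
 * for a typical Ethernet path with SMSS = 1460, IW = 3 * 1460 = 4380 bytes;
 * for SMSS = 536, IW = 4 * 536 = 2144 bytes;
 * for a jumbo-frame SMSS = 8960, IW = 2 * 8960 = 17920 bytes.
 */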
526
527/*
528 * queue a standalone packet to the particular output device.
529 * It assumes that:
530 * - L2/L3/L4 headers are already set;
531 * - the packet fits into one segment.
532 */
533static inline int
534send_pkt(struct tle_tcp_stream *s, struct tle_dev *dev, struct rte_mbuf *m)
535{
536	uint32_t n, nb;
537	struct tle_drb *drb;
538
539	if (stream_drb_alloc(s, &drb, 1) == 0)
540		return -ENOBUFS;
541
542	/* enqueue pkt for TX. */
543	nb = 1;
544	n = tle_dring_mp_enqueue(&dev->tx.dr, (const void * const*)&m, 1,
545		&drb, &nb);
546
547	/* free unused drbs. */
548	if (nb != 0)
549		stream_drb_free(s, &drb, 1);
550
551	return (n == 1) ? 0 : -ENOBUFS;
552}
553
554static inline int
555send_ctrl_pkt(struct tle_tcp_stream *s, struct rte_mbuf *m, uint32_t seq,
556	uint32_t flags)
557{
558	const struct tle_dest *dst;
559	uint32_t pid, type;
560	int32_t rc;
561
562	dst = &s->tx.dst;
563	type = s->s.type;
564	pid = rte_atomic32_add_return(&dst->dev->tx.packet_id[type], 1) - 1;
565
566	rc = tcp_fill_mbuf(m, s, dst, 0, s->s.port, seq, flags, pid, 1);
567	if (rc == 0)
568		rc = send_pkt(s, dst->dev, m);
569
570	return rc;
571}
572
573static inline int
574send_rst(struct tle_tcp_stream *s, uint32_t seq)
575{
576	struct rte_mbuf *m;
577	int32_t rc;
578
579	m = rte_pktmbuf_alloc(s->tx.dst.head_mp);
580	if (m == NULL)
581		return -ENOMEM;
582
583	rc = send_ctrl_pkt(s, m, seq, TCP_FLAG_RST);
584	if (rc != 0)
585		rte_pktmbuf_free(m);
586
587	return rc;
588}
589
590static inline int
591send_ack(struct tle_tcp_stream *s, uint32_t tms, uint32_t flags)
592{
593	struct rte_mbuf *m;
594	uint32_t seq;
595	int32_t rc;
596
597	m = rte_pktmbuf_alloc(s->tx.dst.head_mp);
598	if (m == NULL)
599		return -ENOMEM;
600
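	/*
	 * for SYN and FIN segments SND.NXT has already been advanced past
	 * the control flag, so the segment itself is sent with SND.NXT - 1.
	 */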
601	seq = s->tcb.snd.nxt - ((flags & (TCP_FLAG_FIN | TCP_FLAG_SYN)) != 0);
602	s->tcb.snd.ts = tms;
603
604	rc = send_ctrl_pkt(s, m, seq, flags);
605	if (rc != 0) {
606		rte_pktmbuf_free(m);
607		return rc;
608	}
609
610	s->tcb.snd.ack = s->tcb.rcv.nxt;
611	return 0;
612}
613
614
615static int
616sync_ack(struct tle_tcp_stream *s, const union pkt_info *pi,
617	const union seg_info *si, uint32_t ts, struct rte_mbuf *m)
618{
619	uint16_t len;
620	int32_t rc;
621	uint32_t pid, seq, type;
622	struct tle_dev *dev;
623	const void *da;
624	struct tle_dest dst;
625	const struct tcp_hdr *th;
626
627	type = s->s.type;
628
629	/* get destination information. */
630	if (type == TLE_V4)
631		da = &pi->addr4.src;
632	else
633		da = &pi->addr6->src;
634
635	rc = stream_get_dest(&s->s, da, &dst);
636	if (rc < 0)
637		return rc;
638
639	th = rte_pktmbuf_mtod_offset(m, const struct tcp_hdr *,
640		m->l2_len + m->l3_len);
641	get_syn_opts(&s->tcb.so, (uintptr_t)(th + 1), m->l4_len - sizeof(*th));
642
643	s->tcb.rcv.nxt = si->seq + 1;
644	seq = sync_gen_seq(pi, s->tcb.rcv.nxt, ts, s->tcb.so.mss,
645				s->s.ctx->prm.hash_alg,
646				&s->s.ctx->prm.secret_key);
647	s->tcb.so.ts.ecr = s->tcb.so.ts.val;
648	s->tcb.so.ts.val = sync_gen_ts(ts, s->tcb.so.wscale);
649	s->tcb.so.wscale = (s->tcb.so.wscale == TCP_WSCALE_NONE) ?
650		TCP_WSCALE_NONE : TCP_WSCALE_DEFAULT;
651	s->tcb.so.mss = calc_smss(dst.mtu, &dst);
652
653	/* reset mbuf's data contents. */
654	len = m->l2_len + m->l3_len + m->l4_len;
655	m->tx_offload = 0;
656	if (rte_pktmbuf_adj(m, len) == NULL)
657		return -EINVAL;
658
659	dev = dst.dev;
660	pid = rte_atomic32_add_return(&dev->tx.packet_id[type], 1) - 1;
661
662	rc = tcp_fill_mbuf(m, s, &dst, 0, pi->port, seq,
663		TCP_FLAG_SYN | TCP_FLAG_ACK, pid, 1);
664	if (rc == 0)
665		rc = send_pkt(s, dev, m);
666
667	return rc;
668}
669
670/*
671 * RFC 793:
672 * There are four cases for the acceptability test for an incoming segment:
673 * Segment Receive  Test
674 * Length  Window
675 * ------- -------  -------------------------------------------
676 *    0       0     SEG.SEQ = RCV.NXT
677 *    0      >0     RCV.NXT =< SEG.SEQ < RCV.NXT+RCV.WND
678 *   >0       0     not acceptable
679 *   >0      >0     RCV.NXT =< SEG.SEQ < RCV.NXT+RCV.WND
680 *                  or RCV.NXT =< SEG.SEQ+SEG.LEN-1 < RCV.NXT+RCV.WND
681 */
682static inline int
683check_seqn(const struct tcb *tcb, uint32_t seqn, uint32_t len)
684{
685	uint32_t n;
686
687	n = seqn + len;
688	if (seqn - tcb->rcv.nxt >= tcb->rcv.wnd &&
689			n - tcb->rcv.nxt > tcb->rcv.wnd)
690		return -ERANGE;
691
692	return 0;
693}
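
/*
 * Illustrative note for check_seqn() above: the unsigned subtractions
 * implement the RFC 793 window test modulo 2^32. E.g. with RCV.NXT = 1000
 * and RCV.WND = 500, a segment with seqn = 1200, len = 100 gives
 * (1200 - 1000) = 200 < 500 and is accepted; seqn = 1600, len = 100 gives
 * (1600 - 1000) = 600 >= 500 and (1700 - 1000) = 700 > 500, so -ERANGE.
 */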
694
695static inline union tsopt
696rx_tms_opt(const struct tcb *tcb, const struct rte_mbuf *mb)
697{
698	union tsopt ts;
699	uintptr_t opt;
700	const struct tcp_hdr *th;
701
702	if (tcb->so.ts.val != 0) {
703		opt = rte_pktmbuf_mtod_offset(mb, uintptr_t,
704			mb->l2_len + mb->l3_len + sizeof(*th));
705		ts = get_tms_opts(opt, mb->l4_len - sizeof(*th));
706	} else
707		ts.raw = 0;
708
709	return ts;
710}
711
712/*
713 * PAWS and sequence check.
714 * RFC 1323 4.2.1
715 */
716static inline int
717rx_check_seq(struct tcb *tcb, uint32_t seq, uint32_t len, const union tsopt ts)
718{
719	int32_t rc;
720
721	/* RFC 1323 4.2.1 R2 */
722	rc = check_seqn(tcb, seq, len);
723	if (rc < 0)
724		return rc;
725
726	if (ts.raw != 0) {
727
728		/* RFC 1323 4.2.1 R1 */
729		if (tcp_seq_lt(ts.val, tcb->rcv.ts))
730			return -ERANGE;
731
732		/* RFC 1323 4.2.1 R3 */
733		if (tcp_seq_leq(seq, tcb->snd.ack) &&
734				tcp_seq_lt(tcb->snd.ack, seq + len))
735			tcb->rcv.ts = ts.val;
736	}
737
738	return rc;
739}
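
/*
 * PAWS example for rx_check_seq() above: if the newest in-window timestamp
 * recorded so far is RCV.TS = 1000 and an arriving segment carries
 * TSval = 990, R1 treats it as an old duplicate and the segment is rejected
 * with -ERANGE; with TSval = 1005 the segment passes and, if it also covers
 * the last ACK we sent (tcb->snd.ack), RCV.TS is advanced to 1005 per R3.
 */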
740
741static inline int
742rx_check_ack(const struct tcb *tcb, uint32_t ack)
743{
744	uint32_t max;
745
746	max = (uint32_t)RTE_MAX(tcb->snd.nxt, tcb->snd.rcvr);
747
748	if (tcp_seq_leq(tcb->snd.una, ack) && tcp_seq_leq(ack, max))
749		return 0;
750
751	return -ERANGE;
752}
753
754static inline int
755rx_check_seqack(struct tcb *tcb, uint32_t seq, uint32_t ack, uint32_t len,
756	const union tsopt ts)
757{
758	int32_t rc;
759
760	rc = rx_check_seq(tcb, seq, len, ts);
761	rc |= rx_check_ack(tcb, ack);
762	return rc;
763}
764
765static inline int
766restore_syn_opt(struct syn_opts *so, const union pkt_info *pi,
767	const union seg_info *si, uint32_t ts, const struct rte_mbuf *mb,
768	uint32_t hash_alg, rte_xmm_t *secret_key)
769{
770	int32_t rc;
771	uint32_t len;
772	const struct tcp_hdr *th;
773
774	/* check that ACK, etc fields are what we expected. */
775	rc = sync_check_ack(pi, si->seq, si->ack - 1, ts,
776				hash_alg,
777				secret_key);
778	if (rc < 0)
779		return rc;
780
781	so->mss = rc;
782
783	th = rte_pktmbuf_mtod_offset(mb, const struct tcp_hdr *,
784		mb->l2_len + mb->l3_len);
785	len = mb->l4_len - sizeof(*th);
786	sync_get_opts(so, (uintptr_t)(th + 1), len);
787	return 0;
788}
789
790static inline void
791stream_term(struct tle_tcp_stream *s)
792{
793	struct sdr *dr;
794
795	s->tcb.state = TCP_ST_CLOSED;
796	rte_smp_wmb();
797
798	timer_stop(s);
799
800	/* close() was already invoked, schedule final cleanup */
801	if ((s->tcb.uop & TCP_OP_CLOSE) != 0) {
802
803		dr = CTX_TCP_SDR(s->s.ctx);
804		STAILQ_INSERT_TAIL(&dr->be, &s->s, link);
805
806	/* notify user that the stream needs to be closed */
807	} else if (s->err.ev != NULL)
808		tle_event_raise(s->err.ev);
809	else if (s->err.cb.func != NULL)
810		s->err.cb.func(s->err.cb.data, &s->s);
811}
812
813static inline int
814stream_fill_dest(struct tle_tcp_stream *s)
815{
816	int32_t rc;
817	const void *da;
818
819	if (s->s.type == TLE_V4)
820		da = &s->s.ipv4.addr.src;
821	else
822		da = &s->s.ipv6.addr.src;
823
824	rc = stream_get_dest(&s->s, da, &s->tx.dst);
825	return (rc < 0) ? rc : 0;
826}
827
828/*
829 * helper function, prepares a new stream for an accepted connection.
830 */
831static inline int
832accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st,
833	struct tle_tcp_stream *cs, const struct syn_opts *so,
834	uint32_t tms, const union pkt_info *pi, const union seg_info *si)
835{
836	int32_t rc;
837	uint32_t rtt;
838
839	/* some TX still pending for that stream. */
840	if (TCP_STREAM_TX_PENDING(cs))
841		return -EAGAIN;
842
843	/* setup L4 ports and L3 addresses fields. */
844	cs->s.port.raw = pi->port.raw;
845	cs->s.pmsk.raw = UINT32_MAX;
846
847	if (pi->tf.type == TLE_V4) {
848		cs->s.ipv4.addr = pi->addr4;
849		cs->s.ipv4.mask.src = INADDR_NONE;
850		cs->s.ipv4.mask.dst = INADDR_NONE;
851	} else if (pi->tf.type == TLE_V6) {
852		cs->s.ipv6.addr = *pi->addr6;
853		rte_memcpy(&cs->s.ipv6.mask.src, &tle_ipv6_none,
854			sizeof(cs->s.ipv6.mask.src));
855		rte_memcpy(&cs->s.ipv6.mask.dst, &tle_ipv6_none,
856			sizeof(cs->s.ipv6.mask.dst));
857	}
858
859	/* setup TCB */
860	sync_fill_tcb(&cs->tcb, si, so);
861	cs->tcb.rcv.wnd = calc_rx_wnd(cs, cs->tcb.rcv.wscale);
862
863	/*
864	 * estimate the RTO:
865	 * for now RTT is calculated based on the TCP TMS option;
866	 * a real-time estimate can be added later.
867	 */
868	if (cs->tcb.so.ts.ecr) {
869		rtt = tms - cs->tcb.so.ts.ecr;
870		rto_estimate(&cs->tcb, rtt);
871	} else
872		cs->tcb.snd.rto = TCP_RTO_DEFAULT;
873
874	/* copy the stream's type. */
875	cs->s.type = ps->s.type;
876
877	/* retrieve and cache destination information. */
878	rc = stream_fill_dest(cs);
879	if (rc != 0)
880		return rc;
881
882	/* update snd.mss with SMSS value */
883	cs->tcb.snd.mss = calc_smss(cs->tcb.snd.mss, &cs->tx.dst);
884
885	/* setup congestion variables */
886	cs->tcb.snd.cwnd = initial_cwnd(cs->tcb.snd.mss);
887	cs->tcb.snd.ssthresh = cs->tcb.snd.wnd;
888
889	cs->tcb.state = TCP_ST_ESTABLISHED;
890
891	/* add stream to the table */
892	cs->ste = stbl_add_stream(st, pi, cs);
893	if (cs->ste == NULL)
894		return -ENOBUFS;
895
896	cs->tcb.uop |= TCP_OP_ACCEPT;
897	tcp_stream_up(cs);
898	return 0;
899}
900
901
902/*
903 * ACK for new connection request arrived.
904 * Check that the packet meets all conditions and try to open a new stream.
905 * returns:
906 * < 0  - invalid packet
907 * == 0 - packet is valid and new stream was opened for it.
908 * > 0  - packet is valid, but failed to open new stream.
909 */
910static inline int
911rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st,
912	const union pkt_info *pi, const union seg_info *si,
913	uint32_t tms, struct rte_mbuf *mb, struct tle_tcp_stream **csp)
914{
915	int32_t rc;
916	struct tle_ctx *ctx;
917	struct tle_stream *ts;
918	struct tle_tcp_stream *cs;
919	struct syn_opts so;
920
921	*csp = NULL;
922
923	if (pi->tf.flags != TCP_FLAG_ACK || rx_check_stream(s, pi) != 0)
924		return -EINVAL;
925
926	ctx = s->s.ctx;
927	rc = restore_syn_opt(&so, pi, si, tms, mb, ctx->prm.hash_alg,
928				&ctx->prm.secret_key);
929	if (rc < 0)
930		return rc;
931
932	/* allocate new stream */
933	ts = get_stream(ctx);
934	cs = TCP_STREAM(ts);
935	if (ts == NULL)
936		return ENFILE;
937
938	/* prepare stream to handle new connection */
939	if (accept_prep_stream(s, st, cs, &so, tms, pi, si) == 0) {
940
941		/* put new stream in the accept queue */
942		if (_rte_ring_enqueue_burst(s->rx.q,
943				(void * const *)&ts, 1) == 1) {
944			*csp = cs;
945			return 0;
946		}
947
948		/* cleanup on failure */
949		tcp_stream_down(cs);
950		stbl_del_pkt(st, cs->ste, pi);
951		cs->ste = NULL;
952	}
953
954	tcp_stream_reset(ctx, cs);
955	return ENOBUFS;
956}
957
958static inline int
959data_pkt_adjust(const struct tcb *tcb, struct rte_mbuf *mb, uint32_t hlen,
960	uint32_t *seqn, uint32_t *plen)
961{
962	uint32_t len, n, seq;
963
964	seq = *seqn;
965	len = *plen;
966
967	rte_pktmbuf_adj(mb, hlen);
968	if (len == 0)
969		return -ENODATA;
970	/* cut off the start of the packet */
971	else if (tcp_seq_lt(seq, tcb->rcv.nxt)) {
972		n = tcb->rcv.nxt - seq;
973		if (n >= len)
974			return -ENODATA;
975
976		rte_pktmbuf_adj(mb, n);
977		*seqn = seq + n;
978		*plen = len - n;
979	}
980
981	return 0;
982}
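
/*
 * Trimming example for data_pkt_adjust() above: with RCV.NXT = 1300 and an
 * incoming segment seq = 1000, len = 500, the first 300 bytes were already
 * received, so 300 bytes are stripped from the mbuf and the caller continues
 * with *seqn = 1300, *plen = 200. If len were 300 or less, the whole segment
 * would be a duplicate and -ENODATA is returned.
 */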
983
984static inline uint32_t
985rx_ackdata(struct tle_tcp_stream *s, uint32_t ack)
986{
987	uint32_t k, n;
988
989	n = ack - (uint32_t)s->tcb.snd.una;
990
991	/* some more data was acked. */
992	if (n != 0) {
993
994		/* advance SND.UNA and free related packets. */
995		k = rte_ring_free_count(s->tx.q);
996		free_una_data(s, n);
997
998		/* mark the stream as available for writing */
999		if (rte_ring_free_count(s->tx.q) != 0) {
1000			if (s->tx.ev != NULL)
1001				tle_event_raise(s->tx.ev);
1002			else if (k == 0 && s->tx.cb.func != NULL)
1003				s->tx.cb.func(s->tx.cb.data, &s->s);
1004		}
1005	}
1006
1007	return n;
1008}
1009
1010static void
1011rx_fin_state(struct tle_tcp_stream *s, struct resp_info *rsp)
1012{
1013	uint32_t state;
1014	int32_t ackfin;
1015
1016	s->tcb.rcv.nxt += 1;
1017
1018	ackfin = (s->tcb.snd.una == s->tcb.snd.fss);
1019	state = s->tcb.state;
1020
1021	if (state == TCP_ST_ESTABLISHED) {
1022		s->tcb.state = TCP_ST_CLOSE_WAIT;
1023		/* raise err.ev & err.cb */
1024		if (s->err.ev != NULL)
1025			tle_event_raise(s->err.ev);
1026		else if (s->err.cb.func != NULL)
1027			s->err.cb.func(s->err.cb.data, &s->s);
1028	} else if (state == TCP_ST_FIN_WAIT_1 || state == TCP_ST_CLOSING) {
1029		rsp->flags |= TCP_FLAG_ACK;
1030		if (ackfin != 0) {
1031			s->tcb.state = TCP_ST_TIME_WAIT;
1032			s->tcb.snd.rto = TCP_RTO_2MSL;
1033			timer_reset(s);
1034		} else
1035			s->tcb.state = TCP_ST_CLOSING;
1036	} else if (state == TCP_ST_FIN_WAIT_2) {
1037		rsp->flags |= TCP_FLAG_ACK;
1038		s->tcb.state = TCP_ST_TIME_WAIT;
1039		s->tcb.snd.rto = TCP_RTO_2MSL;
1040		timer_reset(s);
1041	} else if (state == TCP_ST_LAST_ACK && ackfin != 0) {
1042		stream_term(s);
1043	}
1044}
1045
1046/*
1047 * FIN processing for ESTABLISHED and later states.
1048 * returns:
1049 * < 0 - error occurred
1050 * 0   - FIN was processed OK, and mbuf can be freed/reused.
1051 * > 0 - FIN was processed OK and mbuf can't be freed/reused.
1052 */
1053static inline int
1054rx_fin(struct tle_tcp_stream *s, uint32_t state,
1055	const union seg_info *si, struct rte_mbuf *mb,
1056	struct resp_info *rsp)
1057{
1058	uint32_t hlen, plen, seq;
1059	int32_t ret;
1060	union tsopt ts;
1061
1062	hlen = PKT_L234_HLEN(mb);
1063	plen = mb->pkt_len - hlen;
1064	seq = si->seq;
1065
1066	ts = rx_tms_opt(&s->tcb, mb);
1067	ret = rx_check_seqack(&s->tcb, seq, si->ack, plen, ts);
1068	if (ret != 0)
1069		return ret;
1070
1071	if (state < TCP_ST_ESTABLISHED)
1072		return -EINVAL;
1073
1074	if (plen != 0) {
1075
1076		ret = data_pkt_adjust(&s->tcb, mb, hlen, &seq, &plen);
1077		if (ret != 0)
1078			return ret;
1079		if (rx_data_enqueue(s, seq, plen, &mb, 1) != 1)
1080			return -ENOBUFS;
1081	}
1082
1083	/*
1084	 * fast-path: all data & FIN was already sent out
1085	 * and now is acknowledged.
1086	 */
1087	if (s->tcb.snd.fss == s->tcb.snd.nxt &&
1088			si->ack == (uint32_t)s->tcb.snd.nxt) {
1089		s->tcb.snd.una = s->tcb.snd.fss;
1090		empty_tq(s);
1091	/* conventional ACK processing */
1092	} else
1093		rx_ackdata(s, si->ack);
1094
1095	/* some fragments still missing */
1096	if (seq + plen != s->tcb.rcv.nxt) {
1097		s->tcb.rcv.frs.seq = seq + plen;
1098		s->tcb.rcv.frs.on = 1;
1099	} else
1100		rx_fin_state(s, rsp);
1101
1102	return plen;
1103}
1104
1105static inline int
1106rx_rst(struct tle_tcp_stream *s, uint32_t state, uint32_t flags,
1107	const union seg_info *si)
1108{
1109	int32_t rc;
1110
1111	/*
1112	 * RFC 793: In all states except SYN-SENT, all reset (RST) segments
1113	 * are validated by checking their SEQ-fields.
1114	 * A reset is valid if its sequence number is in the window.
1115	 * In the SYN-SENT state (a RST received in response to an initial SYN),
1116	 * the RST is acceptable if the ACK field acknowledges the SYN.
1117	 */
1118	if (state == TCP_ST_SYN_SENT) {
1119		rc = ((flags & TCP_FLAG_ACK) == 0 ||
1120				si->ack != s->tcb.snd.nxt) ?
1121			-ERANGE : 0;
1122	}
1123
1124	else
1125		rc = check_seqn(&s->tcb, si->seq, 0);
1126
1127	if (rc == 0)
1128		stream_term(s);
1129
1130	return rc;
1131}
1132
1133/*
1134 * check whether we have a FIN that was received out-of-order;
1135 * if yes, try to process it now.
1136 */
1137static inline void
1138rx_ofo_fin(struct tle_tcp_stream *s, struct resp_info *rsp)
1139{
1140	if (s->tcb.rcv.frs.on != 0 && s->tcb.rcv.nxt == s->tcb.rcv.frs.seq)
1141		rx_fin_state(s, rsp);
1142}
1143
1144static inline void
1145dack_info_init(struct dack_info *tack, const struct tcb *tcb)
1146{
1147	memset(tack, 0, sizeof(*tack));
1148	tack->ack = tcb->snd.una;
1149	tack->segs.dup = tcb->rcv.dupack;
1150	tack->wu.raw = tcb->snd.wu.raw;
1151	tack->wnd = tcb->snd.wnd >> tcb->snd.wscale;
1152}
1153
1154static inline void
1155ack_window_update(struct tcb *tcb, const struct dack_info *tack)
1156{
1157	tcb->snd.wu.raw = tack->wu.raw;
1158	tcb->snd.wnd = tack->wnd << tcb->snd.wscale;
1159}
1160
1161static inline void
1162ack_cwnd_update(struct tcb *tcb, uint32_t acked, const struct dack_info *tack)
1163{
1164	uint32_t n;
1165
1166	n = tack->segs.ack * tcb->snd.mss;
1167
1168	/* slow start phase, RFC 5681 3.1 (2)  */
1169	if (tcb->snd.cwnd < tcb->snd.ssthresh)
1170		tcb->snd.cwnd += RTE_MIN(acked, n);
1171	/* congestion avoidance phase, RFC 5681 3.1 (3) */
1172	else
1173		tcb->snd.cwnd += RTE_MAX(1U, n * tcb->snd.mss / tcb->snd.cwnd);
1174}
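
/*
 * Numeric example for ack_cwnd_update() above, with SMSS = 1000 and
 * tack->segs.ack = 2 newly-acking segments (N = 2000):
 * in slow start (cwnd = 4000 < ssthresh = 16000) with acked = 2000,
 * cwnd grows by min(2000, 2000) to 6000; in congestion avoidance
 * (cwnd = 20000 >= ssthresh) it grows by max(1, 2000 * 1000 / 20000) = 100
 * bytes, i.e. roughly one MSS of growth per round-trip.
 */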
1175
1176static inline void
1177rto_ssthresh_update(struct tcb *tcb)
1178{
1179	uint32_t k, n;
1180
1181	/* RFC 5681 3.1 (4)  */
1182	n = (tcb->snd.nxt - tcb->snd.una) / 2;
1183	k = 2 * tcb->snd.mss;
1184	tcb->snd.ssthresh = RTE_MAX(n, k);
1185}
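
/*
 * Example for rto_ssthresh_update() above (RFC 5681 equation (4)):
 * with FlightSize = SND.NXT - SND.UNA = 10000 and SMSS = 1460,
 * ssthresh = max(10000 / 2, 2 * 1460) = 5000; for a small FlightSize
 * of 2000, ssthresh = max(1000, 2920) = 2920.
 */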
1186
1187static inline void
1188rto_cwnd_update(struct tcb *tcb)
1189{
1190
1191	if (tcb->snd.nb_retx == 0)
1192		rto_ssthresh_update(tcb);
1193
1194	/*
1195	 * RFC 5681 3.1: upon a timeout cwnd MUST be set to
1196	 * no more than 1 full-sized segment.
1197	 */
1198	tcb->snd.cwnd = tcb->snd.mss;
1199}
1200
1201static inline void
1202ack_info_update(struct dack_info *tack, const union seg_info *si,
1203	int32_t badseq, uint32_t dlen, const union tsopt ts)
1204{
1205	if (badseq != 0) {
1206		tack->segs.badseq++;
1207		return;
1208	}
1209
1210	/* segment with incoming data */
1211	tack->segs.data += (dlen != 0);
1212
1213	/* segment with newly acked data */
1214	if (tcp_seq_lt(tack->ack, si->ack)) {
1215		tack->segs.dup = 0;
1216		tack->segs.ack++;
1217		tack->ack = si->ack;
1218		tack->ts = ts;
1219
1220	/*
1221	 * RFC 5681: An acknowledgment is considered a "duplicate" when:
1222	 * (a) the receiver of the ACK has outstanding data
1223	 * (b) the incoming acknowledgment carries no data
1224	 * (c) the SYN and FIN bits are both off
1225	 * (d) the acknowledgment number is equal to the TCP.UNA
1226	 * (e) the advertised window in the incoming acknowledgment equals the
1227	 * advertised window in the last incoming acknowledgment.
1228	 *
1229	 * Here we only have to check for (b), (d) and (e).
1230	 * (a) will be checked later for the whole bulk of packets,
1231	 * (c) should never happen here.
1232	 */
1233	} else if (dlen == 0 && si->wnd == tack->wnd && ++tack->segs.dup == 3) {
1234		tack->dup3.seg = tack->segs.ack + 1;
1235		tack->dup3.ack = tack->ack;
1236	}
1237
1238	/*
1239	 * RFC 793:
1240	 * If SND.UNA < SEG.ACK =< SND.NXT, the send window should be
1241	 * updated.  If (SND.WL1 < SEG.SEQ or (SND.WL1 = SEG.SEQ and
1242	 * SND.WL2 =< SEG.ACK)), set SND.WND <- SEG.WND, set
1243	 * SND.WL1 <- SEG.SEQ, and set SND.WL2 <- SEG.ACK.
1244	 */
1245	if (tcp_seq_lt(tack->wu.wl1, si->seq) ||
1246			(si->seq == tack->wu.wl1 &&
1247			tcp_seq_leq(tack->wu.wl2, si->ack))) {
1248
1249		tack->wu.wl1 = si->seq;
1250		tack->wu.wl2 = si->ack;
1251		tack->wnd = si->wnd;
1252	}
1253}
1254
1255static inline uint32_t
1256rx_data_ack(struct tle_tcp_stream *s, struct dack_info *tack,
1257	const union seg_info si[], struct rte_mbuf *mb[], struct rte_mbuf *rp[],
1258	int32_t rc[], uint32_t num)
1259{
1260	uint32_t i, j, k, n, t;
1261	uint32_t hlen, plen, seq, tlen;
1262	int32_t ret;
1263	union tsopt ts;
1264
1265	k = 0;
1266	for (i = 0; i != num; i = j) {
1267
1268		hlen = PKT_L234_HLEN(mb[i]);
1269		plen = mb[i]->pkt_len - hlen;
1270		seq = si[i].seq;
1271
1272		ts = rx_tms_opt(&s->tcb, mb[i]);
1273		ret = rx_check_seqack(&s->tcb, seq, si[i].ack, plen, ts);
1274
1275		/* account segment received */
1276		ack_info_update(tack, &si[i], ret != 0, plen, ts);
1277
1278		if (ret == 0) {
1279			/* skip duplicate data, if any */
1280			ret = data_pkt_adjust(&s->tcb, mb[i], hlen,
1281				&seq, &plen);
1282		}
1283
1284		j = i + 1;
1285		if (ret != 0) {
1286			rp[k] = mb[i];
1287			rc[k] = -ret;
1288			k++;
1289			continue;
1290		}
1291
1292		/* group sequential packets together. */
1293		for (tlen = plen; j != num; tlen += plen, j++) {
1294
1295			hlen = PKT_L234_HLEN(mb[j]);
1296			plen = mb[j]->pkt_len - hlen;
1297
1298			/* not consecutive packet */
1299			if (plen == 0 || seq + tlen != si[j].seq)
1300				break;
1301
1302			/* check SEQ/ACK */
1303			ts = rx_tms_opt(&s->tcb, mb[j]);
1304			ret = rx_check_seqack(&s->tcb, si[j].seq, si[j].ack,
1305				plen, ts);
1306
1307			/* account for segment received */
1308			ack_info_update(tack, &si[j], ret != 0, plen, ts);
1309
1310			if (ret != 0) {
1311				rp[k] = mb[j];
1312				rc[k] = -ret;
1313				k++;
1314				break;
1315			}
1316			rte_pktmbuf_adj(mb[j], hlen);
1317		}
1318
1319		n = j - i;
1320		j += (ret != 0);
1321
1322		/* account for OFO data */
1323		if (seq != s->tcb.rcv.nxt)
1324			tack->segs.ofo += n;
1325
1326		/* enqueue packets */
1327		t = rx_data_enqueue(s, seq, tlen, mb + i, n);
1328
1329		/* out of space in the stream recv buffer: return the rest back. */
1330		for (; t != n; t++) {
1331			rp[k] = mb[i + t];
1332			rc[k] = -ENOBUFS;
1333			k++;
1334		}
1335	}
1336
1337	return num - k;
1338}
1339
1340static inline void
1341start_fast_retransmit(struct tle_tcp_stream *s)
1342{
1343	struct tcb *tcb;
1344
1345	tcb = &s->tcb;
1346
1347	/* RFC 6582 3.2.2 */
1348	tcb->snd.rcvr = tcb->snd.nxt;
1349	tcb->snd.fastack = 1;
1350
1351	/* RFC 5681 3.2.2 */
1352	rto_ssthresh_update(tcb);
1353
1354	/* RFC 5681 3.2.3 */
1355	tcp_txq_rst_nxt_head(s);
1356	tcb->snd.nxt = tcb->snd.una;
1357	tcb->snd.cwnd = tcb->snd.ssthresh + 3 * tcb->snd.mss;
1358}
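
/*
 * Example for start_fast_retransmit() above: with SMSS = 1000 and
 * FlightSize = 10000 when the third duplicate ACK arrives,
 * rto_ssthresh_update() sets ssthresh = max(5000, 2000) = 5000 and the
 * assignment above inflates cwnd to 5000 + 3 * 1000 = 8000
 * (RFC 5681 3.2 step 3).
 */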
1359
1360static inline void
1361stop_fast_retransmit(struct tle_tcp_stream *s)
1362{
1363	struct tcb *tcb;
1364	uint32_t n;
1365
1366	tcb = &s->tcb;
1367	n = tcb->snd.nxt - tcb->snd.una;
1368	tcb->snd.cwnd = RTE_MIN(tcb->snd.ssthresh,
1369		RTE_MAX(n, tcb->snd.mss) + tcb->snd.mss);
1370	tcb->snd.fastack = 0;
1371}
1372
1373static inline int
1374in_fast_retransmit(struct tle_tcp_stream *s, uint32_t ack_len, uint32_t ack_num,
1375	uint32_t dup_num)
1376{
1377	uint32_t n;
1378	struct tcb *tcb;
1379
1380	tcb = &s->tcb;
1381
1382	/* RFC 6582 3.2.3 partial ACK */
1383	if (ack_len != 0) {
1384
1385		n = ack_num * tcb->snd.mss;
1386		if (ack_len >= n)
1387			tcb->snd.cwnd -= ack_len - n;
1388		else
1389			tcb->snd.cwnd -= ack_len % tcb->snd.mss;
1390
1391		/*
1392		 * For the first partial ACK that arrives
1393		 * during fast recovery, also reset the
1394		 * retransmit timer.
1395		 */
1396		if (tcb->snd.fastack == 1)
1397			timer_reset(s);
1398
1399		tcb->snd.fastack += ack_num;
1400		return 1;
1401
1402	/* RFC 5681 3.2.4 */
1403	} else if (dup_num > 3) {
1404		s->tcb.snd.cwnd += (dup_num - 3) * tcb->snd.mss;
1405		return 1;
1406	}
1407
1408	return 0;
1409}
1410
1411static inline int
1412process_ack(struct tle_tcp_stream *s, uint32_t acked,
1413	const struct dack_info *tack)
1414{
1415	int32_t send;
1416
1417	send = 0;
1418
1419	/* normal mode */
1420	if (s->tcb.snd.fastack == 0) {
1421
1422		send = 1;
1423
1424		/* RFC 6582 3.2.2 switch to fast retransmit mode */
1425		if (tack->dup3.seg != 0 && s->tcb.snd.una != s->tcb.snd.nxt &&
1426				s->tcb.snd.una >= s->tcb.snd.rcvr) {
1427
1428			start_fast_retransmit(s);
1429			in_fast_retransmit(s,
1430				tack->ack - tack->dup3.ack,
1431				tack->segs.ack - tack->dup3.seg - 1,
1432				tack->segs.dup);
1433
1434		/* remain in normal mode */
1435		} else if (acked != 0) {
1436			ack_cwnd_update(&s->tcb, acked, tack);
1437			timer_stop(s);
1438		}
1439
1440	/* fast retransmit mode */
1441	} else {
1442
1443		/* remain in fast retransmit mode */
1444		if (s->tcb.snd.una < s->tcb.snd.rcvr) {
1445
1446			send = in_fast_retransmit(s, acked, tack->segs.ack,
1447				tack->segs.dup);
1448		} else {
1449			/* RFC 6582 3.2.3 full ACK */
1450			stop_fast_retransmit(s);
1451			timer_stop(s);
1452
1453			/* if we have another series of dup ACKs */
1454			if (tack->dup3.seg != 0 &&
1455					s->tcb.snd.una != s->tcb.snd.nxt &&
1456					tcp_seq_leq((uint32_t)s->tcb.snd.rcvr,
1457					tack->dup3.ack)) {
1458
1459				/* restart fast retransmit again. */
1460				start_fast_retransmit(s);
1461				send = in_fast_retransmit(s,
1462					tack->ack - tack->dup3.ack,
1463					tack->segs.ack - tack->dup3.seg - 1,
1464					tack->segs.dup);
1465			}
1466		}
1467	}
1468
1469	return send;
1470}
1471
1472/*
1473 * our FIN was acked, stop rto timer, change stream state,
1474 * and possibly close the stream.
1475 */
1476static inline void
1477rx_ackfin(struct tle_tcp_stream *s)
1478{
1479	uint32_t state;
1480
1481	s->tcb.snd.una = s->tcb.snd.fss;
1482	empty_tq(s);
1483
1484	state = s->tcb.state;
1485	if (state == TCP_ST_LAST_ACK)
1486		stream_term(s);
1487	else if (state == TCP_ST_FIN_WAIT_1) {
1488		timer_stop(s);
1489		s->tcb.state = TCP_ST_FIN_WAIT_2;
1490	} else if (state == TCP_ST_CLOSING) {
1491		s->tcb.state = TCP_ST_TIME_WAIT;
1492		s->tcb.snd.rto = TCP_RTO_2MSL;
1493		timer_reset(s);
1494	}
1495}
1496
1497static inline void
1498rx_process_ack(struct tle_tcp_stream *s, uint32_t ts,
1499	const struct dack_info *tack)
1500{
1501	int32_t send;
1502	uint32_t n;
1503
1504	s->tcb.rcv.dupack = tack->segs.dup;
1505
1506	n = rx_ackdata(s, tack->ack);
1507	send = process_ack(s, n, tack);
1508
1509	/* try to send more data. */
1510	if ((n != 0 || send != 0) && tcp_txq_nxt_cnt(s) != 0)
1511		txs_enqueue(s->s.ctx, s);
1512
1513	/* restart RTO timer. */
1514	if (s->tcb.snd.nxt != s->tcb.snd.una)
1515		timer_start(s);
1516
1517	/* update RTO: if a fresh timestamp echo is present, calculate RTT */
1518	if (tack->ts.ecr != 0)
1519		rto_estimate(&s->tcb, ts - tack->ts.ecr);
1520}
1521
1522/*
1523 * process <SYN,ACK>
1524 * returns negative value on failure, or zero on success.
1525 */
1526static inline int
1527rx_synack(struct tle_tcp_stream *s, uint32_t ts, uint32_t state,
1528	const union seg_info *si, struct rte_mbuf *mb,
1529	struct resp_info *rsp)
1530{
1531	struct syn_opts so;
1532	struct tcp_hdr *th;
1533
1534	if (state != TCP_ST_SYN_SENT)
1535		return -EINVAL;
1536
1537	/* SEG.ACK does not acknowledge our SYN, respond with RST */
1538	if (si->ack != (uint32_t)s->tcb.snd.nxt) {
1539		rsp->flags = TCP_FLAG_RST;
1540		return 0;
1541	}
1542
1543	th = rte_pktmbuf_mtod_offset(mb, struct tcp_hdr *,
1544		mb->l2_len + mb->l3_len);
1545	get_syn_opts(&so, (uintptr_t)(th + 1), mb->l4_len - sizeof(*th));
1546
1547	s->tcb.so = so;
1548
1549	s->tcb.snd.una = s->tcb.snd.nxt;
1550	s->tcb.snd.mss = calc_smss(so.mss, &s->tx.dst);
1551	s->tcb.snd.wnd = si->wnd << so.wscale;
1552	s->tcb.snd.wu.wl1 = si->seq;
1553	s->tcb.snd.wu.wl2 = si->ack;
1554	s->tcb.snd.wscale = so.wscale;
1555
1556	/* setup congestion variables */
1557	s->tcb.snd.cwnd = initial_cwnd(s->tcb.snd.mss);
1558	s->tcb.snd.ssthresh = s->tcb.snd.wnd;
1559
1560	s->tcb.rcv.ts = so.ts.val;
1561	s->tcb.rcv.irs = si->seq;
1562	s->tcb.rcv.nxt = si->seq + 1;
1563
1564	/* if peer doesn't support WSCALE opt, recalculate RCV.WND */
1565	s->tcb.rcv.wscale = (so.wscale == TCP_WSCALE_NONE) ?
1566		TCP_WSCALE_NONE : TCP_WSCALE_DEFAULT;
1567	s->tcb.rcv.wnd = calc_rx_wnd(s, s->tcb.rcv.wscale);
1568
1569	/* calculate initial rto */
1570	rto_estimate(&s->tcb, ts - s->tcb.snd.ts);
1571
1572	rsp->flags |= TCP_FLAG_ACK;
1573
1574	timer_stop(s);
1575	s->tcb.state = TCP_ST_ESTABLISHED;
1576	rte_smp_wmb();
1577
1578	if (s->tx.ev != NULL)
1579		tle_event_raise(s->tx.ev);
1580	else if (s->tx.cb.func != NULL)
1581		s->tx.cb.func(s->tx.cb.data, &s->s);
1582
1583	return 0;
1584}
1585
1586static inline uint32_t
1587rx_stream(struct tle_tcp_stream *s, uint32_t ts,
1588	const union pkt_info *pi, const union seg_info si[],
1589	struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1590	uint32_t num)
1591{
1592	uint32_t i, k, n, state;
1593	int32_t ret;
1594	struct resp_info rsp;
1595	struct dack_info tack;
1596
1597	k = 0;
1598	rsp.flags = 0;
1599
1600	state = s->tcb.state;
1601
1602	/*
1603	 * first check for the states/flags where we don't
1604	 * expect groups of packets.
1605	 */
1606
1607	/* process RST */
1608	if ((pi->tf.flags & TCP_FLAG_RST) != 0) {
1609		for (i = 0;
1610				i != num &&
1611				rx_rst(s, state, pi->tf.flags, &si[i]);
1612				i++)
1613			;
1614		i = 0;
1615
1616	/* RFC 793: if the ACK bit is off drop the segment and return */
1617	} else if ((pi->tf.flags & TCP_FLAG_ACK) == 0) {
1618		i = 0;
1623
1624	/* process <SYN,ACK> */
1625	} else if ((pi->tf.flags & TCP_FLAG_SYN) != 0) {
1626		ret = 0;
1627		for (i = 0; i != num; i++) {
1628			ret = rx_synack(s, ts, state, &si[i], mb[i], &rsp);
1629			if (ret == 0)
1630				break;
1631
1632			rc[k] = -ret;
1633			rp[k] = mb[i];
1634			k++;
1635		}
1636
1637	/* process FIN */
1638	} else if ((pi->tf.flags & TCP_FLAG_FIN) != 0) {
1639		ret = 0;
1640		for (i = 0; i != num; i++) {
1641			ret = rx_fin(s, state, &si[i], mb[i], &rsp);
1642			if (ret >= 0)
1643				break;
1644
1645			rc[k] = -ret;
1646			rp[k] = mb[i];
1647			k++;
1648		}
1649		i += (ret > 0);
1650
1651	/* normal data/ack packets */
1652	} else if (state >= TCP_ST_ESTABLISHED && state <= TCP_ST_LAST_ACK) {
1653
1654		/* process incoming data packets. */
1655		dack_info_init(&tack, &s->tcb);
1656		n = rx_data_ack(s, &tack, si, mb, rp, rc, num);
1657
1658		/* follow up actions based on aggregated information */
1659
1660		/* update SND.WND */
1661		ack_window_update(&s->tcb, &tack);
1662
1663		/*
1664		 * fast-path: all data & FIN was already sent out
1665		 * and now is acknowledged.
1666		 */
1667		if (s->tcb.snd.fss == s->tcb.snd.nxt &&
1668				tack.ack == (uint32_t)s->tcb.snd.nxt)
1669			rx_ackfin(s);
1670		else
1671			rx_process_ack(s, ts, &tack);
1672
1673		/*
1674		 * send an immediate ACK if either:
1675		 * - received segment with invalid seq/ack number
1676		 * - received segment with OFO data
1677		 * - received segment with in-order (INO) data and no TX is scheduled
1678		 *   for that stream.
1679		 */
1680		if (tack.segs.badseq != 0 || tack.segs.ofo != 0 ||
1681				(tack.segs.data != 0 &&
1682				rte_atomic32_read(&s->tx.arm) == 0))
1683			rsp.flags |= TCP_FLAG_ACK;
1684
1685		rx_ofo_fin(s, &rsp);
1686
1687		k += num - n;
1688		i = num;
1689
1690	/* unhandled state, drop all packets. */
1691	} else
1692		i = 0;
1693
1694	/* we have a response packet to send. */
1695	if (rsp.flags == TCP_FLAG_RST) {
1696		send_rst(s, si[i].ack);
1697		stream_term(s);
1698	} else if (rsp.flags != 0) {
1699		send_ack(s, ts, rsp.flags);
1700
1701		/* start the timer for FIN packet */
1702		if ((rsp.flags & TCP_FLAG_FIN) != 0)
1703			timer_reset(s);
1704	}
1705
1706	/* unprocessed packets */
1707	for (; i != num; i++, k++) {
1708		rc[k] = EINVAL;
1709		rp[k] = mb[i];
1710	}
1711
1712	return num - k;
1713}
1714
1715static inline uint32_t
1716rx_new_stream(struct tle_tcp_stream *s, uint32_t ts,
1717	const union pkt_info *pi, const union seg_info si[],
1718	struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1719	uint32_t num)
1720{
1721	uint32_t i;
1722
1723	if (rwl_acquire(&s->rx.use) > 0) {
1724		i = rx_stream(s, ts, pi, si, mb, rp, rc, num);
1725		rwl_release(&s->rx.use);
1726		return i;
1727	}
1728
1729	for (i = 0; i != num; i++) {
1730		rc[i] = ENOENT;
1731		rp[i] = mb[i];
1732	}
1733	return 0;
1734}
1735
1736static inline uint32_t
1737rx_postsyn(struct tle_dev *dev, struct stbl *st, uint32_t type, uint32_t ts,
1738	const union pkt_info pi[], const union seg_info si[],
1739	struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1740	uint32_t num)
1741{
1742	struct tle_tcp_stream *cs, *s;
1743	uint32_t i, k, n, state;
1744	int32_t ret;
1745
1746	s = rx_obtain_stream(dev, st, &pi[0], type);
1747	if (s == NULL) {
1748		for (i = 0; i != num; i++) {
1749			rc[i] = ENOENT;
1750			rp[i] = mb[i];
1751		}
1752		return 0;
1753	}
1754
1755	k = 0;
1756	state = s->tcb.state;
1757
1758	if (state == TCP_ST_LISTEN) {
1759
1760		/* one connection per flow */
1761		cs = NULL;
1762		ret = -EINVAL;
1763		for (i = 0; i != num; i++) {
1764
1765			ret = rx_ack_listen(s, st, pi, &si[i], ts, mb[i], &cs);
1766
1767			/* valid packet encountered */
1768			if (ret >= 0)
1769				break;
1770
1771			/* invalid packet, keep trying to find a proper one */
1772			rc[k] = -ret;
1773			rp[k] = mb[i];
1774			k++;
1775		}
1776
1777		/* packet is valid, but we are out of streams to serve it */
1778		if (ret > 0) {
1779			for (; i != num; i++, k++) {
1780				rc[k] = ret;
1781				rp[k] = mb[i];
1782			}
1783		/* new stream is accepted */
1784		} else if (ret == 0) {
1785
1786			/* inform listen stream about new connections */
1787			if (s->rx.ev != NULL)
1788				tle_event_raise(s->rx.ev);
1789			else if (s->rx.cb.func != NULL &&
1790					rte_ring_count(s->rx.q) == 1)
1791				s->rx.cb.func(s->rx.cb.data, &s->s);
1792
1793			/* if there is no data, drop current packet */
1794			if (PKT_L4_PLEN(mb[i]) == 0) {
1795				rc[k] = ENODATA;
1796				rp[k++] = mb[i++];
1797			}
1798
1799			/*  process remaining packets for that stream */
1800			if (num != i) {
1801				n = rx_new_stream(cs, ts, pi + i, si + i,
1802					mb + i, rp + k, rc + k, num - i);
1803				k += num - n - i;
1804			}
1805		}
1806
1807	} else {
1808		i = rx_stream(s, ts, pi, si, mb, rp, rc, num);
1809		k = num - i;
1810	}
1811
1812	rwl_release(&s->rx.use);
1813	return num - k;
1814}
1815
1816
1817static inline uint32_t
1818rx_syn(struct tle_dev *dev, uint32_t type, uint32_t ts,
1819	const union pkt_info pi[], const union seg_info si[],
1820	struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1821	uint32_t num)
1822{
1823	struct tle_tcp_stream *s;
1824	uint32_t i, k;
1825	int32_t ret;
1826
1827	s = rx_obtain_listen_stream(dev, &pi[0], type);
1828	if (s == NULL) {
1829		for (i = 0; i != num; i++) {
1830			rc[i] = ENOENT;
1831			rp[i] = mb[i];
1832		}
1833		return 0;
1834	}
1835
1836	k = 0;
1837	for (i = 0; i != num; i++) {
1838
1839		/* check that this remote is allowed to connect */
1840		if (rx_check_stream(s, &pi[i]) != 0)
1841			ret = -ENOENT;
1842		else
1843			/* syncookie: reply with <SYN,ACK> */
1844			ret = sync_ack(s, &pi[i], &si[i], ts, mb[i]);
1845
1846		if (ret != 0) {
1847			rc[k] = -ret;
1848			rp[k] = mb[i];
1849			k++;
1850		}
1851	}
1852
1853	rwl_release(&s->rx.use);
1854	return num - k;
1855}
1856
1857uint16_t
1858tle_tcp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[],
1859	struct rte_mbuf *rp[], int32_t rc[], uint16_t num)
1860{
1861	struct stbl *st;
1862	uint32_t i, j, k, n, t, ts;
1863	uint64_t csf;
1864	union pkt_info pi[num];
1865	union seg_info si[num];
1866	union {
1867		uint8_t t[TLE_VNUM];
1868		uint32_t raw;
1869	} stu;
1870
1871	ts = tcp_get_tms();
1872	st = CTX_TCP_STLB(dev->ctx);
1873
1874	stu.raw = 0;
1875
1876	/* extract packet info and check the L3/L4 csums */
1877	for (i = 0; i != num; i++) {
1878
1879		get_pkt_info(pkt[i], &pi[i], &si[i]);
1880
1881		t = pi[i].tf.type;
1882		csf = dev->rx.ol_flags[t] &
1883			(PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD);
1884
1885		/* check csums in SW */
1886		if (pi[i].csf == 0 && csf != 0 && check_pkt_csum(pkt[i], csf,
1887				pi[i].tf.type, IPPROTO_TCP) != 0)
1888			pi[i].csf = csf;
1889
1890		stu.t[t] = 1;
1891	}
1892
1893	if (stu.t[TLE_V4] != 0)
1894		stbl_lock(st, TLE_V4);
1895	if (stu.t[TLE_V6] != 0)
1896		stbl_lock(st, TLE_V6);
1897
1898	k = 0;
1899	for (i = 0; i != num; i += j) {
1900
1901		t = pi[i].tf.type;
1902
1903		/* basic checks for incoming packet */
1904		if (t >= TLE_VNUM || pi[i].csf != 0 || dev->dp[t] == NULL) {
1905			rc[k] = EINVAL;
1906			rp[k] = pkt[i];
1907			j = 1;
1908			k++;
1909		/* process input SYN packets */
1910		} else if (pi[i].tf.flags == TCP_FLAG_SYN) {
1911			j = pkt_info_bulk_syneq(pi + i, num - i);
1912			n = rx_syn(dev, t, ts, pi + i, si + i, pkt + i,
1913				rp + k, rc + k, j);
1914			k += j - n;
1915		} else {
1916			j = pkt_info_bulk_eq(pi + i, num - i);
1917			n = rx_postsyn(dev, st, t, ts, pi + i, si + i, pkt + i,
1918				rp + k, rc + k, j);
1919			k += j - n;
1920		}
1921	}
1922
1923	if (stu.t[TLE_V4] != 0)
1924		stbl_unlock(st, TLE_V4);
1925	if (stu.t[TLE_V6] != 0)
1926		stbl_unlock(st, TLE_V6);
1927
1928	return num - k;
1929}
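
/*
 * Usage sketch for tle_tcp_rx_bulk() (illustrative only; port_id and the
 * burst size are assumptions of this example, not part of the API):
 *
 *	struct rte_mbuf *pkt[MAX_PKT_BURST], *rp[MAX_PKT_BURST];
 *	int32_t rc[MAX_PKT_BURST];
 *	uint16_t i, k, n;
 *
 *	n = rte_eth_rx_burst(port_id, 0, pkt, MAX_PKT_BURST);
 *	k = tle_tcp_rx_bulk(dev, pkt, rp, rc, n);
 *	// k packets were consumed by the stack; the remaining n - k are
 *	// returned in rp[] with error codes in rc[] and stay owned by us.
 *	for (i = 0; i != n - k; i++)
 *		rte_pktmbuf_free(rp[i]);
 */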
1930
1931uint16_t
1932tle_tcp_stream_accept(struct tle_stream *ts, struct tle_stream *rs[],
1933	uint32_t num)
1934{
1935	uint32_t n;
1936	struct tle_tcp_stream *s;
1937
1938	s = TCP_STREAM(ts);
1939	n = _rte_ring_mc_dequeue_burst(s->rx.q, (void **)rs, num);
1940	if (n == 0)
1941		return 0;
1942
1943	/*
1944	 * if we still have packets to read,
1945	 * then rearm stream RX event.
1946	 */
1947	if (n == num && rte_ring_count(s->rx.q) != 0) {
1948		if (rwl_try_acquire(&s->rx.use) > 0 && s->rx.ev != NULL)
1949			tle_event_raise(s->rx.ev);
1950		rwl_release(&s->rx.use);
1951	}
1952
1953	return n;
1954}
1955
1956uint16_t
1957tle_tcp_tx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], uint16_t num)
1958{
1959	uint32_t i, j, k, n;
1960	struct tle_drb *drb[num];
1961	struct tle_tcp_stream *s;
1962
1963	/* extract packets from device TX queue. */
1964
1965	k = num;
1966	n = tle_dring_sc_dequeue(&dev->tx.dr, (const void **)(uintptr_t)pkt,
1967		num, drb, &k);
1968
1969	if (n == 0)
1970		return 0;
1971
1972	/* free empty drbs and notify related streams. */
1973
1974	for (i = 0; i != k; i = j) {
1975		s = drb[i]->udata;
1976		for (j = i + 1; j != k && s == drb[j]->udata; j++)
1977			;
1978		stream_drb_free(s, drb + i, j - i);
1979	}
1980
1981	return n;
1982}
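
/*
 * Usage sketch for tle_tcp_tx_bulk() in a BE loop (illustrative only;
 * port_id is an assumption of this example):
 *
 *	struct rte_mbuf *pkt[MAX_PKT_BURST];
 *	uint16_t i, k, n;
 *
 *	n = tle_tcp_tx_bulk(dev, pkt, MAX_PKT_BURST);
 *	k = rte_eth_tx_burst(port_id, 0, pkt, n);
 *	// mbufs not accepted by the NIC remain the caller's responsibility;
 *	// data packets keep an extra reference in the stream until ACK-ed,
 *	// so freeing here only drops the transmit reference.
 *	for (i = k; i != n; i++)
 *		rte_pktmbuf_free(pkt[i]);
 */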
1983
1984static inline void
1985stream_fill_pkt_info(const struct tle_tcp_stream *s, union pkt_info *pi)
1986{
1987	if (s->s.type == TLE_V4)
1988		pi->addr4 = s->s.ipv4.addr;
1989	else
1990		pi->addr6 = &s->s.ipv6.addr;
1991
1992	pi->port = s->s.port;
1993	pi->tf.type = s->s.type;
1994}
1995
1996static int
1997stream_fill_addr(struct tle_tcp_stream *s, const struct sockaddr *addr)
1998{
1999	const struct sockaddr_in *in4;
2000	const struct sockaddr_in6 *in6;
2001	const struct tle_dev_param *prm;
2002	int32_t rc;
2003
2004	rc = 0;
2005	s->s.pmsk.raw = UINT32_MAX;
2006
2007	/* setup L4 src ports and src address fields. */
2008	if (s->s.type == TLE_V4) {
2009		in4 = (const struct sockaddr_in *)addr;
2010		if (in4->sin_addr.s_addr == INADDR_ANY || in4->sin_port == 0)
2011			return -EINVAL;
2012
2013		s->s.port.src = in4->sin_port;
2014		s->s.ipv4.addr.src = in4->sin_addr.s_addr;
2015		s->s.ipv4.mask.src = INADDR_NONE;
2016		s->s.ipv4.mask.dst = INADDR_NONE;
2017
2018	} else if (s->s.type == TLE_V6) {
2019		in6 = (const struct sockaddr_in6 *)addr;
2020		if (memcmp(&in6->sin6_addr, &tle_ipv6_any,
2021				sizeof(tle_ipv6_any)) == 0 ||
2022				in6->sin6_port == 0)
2023			return -EINVAL;
2024
2025		s->s.port.src = in6->sin6_port;
2026		rte_memcpy(&s->s.ipv6.addr.src, &in6->sin6_addr,
2027			sizeof(s->s.ipv6.addr.src));
2028		rte_memcpy(&s->s.ipv6.mask.src, &tle_ipv6_none,
2029			sizeof(s->s.ipv6.mask.src));
2030		rte_memcpy(&s->s.ipv6.mask.dst, &tle_ipv6_none,
2031			sizeof(s->s.ipv6.mask.dst));
2032	}
2033
2034	/* setup the destination device. */
2035	rc = stream_fill_dest(s);
2036	if (rc != 0)
2037		return rc;
2038
2039	/* setup L4 dst address from device param */
2040	prm = &s->tx.dst.dev->prm;
2041	if (s->s.type == TLE_V4) {
2042		if (s->s.ipv4.addr.dst == INADDR_ANY)
2043			s->s.ipv4.addr.dst = prm->local_addr4.s_addr;
2044	} else if (memcmp(&s->s.ipv6.addr.dst, &tle_ipv6_any,
2045			sizeof(tle_ipv6_any)) == 0)
2046		memcpy(&s->s.ipv6.addr.dst, &prm->local_addr6,
2047			sizeof(s->s.ipv6.addr.dst));
2048
2049	return rc;
2050}
2051
2052static inline int
2053tx_syn(struct tle_tcp_stream *s, const struct sockaddr *addr)
2054{
2055	int32_t rc;
2056	uint32_t tms, seq;
2057	union pkt_info pi;
2058	struct stbl *st;
2059	struct stbl_entry *se;
2060
2061	/* fill stream address */
2062	rc = stream_fill_addr(s, addr);
2063	if (rc != 0)
2064		return rc;
2065
2066	/* fill pkt info to generate seq.*/
2067	stream_fill_pkt_info(s, &pi);
2068
2069	tms = tcp_get_tms();
2070	s->tcb.so.ts.val = tms;
2071	s->tcb.so.ts.ecr = 0;
2072	s->tcb.so.wscale = TCP_WSCALE_DEFAULT;
2073	s->tcb.so.mss = calc_smss(s->tx.dst.mtu, &s->tx.dst);
2074
2075	/* note that rcv.nxt is 0 here for sync_gen_seq.*/
2076	seq = sync_gen_seq(&pi, s->tcb.rcv.nxt, tms, s->tcb.so.mss,
2077				s->s.ctx->prm.hash_alg,
2078				&s->s.ctx->prm.secret_key);
2079	s->tcb.snd.iss = seq;
2080	s->tcb.snd.rcvr = seq;
2081	s->tcb.snd.una = seq;
2082	s->tcb.snd.nxt = seq + 1;
2083	s->tcb.snd.rto = TCP_RTO_DEFAULT;
2084	s->tcb.snd.ts = tms;
2085
2086	s->tcb.rcv.mss = s->tcb.so.mss;
2087	s->tcb.rcv.wscale = TCP_WSCALE_DEFAULT;
2088	s->tcb.rcv.wnd = calc_rx_wnd(s, s->tcb.rcv.wscale);
2089	s->tcb.rcv.ts = 0;
2090
2091	/* add the stream in stream table */
2092	st = CTX_TCP_STLB(s->s.ctx);
2093	se = stbl_add_stream_lock(st, s);
2094	if (se == NULL)
2095		return -ENOBUFS;
2096	s->ste = se;
2097
2098	/* put stream into the to-send queue */
2099	txs_enqueue(s->s.ctx, s);
2100
2101	return 0;
2102}
2103
2104int
2105tle_tcp_stream_connect(struct tle_stream *ts, const struct sockaddr *addr)
2106{
2107	struct tle_tcp_stream *s;
2108	uint32_t type;
2109	int32_t rc;
2110
2111	if (ts == NULL || addr == NULL)
2112		return -EINVAL;
2113
2114	s = TCP_STREAM(ts);
2115	type = s->s.type;
2116	if (type >= TLE_VNUM)
2117		return -EINVAL;
2118
2119	if (rwl_try_acquire(&s->tx.use) > 0) {
2120		rc = rte_atomic16_cmpset(&s->tcb.state, TCP_ST_CLOSED,
2121			TCP_ST_SYN_SENT);
2122		rc = (rc == 0) ? -EDEADLK : 0;
2123	} else
2124		rc = -EINVAL;
2125
2126	if (rc != 0) {
2127		rwl_release(&s->tx.use);
2128		return rc;
2129	}
2130
2131	/* fill stream, prepare and transmit syn pkt */
2132	s->tcb.uop |= TCP_OP_CONNECT;
2133	rc = tx_syn(s, addr);
2134	rwl_release(&s->tx.use);
2135
2136	/* error happened, do a cleanup */
2137	if (rc != 0)
2138		tle_tcp_stream_close(ts);
2139
2140	return rc;
2141}
2142
2143uint16_t
2144tle_tcp_stream_recv(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
2145{
2146	uint32_t n;
2147	struct tle_tcp_stream *s;
2148
2149	s = TCP_STREAM(ts);
2150	n = _rte_ring_mc_dequeue_burst(s->rx.q, (void **)pkt, num);
2151	if (n == 0)
2152		return 0;
2153
2154	/*
2155	 * if we still have packets to read,
2156	 * then rearm stream RX event.
2157	 */
2158	if (n == num && rte_ring_count(s->rx.q) != 0) {
2159		if (rwl_try_acquire(&s->rx.use) > 0 && s->rx.ev != NULL)
2160			tle_event_raise(s->rx.ev);
2161		rwl_release(&s->rx.use);
2162	}
2163
2164	return n;
2165}
2166
2167static inline int32_t
2168tx_segments(struct tle_tcp_stream *s, uint64_t ol_flags,
2169	struct rte_mbuf *segs[], uint32_t num)
2170{
2171	uint32_t i;
2172	int32_t rc;
2173
2174	for (i = 0; i != num; i++) {
2175		/* Build L2/L3/L4 header */
2176		rc = tcp_fill_mbuf(segs[i], s, &s->tx.dst, ol_flags, s->s.port,
2177			0, TCP_FLAG_ACK, 0, 0);
2178		if (rc != 0) {
2179			free_segments(segs, num);
2180			break;
2181		}
2182	}
2183
2184	if (i == num) {
2185		/* queue packets for further transmission. */
2186		rc = _rte_ring_mp_enqueue_bulk(s->tx.q, (void **)segs, num);
2187		if (rc != 0)
2188			free_segments(segs, num);
2189	}
2190
2191	return rc;
2192}
2193
2194uint16_t
2195tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
2196{
2197	uint32_t i, j, k, mss, n, state, type;
2198	int32_t rc;
2199	uint64_t ol_flags;
2200	struct tle_tcp_stream *s;
2201	struct tle_dev *dev;
2202	struct rte_mbuf *segs[TCP_MAX_PKT_SEG];
2203
2204	s = TCP_STREAM(ts);
2205
2206	/* mark stream as not closable. */
2207	if (rwl_acquire(&s->tx.use) < 0) {
2208		rte_errno = EAGAIN;
2209		return 0;
2210	}
2211
2212	state = s->tcb.state;
2213	if (state != TCP_ST_ESTABLISHED && state != TCP_ST_CLOSE_WAIT) {
2214		rte_errno = ENOTCONN;
2215		rwl_release(&s->tx.use);
2216		return 0;
2217	}
2218
2219	mss = s->tcb.snd.mss;
2220	dev = s->tx.dst.dev;
2221	type = s->s.type;
2222	ol_flags = dev->tx.ol_flags[type];
2223
2224	k = 0;
2225	rc = 0;
2226	while (k != num) {
2227		/* prepare and check for TX */
2228		for (i = k; i != num; i++) {
2229			if (pkt[i]->pkt_len > mss ||
2230					pkt[i]->nb_segs > TCP_MAX_PKT_SEG)
2231				break;
2232			rc = tcp_fill_mbuf(pkt[i], s, &s->tx.dst, ol_flags,
2233				s->s.port, 0, TCP_FLAG_ACK, 0, 0);
2234			if (rc != 0)
2235				break;
2236		}
2237
2238		if (i != k) {
2239			/* queue packets for further transmission. */
2240			n = _rte_ring_mp_enqueue_burst(s->tx.q,
2241				(void **)pkt + k, (i - k));
2242			k += n;
2243
2244			/*
2245			 * for unsent, but already modified packets:
2246			 * remove pkt l2/l3 headers, restore ol_flags
2247			 */
2248			if (i != k) {
2249				ol_flags = ~dev->tx.ol_flags[type];
2250				for (j = k; j != i; j++) {
2251					rte_pktmbuf_adj(pkt[j], pkt[j]->l2_len +
2252						pkt[j]->l3_len +
2253						pkt[j]->l4_len);
2254					pkt[j]->ol_flags &= ol_flags;
2255				}
2256				break;
2257			}
2258		}
2259
2260		if (rc != 0) {
2261			rte_errno = -rc;
2262			break;
2263
2264		/* segment large packet and enqueue for sending */
2265		} else if (i != num) {
2266			/* segment the packet. */
2267			rc = tcp_segmentation(pkt[i], segs, RTE_DIM(segs),
2268				&s->tx.dst, mss);
2269			if (rc < 0) {
2270				rte_errno = -rc;
2271				break;
2272			}
2273
2274			rc = tx_segments(s, dev->tx.ol_flags[type], segs, rc);
2275			if (rc == 0) {
2276				/* free the large mbuf */
2277				rte_pktmbuf_free(pkt[i]);
2278				/* set the mbuf as consumed */
2279				k++;
2280			} else
2281				/* no space left in tx queue */
2282				break;
2283		}
2284	}
2285
2286	/* notify BE about more data to send */
2287	if (k != 0)
2288		txs_enqueue(s->s.ctx, s);
2289	/* if possible, re-arm stream write event. */
2290	if (rte_ring_free_count(s->tx.q) != 0 && s->tx.ev != NULL)
2291		tle_event_raise(s->tx.ev);
2292
2293	rwl_release(&s->tx.use);
2294
2295	return k;
2296}
2297
2298/* send data and FIN (if needed) */
2299static inline void
2300tx_data_fin(struct tle_tcp_stream *s, uint32_t tms, uint32_t state)
2301{
2302	/* try to send some data */
2303	tx_nxt_data(s, tms);
2304
2305	/* we also have to send a FIN */
2306	if (state != TCP_ST_ESTABLISHED &&
2307			state != TCP_ST_CLOSE_WAIT &&
2308			tcp_txq_nxt_cnt(s) == 0 &&
2309			s->tcb.snd.fss != s->tcb.snd.nxt) {
2310		s->tcb.snd.fss = ++s->tcb.snd.nxt;
2311		send_ack(s, tms, TCP_FLAG_FIN | TCP_FLAG_ACK);
2312	}
2313}
2314
2315static inline void
2316tx_stream(struct tle_tcp_stream *s, uint32_t tms)
2317{
2318	uint32_t state;
2319
2320	state = s->tcb.state;
2321
2322	if (state == TCP_ST_SYN_SENT) {
2323		/* send the SYN, start the rto timer */
2324		send_ack(s, tms, TCP_FLAG_SYN);
2325		timer_start(s);
2326
2327	} else if (state >= TCP_ST_ESTABLISHED && state <= TCP_ST_LAST_ACK) {
2328
2329		tx_data_fin(s, tms, state);
2330
2331		/* start RTO timer. */
2332		if (s->tcb.snd.nxt != s->tcb.snd.una)
2333			timer_start(s);
2334	}
2335}
2336
2337static inline void
2338rto_stream(struct tle_tcp_stream *s, uint32_t tms)
2339{
2340	uint32_t state;
2341
2342	state = s->tcb.state;
2343
2344	TCP_LOG(DEBUG, "%s(%p, tms=%u): state=%u, "
2345		"retx=%u, retm=%u, "
2346		"rto=%u, snd.ts=%u, tmo=%u, "
2347		"snd.nxt=%lu, snd.una=%lu, flight_size=%lu, "
2348		"snd.rcvr=%lu, snd.fastack=%u, "
2349		"wnd=%u, cwnd=%u, ssthresh=%u, "
2350		"bytes sent=%lu, pkt remain=%u;\n",
2351		__func__, s, tms, s->tcb.state,
2352		s->tcb.snd.nb_retx, s->tcb.snd.nb_retm,
2353		s->tcb.snd.rto, s->tcb.snd.ts, tms - s->tcb.snd.ts,
2354		s->tcb.snd.nxt, s->tcb.snd.una, s->tcb.snd.nxt - s->tcb.snd.una,
2355		s->tcb.snd.rcvr, s->tcb.snd.fastack,
2356		s->tcb.snd.wnd, s->tcb.snd.cwnd, s->tcb.snd.ssthresh,
2357		s->tcb.snd.nxt - s->tcb.snd.iss, tcp_txq_nxt_cnt(s));
2358
2359	if (s->tcb.snd.nb_retx < s->tcb.snd.nb_retm) {
2360
2361		if (state >= TCP_ST_ESTABLISHED && state <= TCP_ST_LAST_ACK) {
2362
2363			/* update SND.CWND and SND.SSTHRESH */
2364			rto_cwnd_update(&s->tcb);
2365
2366			/* RFC 6582 3.2.4 */
2367			s->tcb.snd.rcvr = s->tcb.snd.nxt;
2368			s->tcb.snd.fastack = 0;
2369
2370			/* restart from last acked data */
2371			tcp_txq_rst_nxt_head(s);
2372			s->tcb.snd.nxt = s->tcb.snd.una;
2373
2374			tx_data_fin(s, tms, state);
2375
2376		} else if (state == TCP_ST_SYN_SENT) {
2377			/* resending SYN */
2378			s->tcb.so.ts.val = tms;
2379			send_ack(s, tms, TCP_FLAG_SYN);
2380
2381		} else if (state == TCP_ST_TIME_WAIT) {
2382			stream_term(s);
2383		}
2384
2385		/* RFC6298:5.5 back off the timer */
2386		s->tcb.snd.rto = rto_roundup(2 * s->tcb.snd.rto);
2387		s->tcb.snd.nb_retx++;
2388		timer_restart(s);
2389
2390	} else {
2391		send_rst(s, s->tcb.snd.una);
2392		stream_term(s);
2393	}
2394}
2395
2396int
2397tle_tcp_process(struct tle_ctx *ctx, uint32_t num)
2398{
2399	uint32_t i, k, tms;
2400	struct sdr *dr;
2401	struct tle_timer_wheel *tw;
2402	struct tle_stream *p;
2403	struct tle_tcp_stream *s, *rs[num];
2404
2405	/* process streams with RTO expired */
2406
2407	tw = CTX_TCP_TMWHL(ctx);
2408	tms = tcp_get_tms();
2409	tle_timer_expire(tw, tms);
2410
2411	k = tle_timer_get_expired_bulk(tw, (void **)rs, RTE_DIM(rs));
2412
2413	for (i = 0; i != k; i++) {
2414
2415		s = rs[i];
2416		s->timer.handle = NULL;
2417		if (rwl_try_acquire(&s->tx.use) > 0)
2418			rto_stream(s, tms);
2419		rwl_release(&s->tx.use);
2420	}
2421
2422	/* process streams from to-send queue */
2423
2424	k = txs_dequeue_bulk(ctx, rs, RTE_DIM(rs));
2425
2426	for (i = 0; i != k; i++) {
2427
2428		s = rs[i];
2429		rte_atomic32_set(&s->tx.arm, 0);
2430
2431		if (rwl_try_acquire(&s->tx.use) > 0)
2432			tx_stream(s, tms);
2433		else
2434			txs_enqueue(s->s.ctx, s);
2435		rwl_release(&s->tx.use);
2436	}
2437
2438	/* collect streams to close from the death row */
2439
2440	dr = CTX_TCP_SDR(ctx);
2441	for (k = 0, p = STAILQ_FIRST(&dr->be);
2442			k != num && p != NULL;
2443			k++, p = STAILQ_NEXT(p, link))
2444		rs[k] = TCP_STREAM(p);
2445
2446	if (p == NULL)
2447		STAILQ_INIT(&dr->be);
2448	else
2449		STAILQ_FIRST(&dr->be) = p;
2450
2451	/* cleanup closed streams */
2452	for (i = 0; i != k; i++) {
2453		s = rs[i];
2454		tcp_stream_down(s);
2455		tcp_stream_reset(ctx, s);
2456	}
2457
2458	return 0;
2459}
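
/*
 * Usage sketch (illustrative only): a typical back-end loop pairs this call
 * with the per-device RX/TX bulk routines; MAX_STREAM_BURST is a
 * caller-chosen bound, not defined in this file:
 *
 *	for (;;) {
 *		// feed received packets into the stack and drain packets
 *		// it has prepared: tle_tcp_rx_bulk() / tle_tcp_tx_bulk()
 *		...
 *		// then let the stack run timers, pending TX and cleanup
 *		tle_tcp_process(ctx, MAX_STREAM_BURST);
 *	}
 */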
2460