tcp_rxtx.c revision 9af556f2
1/*
2 * Copyright (c) 2016  Intel Corporation.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#include <rte_errno.h>
17#include <rte_ethdev.h>
18#include <rte_ip.h>
19#include <rte_ip_frag.h>
20#include <rte_tcp.h>
21
22#include "tcp_stream.h"
23#include "tcp_timer.h"
24#include "stream_table.h"
25#include "syncookie.h"
26#include "misc.h"
27#include "tcp_ctl.h"
28#include "tcp_rxq.h"
29#include "tcp_txq.h"
30#include "tcp_tx_seg.h"
31
32#define	TCP_MAX_PKT_SEG	0x20
33
34/*
35 * checks if input TCP ports and IP addresses match the given stream.
36 * returns zero on success.
37 */
38static inline int
39rx_check_stream(const struct tle_tcp_stream *s, const union pkt_info *pi)
40{
41	int32_t rc;
42
43	if (pi->tf.type == TLE_V4)
44		rc = (pi->port.raw & s->s.pmsk.raw) != s->s.port.raw ||
45			(pi->addr4.raw & s->s.ipv4.mask.raw) !=
46			s->s.ipv4.addr.raw;
47	else
48		rc = (pi->port.raw & s->s.pmsk.raw) != s->s.port.raw ||
49			ymm_mask_cmp(&pi->addr6->raw, &s->s.ipv6.addr.raw,
50			&s->s.ipv6.mask.raw) != 0;
51
52	return rc;
53}
54
55static inline struct tle_tcp_stream *
56rx_obtain_listen_stream(const struct tle_dev *dev, const union pkt_info *pi,
57	uint32_t type)
58{
59	struct tle_tcp_stream *s;
60
61	s = (struct tle_tcp_stream *)dev->dp[type]->streams[pi->port.dst];
62	if (s == NULL || rwl_acquire(&s->rx.use) < 0)
63		return NULL;
64
65	/* check that we have a proper stream. */
66	if (s->tcb.state != TCP_ST_LISTEN) {
67		rwl_release(&s->rx.use);
68		s = NULL;
69	}
70
71	return s;
72}
73
74static inline struct tle_tcp_stream *
75rx_obtain_stream(const struct tle_dev *dev, struct stbl *st,
76	const union pkt_info *pi, uint32_t type)
77{
78	struct tle_tcp_stream *s;
79
80	s = stbl_find_data(st, pi);
81	if (s == NULL) {
82		if (pi->tf.flags == TCP_FLAG_ACK)
83			return rx_obtain_listen_stream(dev, pi, type);
84		return NULL;
85	}
86
87	if (stbl_data_pkt(s) || rwl_acquire(&s->rx.use) < 0)
88		return NULL;
89	/* check that we have a proper stream. */
90	else if (s->tcb.state == TCP_ST_CLOSED) {
91		rwl_release(&s->rx.use);
92		s = NULL;
93	}
94
95	return s;
96}
97
98/*
99 * Consider 2 pkt_info *equal* if their:
100 * - types (IPv4/IPv6)
101 * - TCP flags
102 * - checksum flags
103 * - TCP src and dst ports
104 * - IP src and dst addresses
105 * are equal.
106 */
107static inline int
108pkt_info_bulk_eq(const union pkt_info pi[], uint32_t num)
109{
110	uint32_t i;
111
112	i = 1;
113
114	if (pi[0].tf.type == TLE_V4) {
115		while (i != num && xmm_cmp(&pi[0].raw, &pi[i].raw) == 0)
116			i++;
117
118	} else if (pi[0].tf.type == TLE_V6) {
119		while (i != num &&
120				pi[0].raw.u64[0] == pi[i].raw.u64[0] &&
121				ymm_cmp(&pi[0].addr6->raw,
122				&pi[i].addr6->raw) == 0)
123			i++;
124	}
125
126	return i;
127}
128
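/*
 * Same as above, but with matching rules for input SYN packets:
 * only type/flags, destination port and destination IP address have
 * to be equal, so SYNs from different clients to the same listener
 * can be grouped together.
 */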
129static inline int
130pkt_info_bulk_syneq(const union pkt_info pi[], uint32_t num)
131{
132	uint32_t i;
133
134	i = 1;
135
136	if (pi[0].tf.type == TLE_V4) {
137		while (i != num && pi[0].tf.raw == pi[i].tf.raw &&
138				pi[0].port.dst == pi[i].port.dst &&
139				pi[0].addr4.dst == pi[i].addr4.dst)
140			i++;
141
142	} else if (pi[0].tf.type == TLE_V6) {
143		while (i != num && pi[0].tf.raw == pi[i].tf.raw &&
144				pi[0].port.dst == pi[i].port.dst &&
145				xmm_cmp(&pi[0].addr6->dst,
146				&pi[i].addr6->dst) == 0)
147			i++;
148	}
149
150	return i;
151}
152
153static inline void
154stream_drb_free(struct tle_tcp_stream *s, struct tle_drb *drbs[],
155	uint32_t nb_drb)
156{
157	rte_ring_enqueue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
158}
159
160static inline uint32_t
161stream_drb_alloc(struct tle_tcp_stream *s, struct tle_drb *drbs[],
162	uint32_t nb_drb)
163{
164	return rte_ring_dequeue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
165}
166
167static inline void
168fill_tcph(struct tcp_hdr *l4h, const struct tcb *tcb, union l4_ports port,
169	uint32_t seq, uint8_t hlen, uint8_t flags)
170{
171	uint16_t wnd;
172
173	l4h->src_port = port.dst;
174	l4h->dst_port = port.src;
175
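	/*
	 * RFC 7323: the window field in a SYN segment is never scaled,
	 * so advertise at most 64K-1 there; otherwise report the receive
	 * window shifted down by the negotiated window scale.
	 */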
176	wnd = (flags & TCP_FLAG_SYN) ?
177		RTE_MIN(tcb->rcv.wnd, (uint32_t)UINT16_MAX) :
178		tcb->rcv.wnd >> tcb->rcv.wscale;
179
180	/* ??? use sse shuffle to hton all remaining 16 bytes at once. ??? */
181	l4h->sent_seq = rte_cpu_to_be_32(seq);
182	l4h->recv_ack = rte_cpu_to_be_32(tcb->rcv.nxt);
183	l4h->data_off = hlen / TCP_DATA_ALIGN << TCP_DATA_OFFSET;
184	l4h->tcp_flags = flags;
185	l4h->rx_win = rte_cpu_to_be_16(wnd);
186	l4h->cksum = 0;
187	l4h->tcp_urp = 0;
188
189	if (flags & TCP_FLAG_SYN)
190		fill_syn_opts(l4h + 1, &tcb->so);
191	else if ((flags & TCP_FLAG_RST) == 0 && tcb->so.ts.raw != 0)
192		fill_tms_opts(l4h + 1, tcb->snd.ts, tcb->rcv.ts);
193}
194
195static inline int
196tcp_fill_mbuf(struct rte_mbuf *m, const struct tle_tcp_stream *s,
197	const struct tle_dest *dst, uint64_t ol_flags,
198	union l4_ports port, uint32_t seq, uint32_t flags,
199	uint32_t pid, uint32_t swcsm)
200{
201	uint32_t l4, len, plen;
202	struct tcp_hdr *l4h;
203	char *l2h;
204
205	len = dst->l2_len + dst->l3_len;
206	plen = m->pkt_len;
207
208	if (flags & TCP_FLAG_SYN)
209		l4 = sizeof(*l4h) + TCP_TX_OPT_LEN_MAX;
210	else if ((flags & TCP_FLAG_RST) == 0 && s->tcb.rcv.ts != 0)
211		l4 = sizeof(*l4h) + TCP_TX_OPT_LEN_TMS;
212	else
213		l4 = sizeof(*l4h);
214
215	/* adjust mbuf to put L2/L3/L4 headers into it. */
216	l2h = rte_pktmbuf_prepend(m, len + l4);
217	if (l2h == NULL)
218		return -EINVAL;
219
220	/* copy L2/L3 header */
221	rte_memcpy(l2h, dst->hdr, len);
222
223	/* setup TCP header & options */
224	l4h = (struct tcp_hdr *)(l2h + len);
225	fill_tcph(l4h, &s->tcb, port, seq, l4, flags);
226
227	/* setup mbuf TX offload related fields. */
228	m->tx_offload = _mbuf_tx_offload(dst->l2_len, dst->l3_len, l4, 0, 0, 0);
229	m->ol_flags |= ol_flags;
230
231	/* update proto specific fields. */
232
233	if (s->s.type == TLE_V4) {
234		struct ipv4_hdr *l3h;
235		l3h = (struct ipv4_hdr *)(l2h + dst->l2_len);
236		l3h->packet_id = rte_cpu_to_be_16(pid);
237		l3h->total_length = rte_cpu_to_be_16(plen + dst->l3_len + l4);
238
239		if ((ol_flags & PKT_TX_TCP_CKSUM) != 0)
240			l4h->cksum = _ipv4x_phdr_cksum(l3h, m->l3_len,
241				ol_flags);
242		else if (swcsm != 0)
243			l4h->cksum = _ipv4_udptcp_mbuf_cksum(m, len, l3h);
244
245		if ((ol_flags & PKT_TX_IP_CKSUM) == 0 && swcsm != 0)
246			l3h->hdr_checksum = _ipv4x_cksum(l3h, m->l3_len);
247	} else {
248		struct ipv6_hdr *l3h;
249		l3h = (struct ipv6_hdr *)(l2h + dst->l2_len);
250		l3h->payload_len = rte_cpu_to_be_16(plen + l4);
251		if ((ol_flags & PKT_TX_TCP_CKSUM) != 0)
252			l4h->cksum = rte_ipv6_phdr_cksum(l3h, ol_flags);
253		else if (swcsm != 0)
254			l4h->cksum = _ipv6_udptcp_mbuf_cksum(m, len, l3h);
255	}
256
257	return 0;
258}
259
260/*
261 * This function is supposed to be used only for data packets.
262 * Assumes that L2/L3/L4 headers and mbuf fields are already set up properly.
263 *  - updates tcp SEG.SEQ, SEG.ACK, TS.VAL, TS.ECR.
264 *  - if no HW cksum offloads are enabled, calculates TCP checksum.
265 */
266static inline void
267tcp_update_mbuf(struct rte_mbuf *m, uint32_t type, const struct tcb *tcb,
268	uint32_t seq, uint32_t pid)
269{
270	struct tcp_hdr *l4h;
271	uint32_t len;
272
273	len = m->l2_len + m->l3_len;
274	l4h = rte_pktmbuf_mtod_offset(m, struct tcp_hdr *, len);
275
276	l4h->sent_seq = rte_cpu_to_be_32(seq);
277	l4h->recv_ack = rte_cpu_to_be_32(tcb->rcv.nxt);
278
279	if (tcb->so.ts.raw != 0)
280		fill_tms_opts(l4h + 1, tcb->snd.ts, tcb->rcv.ts);
281
282	if (type == TLE_V4) {
283		struct ipv4_hdr *l3h;
284		l3h = rte_pktmbuf_mtod_offset(m, struct ipv4_hdr *, m->l2_len);
285		l3h->hdr_checksum = 0;
286		l3h->packet_id = rte_cpu_to_be_16(pid);
287		if ((m->ol_flags & PKT_TX_IP_CKSUM) == 0)
288			l3h->hdr_checksum = _ipv4x_cksum(l3h, m->l3_len);
289	}
290
291	/* have to calculate TCP checksum in SW */
292	if ((m->ol_flags & PKT_TX_TCP_CKSUM) == 0) {
293
294		l4h->cksum = 0;
295
296		if (type == TLE_V4) {
297			struct ipv4_hdr *l3h;
298			l3h = rte_pktmbuf_mtod_offset(m, struct ipv4_hdr *,
299				m->l2_len);
300			l4h->cksum = _ipv4_udptcp_mbuf_cksum(m, len, l3h);
301
302		} else {
303			struct ipv6_hdr *l3h;
304			l3h = rte_pktmbuf_mtod_offset(m, struct ipv6_hdr *,
305				m->l2_len);
306			l4h->cksum = _ipv6_udptcp_mbuf_cksum(m, len, l3h);
307		}
308	}
309}
310
311/* Send data packets that need to be ACK-ed by peer */
312static inline uint32_t
313tx_data_pkts(struct tle_tcp_stream *s, struct rte_mbuf *const m[], uint32_t num)
314{
315	uint32_t bsz, i, nb, nbm;
316	struct tle_dev *dev;
317	struct tle_drb *drb[num];
318
319	/* calculate how many drbs are needed.*/
320	bsz = s->tx.drb.nb_elem;
321	nbm = (num + bsz - 1) / bsz;
322
323	/* allocate drbs, adjust number of packets. */
324	nb = stream_drb_alloc(s, drb, nbm);
325
326	/* drb ring is empty. */
327	if (nb == 0)
328		return 0;
329
330	else if (nb != nbm)
331		num = nb * bsz;
332
333	dev = s->tx.dst.dev;
334
335	/* enqueue pkts for TX. */
336	nbm = nb;
337	i = tle_dring_mp_enqueue(&dev->tx.dr, (const void * const*)m,
338		num, drb, &nb);
339
340	/* free unused drbs. */
341	if (nb != 0)
342		stream_drb_free(s, drb + nbm - nb, nb);
343
344	return i;
345}
346
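/*
 * Prepare and send a bulk of data packets from the stream TX queue.
 * A block of `num` consecutive packet-ids is reserved up front with one
 * atomic add; each mbuf gets an extra reference so it stays in the
 * retransmit queue until ACK-ed by the peer. Transmission stops once the
 * next packet no longer fits into the remaining send window (sl->len).
 */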
347static inline uint32_t
348tx_data_bulk(struct tle_tcp_stream *s, union seqlen *sl, struct rte_mbuf *mi[],
349	uint32_t num)
350{
351	uint32_t fail, i, k, n, mss, pid, plen, sz, tn, type;
352	struct tle_dev *dev;
353	struct rte_mbuf *mb;
354	struct rte_mbuf *mo[MAX_PKT_BURST + TCP_MAX_PKT_SEG];
355
356	mss = s->tcb.snd.mss;
357	type = s->s.type;
358
359	dev = s->tx.dst.dev;
360	pid = rte_atomic32_add_return(&dev->tx.packet_id[type], num) - num;
361
362	k = 0;
363	tn = 0;
364	fail = 0;
365	for (i = 0; i != num && sl->len != 0 && fail == 0; i++) {
366
367		mb = mi[i];
368		sz = RTE_MIN(sl->len, mss);
369		plen = PKT_L4_PLEN(mb);
370
371		/* fast path, no need to use indirect mbufs. */
372		if (plen <= sz) {
373
374			/* update pkt TCP header */
375			tcp_update_mbuf(mb, type, &s->tcb, sl->seq, pid + i);
376
377			/* keep mbuf till ACK is received. */
378			rte_pktmbuf_refcnt_update(mb, 1);
379			sl->len -= plen;
380			sl->seq += plen;
381			mo[k++] = mb;
382		/* remaining snd.wnd is less than MSS, send nothing */
383		} else if (sz < mss)
384			break;
385		/* packet indirection needed */
386		else
387			RTE_VERIFY(0);
388
389		if (k >= MAX_PKT_BURST) {
390			n = tx_data_pkts(s, mo, k);
391			fail = k - n;
392			tn += n;
393			k = 0;
394		}
395	}
396
397	if (k != 0) {
398		n = tx_data_pkts(s, mo, k);
399		fail = k - n;
400		tn += n;
401	}
402
403	if (fail != 0) {
404		sz = tcp_mbuf_seq_free(mo + n, fail);
405		sl->seq -= sz;
406		sl->len += sz;
407	}
408
409	return tn;
410}
411
412/*
413 * gets data from the stream send buffer, updates it and
414 * queues it into the TX device queue.
415 * Note that this function is not MT safe.
416 */
417static inline uint32_t
418tx_nxt_data(struct tle_tcp_stream *s, uint32_t tms)
419{
420	uint32_t n, num, tn, wnd;
421	struct rte_mbuf **mi;
422	union seqlen sl;
423
424	tn = 0;
425	wnd = s->tcb.snd.wnd - (uint32_t)(s->tcb.snd.nxt - s->tcb.snd.una);
426	sl.seq = s->tcb.snd.nxt;
427	sl.len = RTE_MIN(wnd, s->tcb.snd.cwnd);
428
429	if (sl.len == 0)
430		return tn;
431
432	/* update send timestamp */
433	s->tcb.snd.ts = tms;
434
435	do {
436		/* get group of packets */
437		mi = tcp_txq_get_nxt_objs(s, &num);
438
439		/* stream send buffer is empty */
440		if (num == 0)
441			break;
442
443		/* queue data packets for TX */
444		n = tx_data_bulk(s, &sl, mi, num);
445		tn += n;
446
447		/* update consumer head */
448		tcp_txq_set_nxt_head(s, n);
449	} while (n == num);
450
451	s->tcb.snd.nxt += sl.seq - (uint32_t)s->tcb.snd.nxt;
452	return tn;
453}
454
455static inline void
456free_una_data(struct tle_tcp_stream *s, uint32_t len)
457{
458	uint32_t i, n, num, plen;
459	struct rte_mbuf **mi;
460
461	n = 0;
462	plen = 0;
463
464	do {
465		/* get group of packets */
466		mi = tcp_txq_get_una_objs(s, &num);
467
468		if (num == 0)
469			break;
470
471		/* free acked data */
472		for (i = 0; i != num && n != len; i++, n = plen) {
473			plen += PKT_L4_PLEN(mi[i]);
474			if (plen > len) {
475				/* keep SND.UNA at the start of the packet */
476				len -= RTE_MIN(len, plen - len);
477				break;
478			}
479			rte_pktmbuf_free(mi[i]);
480		}
481
482		/* update consumer tail */
483		tcp_txq_set_una_tail(s, i);
484	} while (plen < len);
485
486	s->tcb.snd.una += len;
487
488	/*
489	 * that could happen in case of retransmit,
490	 * adjust SND.NXT with SND.UNA.
491	 */
492	if (s->tcb.snd.una > s->tcb.snd.nxt) {
493		tcp_txq_rst_nxt_head(s);
494		s->tcb.snd.nxt = s->tcb.snd.una;
495	}
496}
497
498static inline uint16_t
499calc_smss(uint16_t mss, const struct tle_dest *dst)
500{
501	uint16_t n;
502
503	n = dst->mtu - dst->l2_len - dst->l3_len - TCP_TX_HDR_DACK;
504	mss = RTE_MIN(n, mss);
505	return mss;
506}
507
508/*
509 * RFC 5681 3.1
510 * If SMSS > 2190 bytes:
511 *     IW = 2 * SMSS bytes and MUST NOT be more than 2 segments
512 * If (SMSS > 1095 bytes) and (SMSS <= 2190 bytes):
513 *     IW = 3 * SMSS bytes and MUST NOT be more than 3 segments
514 * If SMSS <= 1095 bytes:
515 *     IW = 4 * SMSS bytes and MUST NOT be more than 4 segments
516 */
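/* E.g. for a typical Ethernet SMSS of 1460 bytes: IW = 3 * 1460 = 4380 bytes. */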
517static inline uint32_t
518initial_cwnd(uint16_t smss)
519{
520	if (smss > 2190)
521		return 2 * smss;
522	else if (smss > 1095)
523		return 3 * smss;
524	return 4 * smss;
525}
526
527/*
528 * queue a standalone packet to the particular output device
529 * It assumes that:
530 * - L2/L3/L4 headers are already set.
531 * - packet fits into one segment.
532 */
533static inline int
534send_pkt(struct tle_tcp_stream *s, struct tle_dev *dev, struct rte_mbuf *m)
535{
536	uint32_t n, nb;
537	struct tle_drb *drb;
538
539	if (stream_drb_alloc(s, &drb, 1) == 0)
540		return -ENOBUFS;
541
542	/* enqueue pkt for TX. */
543	nb = 1;
544	n = tle_dring_mp_enqueue(&dev->tx.dr, (const void * const*)&m, 1,
545		&drb, &nb);
546
547	/* free unused drbs. */
548	if (nb != 0)
549		stream_drb_free(s, &drb, 1);
550
551	return (n == 1) ? 0 : -ENOBUFS;
552}
553
554static inline int
555send_ctrl_pkt(struct tle_tcp_stream *s, struct rte_mbuf *m, uint32_t seq,
556	uint32_t flags)
557{
558	const struct tle_dest *dst;
559	uint32_t pid, type;
560	int32_t rc;
561
562	dst = &s->tx.dst;
563	type = s->s.type;
564	pid = rte_atomic32_add_return(&dst->dev->tx.packet_id[type], 1) - 1;
565
566	rc = tcp_fill_mbuf(m, s, dst, 0, s->s.port, seq, flags, pid, 1);
567	if (rc == 0)
568		rc = send_pkt(s, dst->dev, m);
569
570	return rc;
571}
572
573static inline int
574send_rst(struct tle_tcp_stream *s, uint32_t seq)
575{
576	struct rte_mbuf *m;
577	int32_t rc;
578
579	m = rte_pktmbuf_alloc(s->tx.dst.head_mp);
580	if (m == NULL)
581		return -ENOMEM;
582
583	rc = send_ctrl_pkt(s, m, seq, TCP_FLAG_RST);
584	if (rc != 0)
585		rte_pktmbuf_free(m);
586
587	return rc;
588}
589
590static inline int
591send_ack(struct tle_tcp_stream *s, uint32_t tms, uint32_t flags)
592{
593	struct rte_mbuf *m;
594	uint32_t seq;
595	int32_t rc;
596
597	m = rte_pktmbuf_alloc(s->tx.dst.head_mp);
598	if (m == NULL)
599		return -ENOMEM;
600
601	seq = s->tcb.snd.nxt - ((flags & (TCP_FLAG_FIN | TCP_FLAG_SYN)) != 0);
602	s->tcb.snd.ts = tms;
603
604	rc = send_ctrl_pkt(s, m, seq, flags);
605	if (rc != 0) {
606		rte_pktmbuf_free(m);
607		return rc;
608	}
609
610	s->tcb.snd.ack = s->tcb.rcv.nxt;
611	return 0;
612}
613
614
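/*
 * Reply to an incoming SYN on a listen stream with a syncookie <SYN,ACK>.
 * No per-connection state is allocated here: ISS and options are encoded
 * into the generated sequence number/timestamp, and the received SYN mbuf
 * itself is reused to build and send the reply.
 */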
615static int
616sync_ack(struct tle_tcp_stream *s, const union pkt_info *pi,
617	const union seg_info *si, uint32_t ts, struct rte_mbuf *m)
618{
619	uint16_t len;
620	int32_t rc;
621	uint32_t pid, seq, type;
622	struct tle_dev *dev;
623	const void *da;
624	struct tle_dest dst;
625	const struct tcp_hdr *th;
626
627	type = s->s.type;
628
629	/* get destination information. */
630	if (type == TLE_V4)
631		da = &pi->addr4.src;
632	else
633		da = &pi->addr6->src;
634
635	rc = stream_get_dest(&s->s, da, &dst);
636	if (rc < 0)
637		return rc;
638
639	th = rte_pktmbuf_mtod_offset(m, const struct tcp_hdr *,
640		m->l2_len + m->l3_len);
641	get_syn_opts(&s->tcb.so, (uintptr_t)(th + 1), m->l4_len - sizeof(*th));
642
643	s->tcb.rcv.nxt = si->seq + 1;
644	seq = sync_gen_seq(pi, s->tcb.rcv.nxt, ts, s->tcb.so.mss);
645	s->tcb.so.ts.ecr = s->tcb.so.ts.val;
646	s->tcb.so.ts.val = sync_gen_ts(ts, s->tcb.so.wscale);
647	s->tcb.so.wscale = (s->tcb.so.wscale == TCP_WSCALE_NONE) ?
648		TCP_WSCALE_NONE : TCP_WSCALE_DEFAULT;
649	s->tcb.so.mss = calc_smss(dst.mtu, &dst);
650
651	/* reset mbuf's data contents. */
652	len = m->l2_len + m->l3_len + m->l4_len;
653	m->tx_offload = 0;
654	if (rte_pktmbuf_adj(m, len) == NULL)
655		return -EINVAL;
656
657	dev = dst.dev;
658	pid = rte_atomic32_add_return(&dev->tx.packet_id[type], 1) - 1;
659
660	rc = tcp_fill_mbuf(m, s, &dst, 0, pi->port, seq,
661		TCP_FLAG_SYN | TCP_FLAG_ACK, pid, 1);
662	if (rc == 0)
663		rc = send_pkt(s, dev, m);
664
665	return rc;
666}
667
668/*
669 * RFC 793:
670 * There are four cases for the acceptability test for an incoming segment:
671 * Segment Receive  Test
672 * Length  Window
673 * ------- -------  -------------------------------------------
674 *    0       0     SEG.SEQ = RCV.NXT
675 *    0      >0     RCV.NXT =< SEG.SEQ < RCV.NXT+RCV.WND
676 *   >0       0     not acceptable
677 *   >0      >0     RCV.NXT =< SEG.SEQ < RCV.NXT+RCV.WND
678 *                  or RCV.NXT =< SEG.SEQ+SEG.LEN-1 < RCV.NXT+RCV.WND
679 */
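/*
 * Implemented with unsigned (mod 2^32) arithmetic, so the comparison
 * works across sequence number wrap-around: a segment is rejected only
 * when neither its first nor its last byte falls inside
 * [RCV.NXT, RCV.NXT + RCV.WND).
 */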
680static inline int
681check_seqn(const struct tcb *tcb, uint32_t seqn, uint32_t len)
682{
683	uint32_t n;
684
685	n = seqn + len;
686	if (seqn - tcb->rcv.nxt >= tcb->rcv.wnd &&
687			n - tcb->rcv.nxt > tcb->rcv.wnd)
688		return -ERANGE;
689
690	return 0;
691}
692
693static inline union tsopt
694rx_tms_opt(const struct tcb *tcb, const struct rte_mbuf *mb)
695{
696	union tsopt ts;
697	uintptr_t opt;
698	const struct tcp_hdr *th;
699
700	if (tcb->so.ts.val != 0) {
701		opt = rte_pktmbuf_mtod_offset(mb, uintptr_t,
702			mb->l2_len + mb->l3_len + sizeof(*th));
703		ts = get_tms_opts(opt, mb->l4_len - sizeof(*th));
704	} else
705		ts.raw = 0;
706
707	return ts;
708}
709
710/*
711 * PAWS and sequence check.
712 * RFC 1323 4.2.1
713 */
714static inline int
715rx_check_seq(struct tcb *tcb, uint32_t seq, uint32_t len, const union tsopt ts)
716{
717	int32_t rc;
718
719	/* RFC 1323 4.2.1 R2 */
720	rc = check_seqn(tcb, seq, len);
721	if (rc < 0)
722		return rc;
723
724	if (ts.raw != 0) {
725
726		/* RFC 1323 4.2.1 R1 */
727		if (tcp_seq_lt(ts.val, tcb->rcv.ts))
728			return -ERANGE;
729
730		/* RFC 1323 4.2.1 R3 */
731		if (tcp_seq_leq(seq, tcb->snd.ack) &&
732				tcp_seq_lt(tcb->snd.ack, seq + len))
733			tcb->rcv.ts = ts.val;
734	}
735
736	return rc;
737}
738
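/*
 * ACK acceptability test (RFC 793): SND.UNA =< SEG.ACK =< SND.NXT.
 * SND.RCVR (the recovery point) is also accepted, since during fast
 * retransmit SND.NXT may have been rolled back below data already sent.
 */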
739static inline int
740rx_check_ack(const struct tcb *tcb, uint32_t ack)
741{
742	uint32_t max;
743
744	max = (uint32_t)RTE_MAX(tcb->snd.nxt, tcb->snd.rcvr);
745
746	if (tcp_seq_leq(tcb->snd.una, ack) && tcp_seq_leq(ack, max))
747		return 0;
748
749	return -ERANGE;
750}
751
752static inline int
753rx_check_seqack(struct tcb *tcb, uint32_t seq, uint32_t ack, uint32_t len,
754	const union tsopt ts)
755{
756	int32_t rc;
757
758	rc = rx_check_seq(tcb, seq, len, ts);
759	rc |= rx_check_ack(tcb, ack);
760	return rc;
761}
762
763static inline int
764restore_syn_opt(struct syn_opts *so, const union pkt_info *pi,
765	const union seg_info *si, uint32_t ts, const struct rte_mbuf *mb)
766{
767	int32_t rc;
768	uint32_t len;
769	const struct tcp_hdr *th;
770
771	/* check that ACK, etc fields are what we expected. */
772	rc = sync_check_ack(pi, si->seq, si->ack - 1, ts);
773	if (rc < 0)
774		return rc;
775
776	so->mss = rc;
777
778	th = rte_pktmbuf_mtod_offset(mb, const struct tcp_hdr *,
779		mb->l2_len + mb->l3_len);
780	len = mb->l4_len - sizeof(*th);
781	sync_get_opts(so, (uintptr_t)(th + 1), len);
782	return 0;
783}
784
785static inline void
786stream_term(struct tle_tcp_stream *s)
787{
788	struct sdr *dr;
789
790	s->tcb.state = TCP_ST_CLOSED;
791	rte_smp_wmb();
792
793	timer_stop(s);
794
795	/* close() was already invoked, schedule final cleanup */
796	if ((s->tcb.uop & TCP_OP_CLOSE) != 0) {
797
798		dr = CTX_TCP_SDR(s->s.ctx);
799		STAILQ_INSERT_TAIL(&dr->be, &s->s, link);
800
801	/* notify user that the stream needs to be closed */
802	} else if (s->err.ev != NULL)
803		tle_event_raise(s->err.ev);
804	else if (s->err.cb.func != NULL)
805		s->err.cb.func(s->err.cb.data, &s->s);
806}
807
808static inline int
809stream_fill_dest(struct tle_tcp_stream *s)
810{
811	int32_t rc;
812	const void *da;
813
814	if (s->s.type == TLE_V4)
815		da = &s->s.ipv4.addr.src;
816	else
817		da = &s->s.ipv6.addr.src;
818
819	rc = stream_get_dest(&s->s, da, &s->tx.dst);
820	return (rc < 0) ? rc : 0;
821}
822
823/*
824 * helper function, prepares a new accept stream.
825 */
826static inline int
827accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st,
828	struct tle_tcp_stream *cs, const struct syn_opts *so,
829	uint32_t tms, const union pkt_info *pi, const union seg_info *si)
830{
831	int32_t rc;
832	uint32_t rtt;
833
834	/* some TX still pending for that stream. */
835	if (TCP_STREAM_TX_PENDING(cs))
836		return -EAGAIN;
837
838	/* setup L4 ports and L3 addresses fields. */
839	cs->s.port.raw = pi->port.raw;
840	cs->s.pmsk.raw = UINT32_MAX;
841
842	if (pi->tf.type == TLE_V4) {
843		cs->s.ipv4.addr = pi->addr4;
844		cs->s.ipv4.mask.src = INADDR_NONE;
845		cs->s.ipv4.mask.dst = INADDR_NONE;
846	} else if (pi->tf.type == TLE_V6) {
847		cs->s.ipv6.addr = *pi->addr6;
848		rte_memcpy(&cs->s.ipv6.mask.src, &tle_ipv6_none,
849			sizeof(cs->s.ipv6.mask.src));
850		rte_memcpy(&cs->s.ipv6.mask.dst, &tle_ipv6_none,
851			sizeof(cs->s.ipv6.mask.dst));
852	}
853
854	/* setup TCB */
855	sync_fill_tcb(&cs->tcb, si, so);
856	cs->tcb.rcv.wnd = calc_rx_wnd(cs, cs->tcb.rcv.wscale);
857
858	/*
859	 * estimate the rto
860	 * estimate the RTO;
861	 * for now RTT is calculated based on the TCP timestamp option,
862	 * later a real-time measurement should be added
863	if (cs->tcb.so.ts.ecr) {
864		rtt = tms - cs->tcb.so.ts.ecr;
865		rto_estimate(&cs->tcb, rtt);
866	} else
867		cs->tcb.snd.rto = TCP_RTO_DEFAULT;
868
869	/* copy stream's type. */
870	cs->s.type = ps->s.type;
871
872	/* retrieve and cache destination information. */
873	rc = stream_fill_dest(cs);
874	if (rc != 0)
875		return rc;
876
877	/* update snd.mss with SMSS value */
878	cs->tcb.snd.mss = calc_smss(cs->tcb.snd.mss, &cs->tx.dst);
879
880	/* setup congestion variables */
881	cs->tcb.snd.cwnd = initial_cwnd(cs->tcb.snd.mss);
882	cs->tcb.snd.ssthresh = cs->tcb.snd.wnd;
883
884	cs->tcb.state = TCP_ST_ESTABLISHED;
885
886	/* add stream to the table */
887	cs->ste = stbl_add_stream(st, pi, cs);
888	if (cs->ste == NULL)
889		return -ENOBUFS;
890
891	cs->tcb.uop |= TCP_OP_ACCEPT;
892	tcp_stream_up(cs);
893	return 0;
894}
895
896
897/*
898 * ACK for new connection request arrived.
899 * Check that the packet meets all conditions and try to open a new stream.
900 * returns:
901 * < 0  - invalid packet
902 * == 0 - packet is valid and new stream was opened for it.
903 * > 0  - packet is valid, but failed to open new stream.
904 */
905static inline int
906rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st,
907	const union pkt_info *pi, const union seg_info *si,
908	uint32_t tms, struct rte_mbuf *mb, struct tle_tcp_stream **csp)
909{
910	int32_t rc;
911	struct tle_ctx *ctx;
912	struct tle_stream *ts;
913	struct tle_tcp_stream *cs;
914	struct syn_opts so;
915
916	*csp = NULL;
917
918	if (pi->tf.flags != TCP_FLAG_ACK || rx_check_stream(s, pi) != 0)
919		return -EINVAL;
920
921	rc = restore_syn_opt(&so, pi, si, tms, mb);
922	if (rc < 0)
923		return rc;
924
925	ctx = s->s.ctx;
926
927	/* allocate new stream */
928	ts = get_stream(ctx);
929	cs = TCP_STREAM(ts);
930	if (ts == NULL)
931		return ENFILE;
932
933	/* prepare stream to handle new connection */
934	if (accept_prep_stream(s, st, cs, &so, tms, pi, si) == 0) {
935
936		/* put new stream in the accept queue */
937		if (rte_ring_enqueue_burst(s->rx.q,
938				(void * const *)&ts, 1) == 1) {
939			*csp = cs;
940			return 0;
941		}
942
943		/* cleanup on failure */
944		tcp_stream_down(cs);
945		stbl_del_pkt(st, cs->ste, pi);
946		cs->ste = NULL;
947	}
948
949	tcp_stream_reset(ctx, cs);
950	return ENOBUFS;
951}
952
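/*
 * Strip the L2/L3/L4 headers from a data segment and cut off any part
 * of the payload that was already received (SEG.SEQ < RCV.NXT),
 * updating *seqn and *plen accordingly.
 * Returns -ENODATA when nothing new is left in the segment.
 */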
953static inline int
954data_pkt_adjust(const struct tcb *tcb, struct rte_mbuf *mb, uint32_t hlen,
955	uint32_t *seqn, uint32_t *plen)
956{
957	uint32_t len, n, seq;
958
959	seq = *seqn;
960	len = *plen;
961
962	rte_pktmbuf_adj(mb, hlen);
963	if (len == 0)
964		return -ENODATA;
965	/* cut off the start of the packet */
966	else if (tcp_seq_lt(seq, tcb->rcv.nxt)) {
967		n = tcb->rcv.nxt - seq;
968		if (n >= len)
969			return -ENODATA;
970
971		rte_pktmbuf_adj(mb, n);
972		*seqn = seq + n;
973		*plen = len - n;
974	}
975
976	return 0;
977}
978
979static inline uint32_t
980rx_ackdata(struct tle_tcp_stream *s, uint32_t ack)
981{
982	uint32_t k, n;
983
984	n = ack - (uint32_t)s->tcb.snd.una;
985
986	/* some more data was acked. */
987	if (n != 0) {
988
989		/* advance SND.UNA and free related packets. */
990		k = rte_ring_free_count(s->tx.q);
991		free_una_data(s, n);
992
993		/* mark the stream as available for writing */
994		if (rte_ring_free_count(s->tx.q) != 0) {
995			if (s->tx.ev != NULL)
996				tle_event_raise(s->tx.ev);
997			else if (k == 0 && s->tx.cb.func != NULL)
998				s->tx.cb.func(s->tx.cb.data, &s->s);
999		}
1000	}
1001
1002	return n;
1003}
1004
1005static void
1006rx_fin_state(struct tle_tcp_stream *s, struct resp_info *rsp)
1007{
1008	uint32_t state;
1009	int32_t ackfin;
1010
1011	s->tcb.rcv.nxt += 1;
1012
1013	ackfin = (s->tcb.snd.una == s->tcb.snd.fss);
1014	state = s->tcb.state;
1015
1016	if (state == TCP_ST_ESTABLISHED) {
1017		s->tcb.state = TCP_ST_CLOSE_WAIT;
1018		/* raise err.ev & err.cb */
1019		if (s->err.ev != NULL)
1020			tle_event_raise(s->err.ev);
1021		else if (s->err.cb.func != NULL)
1022			s->err.cb.func(s->err.cb.data, &s->s);
1023	} else if (state == TCP_ST_FIN_WAIT_1 || state == TCP_ST_CLOSING) {
1024		rsp->flags |= TCP_FLAG_ACK;
1025		if (ackfin != 0) {
1026			s->tcb.state = TCP_ST_TIME_WAIT;
1027			s->tcb.snd.rto = TCP_RTO_2MSL;
1028			timer_reset(s);
1029		} else
1030			s->tcb.state = TCP_ST_CLOSING;
1031	} else if (state == TCP_ST_FIN_WAIT_2) {
1032		rsp->flags |= TCP_FLAG_ACK;
1033		s->tcb.state = TCP_ST_TIME_WAIT;
1034		s->tcb.snd.rto = TCP_RTO_2MSL;
1035		timer_reset(s);
1036	} else if (state == TCP_ST_LAST_ACK && ackfin != 0) {
1037		stream_term(s);
1038	}
1039}
1040
1041/*
1042 * FIN process for ESTABLISHED state
1043 * returns:
1044 * < 0 - error occurred
1045 *   0 - FIN was processed OK and the mbuf can be freed/reused.
1046 * > 0 - FIN was processed OK and the mbuf can't be freed/reused.
1047 */
1048static inline int
1049rx_fin(struct tle_tcp_stream *s, uint32_t state,
1050	const union seg_info *si, struct rte_mbuf *mb,
1051	struct resp_info *rsp)
1052{
1053	uint32_t hlen, plen, seq;
1054	int32_t ret;
1055	union tsopt ts;
1056
1057	hlen = PKT_L234_HLEN(mb);
1058	plen = mb->pkt_len - hlen;
1059	seq = si->seq;
1060
1061	ts = rx_tms_opt(&s->tcb, mb);
1062	ret = rx_check_seqack(&s->tcb, seq, si->ack, plen, ts);
1063	if (ret != 0)
1064		return ret;
1065
1066	if (state < TCP_ST_ESTABLISHED)
1067		return -EINVAL;
1068
1069	if (plen != 0) {
1070
1071		ret = data_pkt_adjust(&s->tcb, mb, hlen, &seq, &plen);
1072		if (ret != 0)
1073			return ret;
1074		if (rx_data_enqueue(s, seq, plen, &mb, 1) != 1)
1075			return -ENOBUFS;
1076	}
1077
1078	/* process ack here */
1079	rx_ackdata(s, si->ack);
1080
1081	/* some fragments still missing */
1082	if (seq + plen != s->tcb.rcv.nxt) {
1083		s->tcb.rcv.frs.seq = seq + plen;
1084		s->tcb.rcv.frs.on = 1;
1085	} else
1086		rx_fin_state(s, rsp);
1087
1088	return plen;
1089}
1090
1091static inline int
1092rx_rst(struct tle_tcp_stream *s, uint32_t state, uint32_t flags,
1093	const union seg_info *si)
1094{
1095	int32_t rc;
1096
1097	/*
1098	 * RFC 793: In all states except SYN-SENT, all reset (RST) segments
1099	 * are validated by checking their SEQ-fields.
1100	 * A reset is valid if its sequence number is in the window.
1101	 * In the SYN-SENT state (a RST received in response to an initial SYN),
1102	 * the RST is acceptable if the ACK field acknowledges the SYN.
1103	 */
1104	if (state == TCP_ST_SYN_SENT) {
1105		rc = ((flags & TCP_FLAG_ACK) == 0 ||
1106				si->ack != s->tcb.snd.nxt) ?
1107			-ERANGE : 0;
1108	}
1109
1110	else
1111		rc = check_seqn(&s->tcb, si->seq, 0);
1112
1113	if (rc == 0)
1114		stream_term(s);
1115
1116	return rc;
1117}
1118
1119/*
1120 * check whether we have a FIN that was received out-of-order.
1121 * if yes, try to process it now.
1122 */
1123static inline void
1124rx_ofo_fin(struct tle_tcp_stream *s, struct resp_info *rsp)
1125{
1126	if (s->tcb.rcv.frs.on != 0 && s->tcb.rcv.nxt == s->tcb.rcv.frs.seq)
1127		rx_fin_state(s, rsp);
1128}
1129
1130static inline void
1131dack_info_init(struct dack_info *tack, const struct tcb *tcb)
1132{
1133	memset(tack, 0, sizeof(*tack));
1134	tack->ack = tcb->snd.una;
1135	tack->segs.dup = tcb->rcv.dupack;
1136	tack->wu.raw = tcb->snd.wu.raw;
1137	tack->wnd = tcb->snd.wnd >> tcb->snd.wscale;
1138}
1139
1140static inline void
1141ack_window_update(struct tcb *tcb, const struct dack_info *tack)
1142{
1143	tcb->snd.wu.raw = tack->wu.raw;
1144	tcb->snd.wnd = tack->wnd << tcb->snd.wscale;
1145}
1146
1147static inline void
1148ack_cwnd_update(struct tcb *tcb, uint32_t acked, const struct dack_info *tack)
1149{
1150	uint32_t n;
1151
1152	n = tack->segs.ack * tcb->snd.mss;
1153
1154	/* slow start phase, RFC 5681 3.1 (2)  */
1155	if (tcb->snd.cwnd < tcb->snd.ssthresh)
1156		tcb->snd.cwnd += RTE_MIN(acked, n);
1157	/* congestion avoidance phase, RFC 5681 3.1 (3) */
1158	else
1159		tcb->snd.cwnd += RTE_MAX(1U, n * tcb->snd.mss / tcb->snd.cwnd);
1160}
1161
1162static inline void
1163rto_ssthresh_update(struct tcb *tcb)
1164{
1165	uint32_t k, n;
1166
1167	/* RFC 5681 3.1 (4)  */
1168	n = (tcb->snd.nxt - tcb->snd.una) / 2;
1169	k = 2 * tcb->snd.mss;
1170	tcb->snd.ssthresh = RTE_MAX(n, k);
1171}
1172
1173static inline void
1174rto_cwnd_update(struct tcb *tcb)
1175{
1176
1177	if (tcb->snd.nb_retx == 0)
1178		rto_ssthresh_update(tcb);
1179
1180	/*
1181	 * RFC 5681 3.1: upon a timeout cwnd MUST be set to
1182	 * no more than 1 full-sized segment.
1183	 */
1184	tcb->snd.cwnd = tcb->snd.mss;
1185}
1186
1187static inline void
1188ack_info_update(struct dack_info *tack, const union seg_info *si,
1189	int32_t badseq, uint32_t dlen, const union tsopt ts)
1190{
1191	if (badseq != 0) {
1192		tack->segs.badseq++;
1193		return;
1194	}
1195
1196	/* segment with incoming data */
1197	tack->segs.data += (dlen != 0);
1198
1199	/* segment with newly acked data */
1200	if (tcp_seq_lt(tack->ack, si->ack)) {
1201		tack->segs.dup = 0;
1202		tack->segs.ack++;
1203		tack->ack = si->ack;
1204		tack->ts = ts;
1205
1206	/*
1207	 * RFC 5681: An acknowledgment is considered a "duplicate" when:
1208	 * (a) the receiver of the ACK has outstanding data
1209	 * (b) the incoming acknowledgment carries no data
1210	 * (c) the SYN and FIN bits are both off
1211	 * (d) the acknowledgment number is equal to the TCP.UNA
1212	 * (e) the advertised window in the incoming acknowledgment equals the
1213	 * advertised window in the last incoming acknowledgment.
1214	 *
1215	 * Here we only have to check for (b), (d), (e).
1216	 * (a) will be checked later for the whole bulk of packets,
1217	 * (c) should never happen here.
1218	 */
1219	} else if (dlen == 0 && si->wnd == tack->wnd && ++tack->segs.dup == 3) {
1220		tack->dup3.seg = tack->segs.ack + 1;
1221		tack->dup3.ack = tack->ack;
1222	}
1223
1224	/*
1225	 * RFC 793:
1226	 * If SND.UNA < SEG.ACK =< SND.NXT, the send window should be
1227	 * updated.  If (SND.WL1 < SEG.SEQ or (SND.WL1 = SEG.SEQ and
1228	 * SND.WL2 =< SEG.ACK)), set SND.WND <- SEG.WND, set
1229	 * SND.WL1 <- SEG.SEQ, and set SND.WL2 <- SEG.ACK.
1230	 */
1231	if (tcp_seq_lt(tack->wu.wl1, si->seq) ||
1232			(si->seq == tack->wu.wl1 &&
1233			tcp_seq_leq(tack->wu.wl2, si->ack))) {
1234
1235		tack->wu.wl1 = si->seq;
1236		tack->wu.wl2 = si->ack;
1237		tack->wnd = si->wnd;
1238	}
1239}
1240
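/*
 * Process a bulk of data/ACK segments for one stream:
 * validate SEQ/ACK numbers, accumulate ACK information in *tack,
 * group consecutive in-sequence segments together and enqueue their
 * payload into the stream receive buffer.
 * Segments that can't be consumed are returned via rp[]/rc[];
 * returns the number of segments consumed.
 */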
1241static inline uint32_t
1242rx_data_ack(struct tle_tcp_stream *s, struct dack_info *tack,
1243	const union seg_info si[], struct rte_mbuf *mb[], struct rte_mbuf *rp[],
1244	int32_t rc[], uint32_t num)
1245{
1246	uint32_t i, j, k, n, t;
1247	uint32_t hlen, plen, seq, tlen;
1248	int32_t ret;
1249	union tsopt ts;
1250
1251	k = 0;
1252	for (i = 0; i != num; i = j) {
1253
1254		hlen = PKT_L234_HLEN(mb[i]);
1255		plen = mb[i]->pkt_len - hlen;
1256		seq = si[i].seq;
1257
1258		ts = rx_tms_opt(&s->tcb, mb[i]);
1259		ret = rx_check_seqack(&s->tcb, seq, si[i].ack, plen, ts);
1260
1261		/* account segment received */
1262		ack_info_update(tack, &si[i], ret != 0, plen, ts);
1263
1264		if (ret == 0) {
1265			/* skip duplicate data, if any */
1266			ret = data_pkt_adjust(&s->tcb, mb[i], hlen,
1267				&seq, &plen);
1268		}
1269
1270		j = i + 1;
1271		if (ret != 0) {
1272			rp[k] = mb[i];
1273			rc[k] = -ret;
1274			k++;
1275			continue;
1276		}
1277
1278		/* group sequential packets together. */
1279		for (tlen = plen; j != num; tlen += plen, j++) {
1280
1281			hlen = PKT_L234_HLEN(mb[j]);
1282			plen = mb[j]->pkt_len - hlen;
1283
1284			/* not consecutive packet */
1285			if (plen == 0 || seq + tlen != si[j].seq)
1286				break;
1287
1288			/* check SEQ/ACK */
1289			ts = rx_tms_opt(&s->tcb, mb[j]);
1290			ret = rx_check_seqack(&s->tcb, si[j].seq, si[j].ack,
1291				plen, ts);
1292
1293			/* account for segment received */
1294			ack_info_update(tack, &si[j], ret != 0, plen, ts);
1295
1296			if (ret != 0) {
1297				rp[k] = mb[j];
1298				rc[k] = -ret;
1299				k++;
1300				break;
1301			}
1302			rte_pktmbuf_adj(mb[j], hlen);
1303		}
1304
1305		n = j - i;
1306		j += (ret != 0);
1307
1308		/* account for OFO data */
1309		if (seq != s->tcb.rcv.nxt)
1310			tack->segs.ofo += n;
1311
1312		/* enqueue packets */
1313		t = rx_data_enqueue(s, seq, tlen, mb + i, n);
1314
1315		/* if we are out of space in stream recv buffer. */
1316		for (; t != n; t++) {
1317			rp[k] = mb[i + t];
1318			rc[k] = -ENOBUFS;
1319			k++;
1320		}
1321	}
1322
1323	return num - k;
1324}
1325
1326static inline void
1327start_fast_retransmit(struct tle_tcp_stream *s)
1328{
1329	struct tcb *tcb;
1330
1331	tcb = &s->tcb;
1332
1333	/* RFC 6582 3.2.2 */
1334	tcb->snd.rcvr = tcb->snd.nxt;
1335	tcb->snd.fastack = 1;
1336
1337	/* RFC 5681 3.2.2 */
1338	rto_ssthresh_update(tcb);
1339
1340	/* RFC 5681 3.2.3 */
1341	tcp_txq_rst_nxt_head(s);
1342	tcb->snd.nxt = tcb->snd.una;
1343	tcb->snd.cwnd = tcb->snd.ssthresh + 3 * tcb->snd.mss;
1344}
1345
1346static inline void
1347stop_fast_retransmit(struct tle_tcp_stream *s)
1348{
1349	struct tcb *tcb;
1350	uint32_t n;
1351
1352	tcb = &s->tcb;
1353	n = tcb->snd.nxt - tcb->snd.una;
1354	tcb->snd.cwnd = RTE_MIN(tcb->snd.ssthresh,
1355		RTE_MAX(n, tcb->snd.mss) + tcb->snd.mss);
1356	tcb->snd.fastack = 0;
1357}
1358
1359static inline int
1360in_fast_retransmit(struct tle_tcp_stream *s, uint32_t ack_len, uint32_t ack_num,
1361	uint32_t dup_num)
1362{
1363	uint32_t n;
1364	struct tcb *tcb;
1365
1366	tcb = &s->tcb;
1367
1368	/* RFC 5682 3.2.3 partial ACK */
1369	if (ack_len != 0) {
1370
1371		n = ack_num * tcb->snd.mss;
1372		if (ack_len >= n)
1373			tcb->snd.cwnd -= ack_len - n;
1374		else
1375			tcb->snd.cwnd -= ack_len % tcb->snd.mss;
1376
1377		/*
1378		 * For the first partial ACK that arrives
1379		 * during fast recovery, also reset the
1380		 * retransmit timer.
1381		 */
1382		if (tcb->snd.fastack == 1)
1383			timer_reset(s);
1384
1385		tcb->snd.fastack += ack_num;
1386		return 1;
1387
1388	/* RFC 5681 3.2.4 */
1389	} else if (dup_num > 3) {
1390		s->tcb.snd.cwnd += (dup_num - 3) * tcb->snd.mss;
1391		return 1;
1392	}
1393
1394	return 0;
1395}
1396
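/*
 * Decide how the newly acknowledged data affects the congestion state:
 * enter or stay in fast-retransmit/fast-recovery (RFC 6582) on duplicate
 * ACKs, or update cwnd for normal operation (RFC 5681) otherwise.
 * Returns non-zero when the caller should try to (re)send more data.
 */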
1397static inline int
1398process_ack(struct tle_tcp_stream *s, uint32_t acked,
1399	const struct dack_info *tack)
1400{
1401	int32_t send;
1402
1403	send = 0;
1404
1405	/* normal mode */
1406	if (s->tcb.snd.fastack == 0) {
1407
1408		send = 1;
1409
1410		/* RFC 6582 3.2.2 switch to fast retransmit mode */
1411		if (tack->dup3.seg != 0 && s->tcb.snd.una != s->tcb.snd.nxt &&
1412				s->tcb.snd.una >= s->tcb.snd.rcvr) {
1413
1414			start_fast_retransmit(s);
1415			in_fast_retransmit(s,
1416				tack->ack - tack->dup3.ack,
1417				tack->segs.ack - tack->dup3.seg - 1,
1418				tack->segs.dup);
1419
1420		/* remain in normal mode */
1421		} else if (acked != 0) {
1422			ack_cwnd_update(&s->tcb, acked, tack);
1423			timer_stop(s);
1424		}
1425
1426	/* fast retransmit mode */
1427	} else {
1428
1429		/* remain in fast retransmit mode */
1430		if (s->tcb.snd.una < s->tcb.snd.rcvr) {
1431
1432			send = in_fast_retransmit(s, acked, tack->segs.ack,
1433				tack->segs.dup);
1434		} else {
1435			/* RFC 5682 3.2.3 full ACK */
1436			stop_fast_retransmit(s);
1437			timer_stop(s);
1438
1439			/* if we have another series of dup ACKs */
1440			if (tack->dup3.seg != 0 &&
1441					s->tcb.snd.una != s->tcb.snd.nxt &&
1442					tcp_seq_leq((uint32_t)s->tcb.snd.rcvr,
1443					tack->dup3.ack)) {
1444
1445				/* restart fast retransmit again. */
1446				start_fast_retransmit(s);
1447				send = in_fast_retransmit(s,
1448					tack->ack - tack->dup3.ack,
1449					tack->segs.ack - tack->dup3.seg - 1,
1450					tack->segs.dup);
1451			}
1452		}
1453	}
1454
1455	return send;
1456}
1457
1458/*
1459 * our FIN was acked, stop rto timer, change stream state,
1460 * and possibly close the stream.
1461 */
1462static inline void
1463rx_ackfin(struct tle_tcp_stream *s)
1464{
1465	uint32_t state;
1466
1467	s->tcb.snd.una = s->tcb.snd.fss;
1468	empty_mbuf_ring(s->tx.q);
1469
1470	state = s->tcb.state;
1471	if (state == TCP_ST_LAST_ACK)
1472		stream_term(s);
1473	else if (state == TCP_ST_FIN_WAIT_1) {
1474		timer_stop(s);
1475		s->tcb.state = TCP_ST_FIN_WAIT_2;
1476	} else if (state == TCP_ST_CLOSING) {
1477		s->tcb.state = TCP_ST_TIME_WAIT;
1478		s->tcb.snd.rto = TCP_RTO_2MSL;
1479		timer_reset(s);
1480	}
1481}
1482
1483static inline void
1484rx_process_ack(struct tle_tcp_stream *s, uint32_t ts,
1485	const struct dack_info *tack)
1486{
1487	int32_t send;
1488	uint32_t n;
1489
1490	s->tcb.rcv.dupack = tack->segs.dup;
1491
1492	n = rx_ackdata(s, tack->ack);
1493	send = process_ack(s, n, tack);
1494
1495	/* try to send more data. */
1496	if ((n != 0 || send != 0) && tcp_txq_nxt_cnt(s) != 0)
1497		txs_enqueue(s->s.ctx, s);
1498
1499	/* restart RTO timer. */
1500	if (s->tcb.snd.nxt != s->tcb.snd.una)
1501		timer_start(s);
1502
1503	/* update RTO: if a fresh timestamp echo is present, calculate the RTT */
1504	if (tack->ts.ecr != 0)
1505		rto_estimate(&s->tcb, ts - tack->ts.ecr);
1506}
1507
1508/*
1509 * process <SYN,ACK>
1510 * returns negative value on failure, or zero on success.
1511 */
1512static inline int
1513rx_synack(struct tle_tcp_stream *s, uint32_t ts, uint32_t state,
1514	const union seg_info *si, struct rte_mbuf *mb,
1515	struct resp_info *rsp)
1516{
1517	struct syn_opts so;
1518	struct tcp_hdr *th;
1519
1520	if (state != TCP_ST_SYN_SENT)
1521		return -EINVAL;
1522
1523	/* unacceptable SEG.ACK, reply with RST */
1524	if (si->ack != (uint32_t)s->tcb.snd.nxt) {
1525		rsp->flags = TCP_FLAG_RST;
1526		return 0;
1527	}
1528
1529	th = rte_pktmbuf_mtod_offset(mb, struct tcp_hdr *,
1530		mb->l2_len + mb->l3_len);
1531	get_syn_opts(&so, (uintptr_t)(th + 1), mb->l4_len - sizeof(*th));
1532
1533	s->tcb.so = so;
1534
1535	s->tcb.snd.una = s->tcb.snd.nxt;
1536	s->tcb.snd.mss = so.mss;
1537	s->tcb.snd.wnd = si->wnd << so.wscale;
1538	s->tcb.snd.wu.wl1 = si->seq;
1539	s->tcb.snd.wu.wl2 = si->ack;
1540	s->tcb.snd.wscale = so.wscale;
1541
1542	/* setup congestion variables */
1543	s->tcb.snd.cwnd = initial_cwnd(s->tcb.snd.mss);
1544	s->tcb.snd.ssthresh = s->tcb.snd.wnd;
1545
1546	s->tcb.rcv.ts = so.ts.val;
1547	s->tcb.rcv.irs = si->seq;
1548	s->tcb.rcv.nxt = si->seq + 1;
1549
1550	/* if peer doesn't support WSCALE opt, recalculate RCV.WND */
1551	s->tcb.rcv.wscale = (so.wscale == TCP_WSCALE_NONE) ?
1552		TCP_WSCALE_NONE : TCP_WSCALE_DEFAULT;
1553	s->tcb.rcv.wnd = calc_rx_wnd(s, s->tcb.rcv.wscale);
1554
1555	/* calculate initial rto */
1556	rto_estimate(&s->tcb, ts - s->tcb.snd.ts);
1557
1558	rsp->flags |= TCP_FLAG_ACK;
1559
1560	timer_stop(s);
1561	s->tcb.state = TCP_ST_ESTABLISHED;
1562	rte_smp_wmb();
1563
1564	if (s->tx.ev != NULL)
1565		tle_event_raise(s->tx.ev);
1566	else if (s->tx.cb.func != NULL)
1567		s->tx.cb.func(s->tx.cb.data, &s->s);
1568
1569	return 0;
1570}
1571
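/*
 * Main per-stream RX dispatcher for a group of packets with equal flow
 * info: handles RST, <SYN,ACK>, FIN and ordinary data/ACK segments
 * according to the current TCP state, then sends at most one RST/ACK
 * response. Unprocessed mbufs are returned via rp[]/rc[];
 * returns the number of packets consumed.
 */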
1572static inline uint32_t
1573rx_stream(struct tle_tcp_stream *s, uint32_t ts,
1574	const union pkt_info *pi, const union seg_info si[],
1575	struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1576	uint32_t num)
1577{
1578	uint32_t i, k, n, state;
1579	int32_t ret;
1580	struct resp_info rsp;
1581	struct dack_info tack;
1582
1583	k = 0;
1584	rsp.flags = 0;
1585
1586	state = s->tcb.state;
1587
1588	/*
1589	 * first check for the states/flags where we don't
1590	 * expect groups of packets.
1591	 */
1592
1593	/* process RST */
1594	if ((pi->tf.flags & TCP_FLAG_RST) != 0) {
1595		for (i = 0;
1596				i != num &&
1597				rx_rst(s, state, pi->tf.flags, &si[i]);
1598				i++)
1599			;
1600		i = 0;
1601
1602	/* RFC 793: if the ACK bit is off drop the segment and return */
1603	} else if ((pi->tf.flags & TCP_FLAG_ACK) == 0) {
1604		i = 0;
1609
1610	/* process <SYN,ACK> */
1611	} else if ((pi->tf.flags & TCP_FLAG_SYN) != 0) {
1612		ret = 0;
1613		for (i = 0; i != num; i++) {
1614			ret = rx_synack(s, ts, state, &si[i], mb[i], &rsp);
1615			if (ret == 0)
1616				break;
1617
1618			rc[k] = -ret;
1619			rp[k] = mb[i];
1620			k++;
1621		}
1622
1623	/* process FIN */
1624	} else if ((pi->tf.flags & TCP_FLAG_FIN) != 0) {
1625		ret = 0;
1626		for (i = 0; i != num; i++) {
1627			ret = rx_fin(s, state, &si[i], mb[i], &rsp);
1628			if (ret >= 0)
1629				break;
1630
1631			rc[k] = -ret;
1632			rp[k] = mb[i];
1633			k++;
1634		}
1635		i += (ret > 0);
1636
1637	/* normal data/ack packets */
1638	} else if (state >= TCP_ST_ESTABLISHED && state <= TCP_ST_LAST_ACK) {
1639
1640		/* process incoming data packets. */
1641		dack_info_init(&tack, &s->tcb);
1642		n = rx_data_ack(s, &tack, si, mb, rp, rc, num);
1643
1644		/* follow up actions based on aggregated information */
1645
1646		/* update SND.WND */
1647		ack_window_update(&s->tcb, &tack);
1648
1649		/*
1650		 * fast-path: all data & FIN was already sent out
1651		 * and now is acknowledged.
1652		 */
1653		if (s->tcb.snd.fss == s->tcb.snd.nxt &&
1654				tack.ack == (uint32_t) s->tcb.snd.nxt)
1655			rx_ackfin(s);
1656		else
1657			rx_process_ack(s, ts, &tack);
1658
1659		/*
1660		 * send an immediate ACK if either:
1661		 * - received segment with invalid seq/ack number
1662		 * - received segment with out-of-order (OFO) data
1663		 * - received segment with in-order data and no TX is scheduled
1664		 *   for that stream.
1665		 */
1666		if (tack.segs.badseq != 0 || tack.segs.ofo != 0 ||
1667				(tack.segs.data != 0 &&
1668				rte_atomic32_read(&s->tx.arm) == 0))
1669			rsp.flags |= TCP_FLAG_ACK;
1670
1671		rx_ofo_fin(s, &rsp);
1672
1673		k += num - n;
1674		i = num;
1675
1676	/* unhandled state, drop all packets. */
1677	} else
1678		i = 0;
1679
1680	/* we have a response packet to send. */
1681	if (rsp.flags == TCP_FLAG_RST) {
1682		send_rst(s, si[i].ack);
1683		stream_term(s);
1684	} else if (rsp.flags != 0) {
1685		send_ack(s, ts, rsp.flags);
1686
1687		/* start the timer for FIN packet */
1688		if ((rsp.flags & TCP_FLAG_FIN) != 0)
1689			timer_reset(s);
1690	}
1691
1692	/* unprocessed packets */
1693	for (; i != num; i++, k++) {
1694		rc[k] = EINVAL;
1695		rp[k] = mb[i];
1696	}
1697
1698	return num - k;
1699}
1700
1701static inline uint32_t
1702rx_new_stream(struct tle_tcp_stream *s, uint32_t ts,
1703	const union pkt_info *pi, const union seg_info si[],
1704	struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1705	uint32_t num)
1706{
1707	uint32_t i;
1708
1709	if (rwl_acquire(&s->rx.use) > 0) {
1710		i = rx_stream(s, ts, pi, si, mb, rp, rc, num);
1711		rwl_release(&s->rx.use);
1712		return i;
1713	}
1714
1715	for (i = 0; i != num; i++) {
1716		rc[i] = ENOENT;
1717		rp[i] = mb[i];
1718	}
1719	return 0;
1720}
1721
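/*
 * Process packets that arrive after the initial SYN:
 * for a stream in LISTEN state this is the final ACK of the 3-way
 * handshake (validated against the syncookie) that creates a new
 * child stream; for any other state packets are passed to rx_stream().
 */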
1722static inline uint32_t
1723rx_postsyn(struct tle_dev *dev, struct stbl *st, uint32_t type, uint32_t ts,
1724	const union pkt_info pi[], const union seg_info si[],
1725	struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1726	uint32_t num)
1727{
1728	struct tle_tcp_stream *cs, *s;
1729	uint32_t i, k, n, state;
1730	int32_t ret;
1731
1732	s = rx_obtain_stream(dev, st, &pi[0], type);
1733	if (s == NULL) {
1734		for (i = 0; i != num; i++) {
1735			rc[i] = ENOENT;
1736			rp[i] = mb[i];
1737		}
1738		return 0;
1739	}
1740
1741	k = 0;
1742	state = s->tcb.state;
1743
1744	if (state == TCP_ST_LISTEN) {
1745
1746		/* one connection per flow */
1747		cs = NULL;
1748		ret = -EINVAL;
1749		for (i = 0; i != num; i++) {
1750
1751			ret = rx_ack_listen(s, st, pi, &si[i], ts, mb[i], &cs);
1752
1753			/* valid packet encountered */
1754			if (ret >= 0)
1755				break;
1756
1757			/* invalid packet, keep trying to find a proper one */
1758			rc[k] = -ret;
1759			rp[k] = mb[i];
1760			k++;
1761		}
1762
1763		/* packet is valid, but we are out of streams to serve it */
1764		if (ret > 0) {
1765			for (; i != num; i++, k++) {
1766				rc[k] = ret;
1767				rp[k] = mb[i];
1768			}
1769		/* new stream is accepted */
1770		} else if (ret == 0) {
1771
1772			/* inform listen stream about new connections */
1773			if (s->rx.ev != NULL)
1774				tle_event_raise(s->rx.ev);
1775			else if (s->rx.cb.func != NULL &&
1776					rte_ring_count(s->rx.q) == 1)
1777				s->rx.cb.func(s->rx.cb.data, &s->s);
1778
1779			/* if there is no data, drop current packet */
1780			if (PKT_L4_PLEN(mb[i]) == 0) {
1781				rc[k] = ENODATA;
1782				rp[k++] = mb[i++];
1783			}
1784
1785			/*  process remaining packets for that stream */
1786			if (num != i) {
1787				n = rx_new_stream(cs, ts, pi + i, si + i,
1788					mb + i, rp + k, rc + k, num - i);
1789				k += num - n - i;
1790			}
1791		}
1792
1793	} else {
1794		i = rx_stream(s, ts, pi, si, mb, rp, rc, num);
1795		k = num - i;
1796	}
1797
1798	rwl_release(&s->rx.use);
1799	return num - k;
1800}
1801
1802
1803static inline uint32_t
1804rx_syn(struct tle_dev *dev, uint32_t type, uint32_t ts,
1805	const union pkt_info pi[], const union seg_info si[],
1806	struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1807	uint32_t num)
1808{
1809	struct tle_tcp_stream *s;
1810	uint32_t i, k;
1811	int32_t ret;
1812
1813	s = rx_obtain_listen_stream(dev, &pi[0], type);
1814	if (s == NULL) {
1815		for (i = 0; i != num; i++) {
1816			rc[i] = ENOENT;
1817			rp[i] = mb[i];
1818		}
1819		return 0;
1820	}
1821
1822	k = 0;
1823	for (i = 0; i != num; i++) {
1824
1825		/* check that this remote is allowed to connect */
1826		if (rx_check_stream(s, &pi[i]) != 0)
1827			ret = -ENOENT;
1828		else
1829			/* syncookie: reply with <SYN,ACK> */
1830			ret = sync_ack(s, &pi[i], &si[i], ts, mb[i]);
1831
1832		if (ret != 0) {
1833			rc[k] = -ret;
1834			rp[k] = mb[i];
1835			k++;
1836		}
1837	}
1838
1839	rwl_release(&s->rx.use);
1840	return num - k;
1841}
1842
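/*
 * Device RX entry point: extract flow/segment info for each mbuf,
 * verify L3/L4 checksums in SW where the device setup requires it,
 * then dispatch bursts of packets with matching flow info to
 * rx_syn()/rx_postsyn(). Packets that were not consumed are returned
 * to the caller via rp[]/rc[].
 */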
1843uint16_t
1844tle_tcp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[],
1845	struct rte_mbuf *rp[], int32_t rc[], uint16_t num)
1846{
1847	struct stbl *st;
1848	uint32_t i, j, k, n, t, ts;
1849	uint64_t csf;
1850	union pkt_info pi[num];
1851	union seg_info si[num];
1852	union {
1853		uint8_t t[TLE_VNUM];
1854		uint32_t raw;
1855	} stu;
1856
1857	ts = tcp_get_tms();
1858	st = CTX_TCP_STLB(dev->ctx);
1859
1860	stu.raw = 0;
1861
1862	/* extract packet info and check the L3/L4 csums */
1863	for (i = 0; i != num; i++) {
1864
1865		get_pkt_info(pkt[i], &pi[i], &si[i]);
1866
1867		t = pi[i].tf.type;
1868		csf = dev->rx.ol_flags[t] &
1869			(PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD);
1870
1871		/* check csums in SW */
1872		if (pi[i].csf == 0 && csf != 0 && check_pkt_csum(pkt[i], csf,
1873				pi[i].tf.type, IPPROTO_TCP) != 0)
1874			pi[i].csf = csf;
1875
1876		stu.t[t] = 1;
1877	}
1878
1879	if (stu.t[TLE_V4] != 0)
1880		stbl_lock(st, TLE_V4);
1881	if (stu.t[TLE_V6] != 0)
1882		stbl_lock(st, TLE_V6);
1883
1884	k = 0;
1885	for (i = 0; i != num; i += j) {
1886
1887		t = pi[i].tf.type;
1888
1889		/* basic checks for incoming packet */
1890		if (t >= TLE_VNUM || pi[i].csf != 0 || dev->dp[t] == NULL) {
1891			rc[k] = EINVAL;
1892			rp[k] = pkt[i];
1893			j = 1;
1894			k++;
1895		/* process input SYN packets */
1896		} else if (pi[i].tf.flags == TCP_FLAG_SYN) {
1897			j = pkt_info_bulk_syneq(pi + i, num - i);
1898			n = rx_syn(dev, t, ts, pi + i, si + i, pkt + i,
1899				rp + k, rc + k, j);
1900			k += j - n;
1901		} else {
1902			j = pkt_info_bulk_eq(pi + i, num - i);
1903			n = rx_postsyn(dev, st, t, ts, pi + i, si + i, pkt + i,
1904				rp + k, rc + k, j);
1905			k += j - n;
1906		}
1907	}
1908
1909	if (stu.t[TLE_V4] != 0)
1910		stbl_unlock(st, TLE_V4);
1911	if (stu.t[TLE_V6] != 0)
1912		stbl_unlock(st, TLE_V6);
1913
1914	return num - k;
1915}
1916
1917uint16_t
1918tle_tcp_stream_accept(struct tle_stream *ts, struct tle_stream *rs[],
1919	uint32_t num)
1920{
1921	uint32_t n;
1922	struct tle_tcp_stream *s;
1923
1924	s = TCP_STREAM(ts);
1925	n = rte_ring_mc_dequeue_burst(s->rx.q, (void **)rs, num);
1926	if (n == 0)
1927		return 0;
1928
1929	/*
1930	 * if we still have packets to read,
1931	 * then rearm stream RX event.
1932	 */
1933	if (n == num && rte_ring_count(s->rx.q) != 0) {
1934		if (rwl_try_acquire(&s->rx.use) > 0 && s->rx.ev != NULL)
1935			tle_event_raise(s->rx.ev);
1936		rwl_release(&s->rx.use);
1937	}
1938
1939	return n;
1940}
1941
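/*
 * Device TX entry point: dequeue up to num packets from the device
 * drain-ring and return the exhausted drbs back to their owning streams.
 */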
1942uint16_t
1943tle_tcp_tx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], uint16_t num)
1944{
1945	uint32_t i, j, k, n;
1946	struct tle_drb *drb[num];
1947	struct tle_tcp_stream *s;
1948
1949	/* extract packets from device TX queue. */
1950
1951	k = num;
1952	n = tle_dring_sc_dequeue(&dev->tx.dr, (const void **)(uintptr_t)pkt,
1953		num, drb, &k);
1954
1955	if (n == 0)
1956		return 0;
1957
1958	/* free empty drbs and notify related streams. */
1959
1960	for (i = 0; i != k; i = j) {
1961		s = drb[i]->udata;
1962		for (j = i + 1; j != k && s == drb[j]->udata; j++)
1963			;
1964		stream_drb_free(s, drb + i, j - i);
1965	}
1966
1967	return n;
1968}
1969
1970static inline void
1971stream_fill_pkt_info(const struct tle_tcp_stream *s, union pkt_info *pi)
1972{
1973	if (s->s.type == TLE_V4)
1974		pi->addr4 = s->s.ipv4.addr;
1975	else
1976		pi->addr6 = &s->s.ipv6.addr;
1977
1978	pi->port = s->s.port;
1979	pi->tf.type = s->s.type;
1980}
1981
1982static int
1983stream_fill_addr(struct tle_tcp_stream *s, const struct sockaddr *addr)
1984{
1985	const struct sockaddr_in *in4;
1986	const struct sockaddr_in6 *in6;
1987	const struct tle_dev_param *prm;
1988	int32_t rc;
1989
1990	rc = 0;
1991	s->s.pmsk.raw = UINT32_MAX;
1992
1993	/* setup L4 src ports and src address fields. */
1994	if (s->s.type == TLE_V4) {
1995		in4 = (const struct sockaddr_in *)addr;
1996		if (in4->sin_addr.s_addr == INADDR_ANY || in4->sin_port == 0)
1997			return -EINVAL;
1998
1999		s->s.port.src = in4->sin_port;
2000		s->s.ipv4.addr.src = in4->sin_addr.s_addr;
2001		s->s.ipv4.mask.src = INADDR_NONE;
2002		s->s.ipv4.mask.dst = INADDR_NONE;
2003
2004	} else if (s->s.type == TLE_V6) {
2005		in6 = (const struct sockaddr_in6 *)addr;
2006		if (memcmp(&in6->sin6_addr, &tle_ipv6_any,
2007				sizeof(tle_ipv6_any)) == 0 ||
2008				in6->sin6_port == 0)
2009			return -EINVAL;
2010
2011		s->s.port.src = in6->sin6_port;
2012		rte_memcpy(&s->s.ipv6.addr.src, &in6->sin6_addr,
2013			sizeof(s->s.ipv6.addr.src));
2014		rte_memcpy(&s->s.ipv6.mask.src, &tle_ipv6_none,
2015			sizeof(s->s.ipv6.mask.src));
2016		rte_memcpy(&s->s.ipv6.mask.dst, &tle_ipv6_none,
2017			sizeof(s->s.ipv6.mask.dst));
2018	}
2019
2020	/* setup the destination device. */
2021	rc = stream_fill_dest(s);
2022	if (rc != 0)
2023		return rc;
2024
2025	/* setup L4 dst address from device param */
2026	prm = &s->tx.dst.dev->prm;
2027	if (s->s.type == TLE_V4) {
2028		if (s->s.ipv4.addr.dst == INADDR_ANY)
2029			s->s.ipv4.addr.dst = prm->local_addr4.s_addr;
2030	} else if (memcmp(&s->s.ipv6.addr.dst, &tle_ipv6_any,
2031			sizeof(tle_ipv6_any)) == 0)
2032		memcpy(&s->s.ipv6.addr.dst, &prm->local_addr6,
2033			sizeof(s->s.ipv6.addr.dst));
2034
2035	return rc;
2036}
2037
2038static inline int
2039tx_syn(struct tle_tcp_stream *s, const struct sockaddr *addr)
2040{
2041	int32_t rc;
2042	uint32_t tms, seq;
2043	union pkt_info pi;
2044	struct stbl *st;
2045	struct stbl_entry *se;
2046
2047	/* fill stream address */
2048	rc = stream_fill_addr(s, addr);
2049	if (rc != 0)
2050		return rc;
2051
2052	/* fill pkt info to generate seq.*/
2053	stream_fill_pkt_info(s, &pi);
2054
2055	tms = tcp_get_tms();
2056	s->tcb.so.ts.val = tms;
2057	s->tcb.so.ts.ecr = 0;
2058	s->tcb.so.wscale = TCP_WSCALE_DEFAULT;
2059	s->tcb.so.mss = calc_smss(s->tx.dst.mtu, &s->tx.dst);
2060
2061	/* note that rcv.nxt is 0 here for sync_gen_seq.*/
2062	seq = sync_gen_seq(&pi, s->tcb.rcv.nxt, tms, s->tcb.so.mss);
2063	s->tcb.snd.iss = seq;
2064	s->tcb.snd.rcvr = seq;
2065	s->tcb.snd.una = seq;
2066	s->tcb.snd.nxt = seq + 1;
2067	s->tcb.snd.rto = TCP_RTO_DEFAULT;
2068	s->tcb.snd.ts = tms;
2069
2070	s->tcb.rcv.mss = s->tcb.so.mss;
2071	s->tcb.rcv.wscale = TCP_WSCALE_DEFAULT;
2072	s->tcb.rcv.wnd = calc_rx_wnd(s, s->tcb.rcv.wscale);
2073	s->tcb.rcv.ts = 0;
2074
2075	/* add the stream in stream table */
2076	st = CTX_TCP_STLB(s->s.ctx);
2077	se = stbl_add_stream_lock(st, s);
2078	if (se == NULL)
2079		return -ENOBUFS;
2080	s->ste = se;
2081
2082	/* put stream into the to-send queue */
2083	txs_enqueue(s->s.ctx, s);
2084
2085	return 0;
2086}
2087
2088int
2089tle_tcp_stream_connect(struct tle_stream *ts, const struct sockaddr *addr)
2090{
2091	struct tle_tcp_stream *s;
2092	uint32_t type;
2093	int32_t rc;
2094
2095	if (ts == NULL || addr == NULL)
2096		return -EINVAL;
2097
2098	s = TCP_STREAM(ts);
2099	type = s->s.type;
2100	if (type >= TLE_VNUM)
2101		return -EINVAL;
2102
2103	if (rwl_try_acquire(&s->tx.use) > 0) {
2104		rc = rte_atomic16_cmpset(&s->tcb.state, TCP_ST_CLOSED,
2105			TCP_ST_SYN_SENT);
2106		rc = (rc == 0) ? -EDEADLK : 0;
2107	} else
2108		rc = -EINVAL;
2109
2110	if (rc != 0) {
2111		rwl_release(&s->tx.use);
2112		return rc;
2113	}
2114
2115	/* fill stream, prepare and transmit syn pkt */
2116	s->tcb.uop |= TCP_OP_CONNECT;
2117	rc = tx_syn(s, addr);
2118	rwl_release(&s->tx.use);
2119
2120	/* error happened, do a cleanup */
2121	if (rc != 0)
2122		tle_tcp_stream_close(ts);
2123
2124	return rc;
2125}
2126
2127uint16_t
2128tle_tcp_stream_recv(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
2129{
2130	uint32_t n;
2131	struct tle_tcp_stream *s;
2132
2133	s = TCP_STREAM(ts);
2134	n = rte_ring_mc_dequeue_burst(s->rx.q, (void **)pkt, num);
2135	if (n == 0)
2136		return 0;
2137
2138	/*
2139	 * if we still have packets to read,
2140	 * then rearm stream RX event.
2141	 */
2142	if (n == num && rte_ring_count(s->rx.q) != 0) {
2143		if (rwl_try_acquire(&s->rx.use) > 0 && s->rx.ev != NULL)
2144			tle_event_raise(s->rx.ev);
2145		rwl_release(&s->rx.use);
2146	}
2147
2148	return n;
2149}
2150
2151static inline int32_t
2152tx_segments(struct tle_tcp_stream *s, uint64_t ol_flags,
2153	struct rte_mbuf *segs[], uint32_t num)
2154{
2155	uint32_t i;
2156	int32_t rc;
2157
2158	for (i = 0; i != num; i++) {
2159		/* Build L2/L3/L4 header */
2160		rc = tcp_fill_mbuf(segs[i], s, &s->tx.dst, ol_flags, s->s.port,
2161			0, TCP_FLAG_ACK, 0, 0);
2162		if (rc != 0) {
2163			free_segments(segs, num);
2164			break;
2165		}
2166	}
2167
2168	if (i == num) {
2169		/* queue packets for further transmission. */
2170		rc = rte_ring_mp_enqueue_bulk(s->tx.q, (void **)segs, num);
2171		if (rc != 0)
2172			free_segments(segs, num);
2173	}
2174
2175	return rc;
2176}
2177
2178uint16_t
2179tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
2180{
2181	uint32_t i, j, k, mss, n, state, type;
2182	int32_t rc;
2183	uint64_t ol_flags;
2184	struct tle_tcp_stream *s;
2185	struct tle_dev *dev;
2186	struct rte_mbuf *segs[TCP_MAX_PKT_SEG];
2187
2188	s = TCP_STREAM(ts);
2189
2190	/* mark stream as not closable. */
2191	if (rwl_acquire(&s->tx.use) < 0) {
2192		rte_errno = EAGAIN;
2193		return 0;
2194	}
2195
2196	state = s->tcb.state;
2197	if (state != TCP_ST_ESTABLISHED && state != TCP_ST_CLOSE_WAIT) {
2198		rte_errno = ENOTCONN;
2199		rwl_release(&s->tx.use);
2200		return 0;
2201	}
2202
2203	mss = s->tcb.snd.mss;
2204	dev = s->tx.dst.dev;
2205	type = s->s.type;
2206	ol_flags = dev->tx.ol_flags[type];
2207
2208	k = 0;
2209	rc = 0;
2210	while (k != num) {
2211		/* prepare and check for TX */
2212		for (i = k; i != num; i++) {
2213			if (pkt[i]->pkt_len > mss ||
2214					pkt[i]->nb_segs > TCP_MAX_PKT_SEG)
2215				break;
2216			rc = tcp_fill_mbuf(pkt[i], s, &s->tx.dst, ol_flags,
2217				s->s.port, 0, TCP_FLAG_ACK, 0, 0);
2218			if (rc != 0)
2219				break;
2220		}
2221
2222		if (i != k) {
2223			/* queue packets for further transmission. */
2224			n = rte_ring_mp_enqueue_burst(s->tx.q, (void **)pkt + k,
2225				(i - k));
2226			k += n;
2227
2228			/*
2229			 * for unsent, but already modified packets:
2230			 * remove pkt l2/l3 headers, restore ol_flags
2231			 */
2232			if (i != k) {
2233				ol_flags = ~dev->tx.ol_flags[type];
2234				for (j = k; j != i; j++) {
2235					rte_pktmbuf_adj(pkt[j], pkt[j]->l2_len +
2236						pkt[j]->l3_len +
2237						pkt[j]->l4_len);
2238					pkt[j]->ol_flags &= ol_flags;
2239				}
2240				break;
2241			}
2242		}
2243
2244		if (rc != 0) {
2245			rte_errno = -rc;
2246			break;
2247
2248		/* segment large packet and enqueue for sending */
2249		} else if (i != num) {
2250			/* segment the packet. */
2251			rc = tcp_segmentation(pkt[i], segs, RTE_DIM(segs),
2252				&s->tx.dst, mss);
2253			if (rc < 0) {
2254				rte_errno = -rc;
2255				break;
2256			}
2257
2258			rc = tx_segments(s, dev->tx.ol_flags[type], segs, rc);
2259			if (rc == 0) {
2260				/* free the large mbuf */
2261				rte_pktmbuf_free(pkt[i]);
2262				/* set the mbuf as consumed */
2263				k++;
2264			} else
2265				/* no space left in tx queue */
2266				break;
2267		}
2268	}
2269
2270	/* notify BE about more data to send */
2271	if (k != 0)
2272		txs_enqueue(s->s.ctx, s);
2273	/* if possible, re-arm stream write event. */
2274	if (rte_ring_free_count(s->tx.q) != 0 && s->tx.ev != NULL)
2275		tle_event_raise(s->tx.ev);
2276
2277	rwl_release(&s->tx.use);
2278
2279	return k;
2280}
2281
2282/* send data and FIN (if needed) */
2283static inline void
2284tx_data_fin(struct tle_tcp_stream *s, uint32_t tms, uint32_t state)
2285{
2286	/* try to send some data */
2287	tx_nxt_data(s, tms);
2288
2289	/* we also have to send a FIN */
2290	if (state != TCP_ST_ESTABLISHED &&
2291			state != TCP_ST_CLOSE_WAIT &&
2292			tcp_txq_nxt_cnt(s) == 0 &&
2293			s->tcb.snd.fss != s->tcb.snd.nxt) {
2294		s->tcb.snd.fss = ++s->tcb.snd.nxt;
2295		send_ack(s, tms, TCP_FLAG_FIN | TCP_FLAG_ACK);
2296	}
2297}
2298
2299static inline void
2300tx_stream(struct tle_tcp_stream *s, uint32_t tms)
2301{
2302	uint32_t state;
2303
2304	state = s->tcb.state;
2305
2306	if (state == TCP_ST_SYN_SENT) {
2307		/* send the SYN, start the rto timer */
2308		send_ack(s, tms, TCP_FLAG_SYN);
2309		timer_start(s);
2310
2311	} else if (state >= TCP_ST_ESTABLISHED && state <= TCP_ST_LAST_ACK) {
2312
2313		tx_data_fin(s, tms, state);
2314
2315		/* start RTO timer. */
2316		if (s->tcb.snd.nxt != s->tcb.snd.una)
2317			timer_start(s);
2318	}
2319}
2320
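/*
 * Retransmission timeout handler: while the retransmit limit is not
 * exceeded, shrink cwnd, roll SND.NXT back to SND.UNA and resend
 * (data/FIN or SYN, depending on the state), backing off the RTO
 * (RFC 6298 5.5); once the limit is reached, send RST and terminate
 * the stream.
 */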
2321static inline void
2322rto_stream(struct tle_tcp_stream *s, uint32_t tms)
2323{
2324	uint32_t state;
2325
2326	state = s->tcb.state;
2327
2328	TCP_LOG(DEBUG, "%s(%p, tms=%u): state=%u, "
2329		"retx=%u, retm=%u, "
2330		"rto=%u, snd.ts=%u, tmo=%u, "
2331		"snd.nxt=%lu, snd.una=%lu, flight_size=%lu, "
2332		"snd.rcvr=%lu, snd.fastack=%u, "
2333		"wnd=%u, cwnd=%u, ssthresh=%u, "
2334		"bytes sent=%lu, pkt remain=%u;\n",
2335		__func__, s, tms, s->tcb.state,
2336		s->tcb.snd.nb_retx, s->tcb.snd.nb_retm,
2337		s->tcb.snd.rto, s->tcb.snd.ts, tms - s->tcb.snd.ts,
2338		s->tcb.snd.nxt, s->tcb.snd.una, s->tcb.snd.nxt - s->tcb.snd.una,
2339		s->tcb.snd.rcvr, s->tcb.snd.fastack,
2340		s->tcb.snd.wnd, s->tcb.snd.cwnd, s->tcb.snd.ssthresh,
2341		s->tcb.snd.nxt - s->tcb.snd.iss, tcp_txq_nxt_cnt(s));
2342
2343	if (s->tcb.snd.nb_retx < s->tcb.snd.nb_retm) {
2344
2345		if (state >= TCP_ST_ESTABLISHED && state <= TCP_ST_LAST_ACK) {
2346
2347			/* update SND.CWND and SND.SSTHRESH */
2348			rto_cwnd_update(&s->tcb);
2349
2350			/* RFC 6582 3.2.4 */
2351			s->tcb.snd.rcvr = s->tcb.snd.nxt;
2352			s->tcb.snd.fastack = 0;
2353
2354			/* restart from last acked data */
2355			tcp_txq_rst_nxt_head(s);
2356			s->tcb.snd.nxt = s->tcb.snd.una;
2357
2358			tx_data_fin(s, tms, state);
2359
2360		} else if (state == TCP_ST_SYN_SENT) {
2361			/* resending SYN */
2362			s->tcb.so.ts.val = tms;
2363			send_ack(s, tms, TCP_FLAG_SYN);
2364
2365		} else if (state == TCP_ST_TIME_WAIT) {
2366			stream_term(s);
2367		}
2368
2369		/* RFC6298:5.5 back off the timer */
2370		s->tcb.snd.rto = rto_roundup(2 * s->tcb.snd.rto);
2371		s->tcb.snd.nb_retx++;
2372		timer_restart(s);
2373
2374	} else {
2375		send_rst(s, s->tcb.snd.una);
2376		stream_term(s);
2377	}
2378}
2379
2380int
2381tle_tcp_process(struct tle_ctx *ctx, uint32_t num)
2382{
2383	uint32_t i, k, tms;
2384	struct sdr *dr;
2385	struct tle_timer_wheel *tw;
2386	struct tle_stream *p;
2387	struct tle_tcp_stream *s, *rs[num];
2388
2389	/* process streams with expired RTO */
2390
2391	tw = CTX_TCP_TMWHL(ctx);
2392	tms = tcp_get_tms();
2393	tle_timer_expire(tw, tms);
2394
2395	k = tle_timer_get_expired_bulk(tw, (void **)rs, RTE_DIM(rs));
2396
2397	for (i = 0; i != k; i++) {
2398
2399		s = rs[i];
2400		s->timer.handle = NULL;
2401		if (rwl_try_acquire(&s->tx.use) > 0)
2402			rto_stream(s, tms);
2403		rwl_release(&s->tx.use);
2404	}
2405
2406	/* process streams from to-send queue */
2407
2408	k = txs_dequeue_bulk(ctx, rs, RTE_DIM(rs));
2409
2410	for (i = 0; i != k; i++) {
2411
2412		s = rs[i];
2413		if (rwl_try_acquire(&s->tx.use) > 0 &&
2414				rte_atomic32_read(&s->tx.arm) > 0) {
2415			rte_atomic32_set(&s->tx.arm, 0);
2416			tx_stream(s, tms);
2417		}
2418		rwl_release(&s->tx.use);
2419	}
2420
2421	/* collect streams to close from the death row */
2422
2423	dr = CTX_TCP_SDR(ctx);
2424	for (k = 0, p = STAILQ_FIRST(&dr->be);
2425			k != num && p != NULL;
2426			k++, p = STAILQ_NEXT(p, link))
2427		rs[k] = TCP_STREAM(p);
2428
2429	if (p == NULL)
2430		STAILQ_INIT(&dr->be);
2431	else
2432		STAILQ_FIRST(&dr->be) = p;
2433
2434	/* cleanup closed streams */
2435	for (i = 0; i != k; i++) {
2436		s = rs[i];
2437		tcp_stream_down(s);
2438		tcp_stream_reset(ctx, s);
2439	}
2440
2441	return 0;
2442}
2443