tcp_rxtx.c revision 9fa82a63
1/*
2 * Copyright (c) 2016  Intel Corporation.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#include <rte_errno.h>
17#include <rte_ethdev.h>
18#include <rte_ip.h>
19#include <rte_ip_frag.h>
20#include <rte_tcp.h>
21
22#include "tcp_stream.h"
23#include "tcp_timer.h"
24#include "stream_table.h"
25#include "syncookie.h"
26#include "misc.h"
27#include "tcp_ctl.h"
28#include "tcp_rxq.h"
29#include "tcp_txq.h"
30#include "tcp_tx_seg.h"
31
32#define	TCP_MAX_PKT_SEG	0x20
33
34/*
35 * checks if input TCP ports and IP addresses match the given stream.
36 * returns zero on success.
37 */
38static inline int
39rx_check_stream(const struct tle_tcp_stream *s, const union pkt_info *pi)
40{
41	int32_t rc;
42
43	if (pi->tf.type == TLE_V4)
44		rc = (pi->port.raw & s->s.pmsk.raw) != s->s.port.raw ||
45			(pi->addr4.raw & s->s.ipv4.mask.raw) !=
46			s->s.ipv4.addr.raw;
47	else
48		rc = (pi->port.raw & s->s.pmsk.raw) != s->s.port.raw ||
49			ymm_mask_cmp(&pi->addr6->raw, &s->s.ipv6.addr.raw,
50			&s->s.ipv6.mask.raw) != 0;
51
52	return rc;
53}
54
55static inline struct tle_tcp_stream *
56rx_obtain_listen_stream(const struct tle_dev *dev, const union pkt_info *pi,
57	uint32_t type)
58{
59	struct tle_tcp_stream *s;
60
61	s = (struct tle_tcp_stream *)dev->dp[type]->streams[pi->port.dst];
62	if (s == NULL || rwl_acquire(&s->rx.use) < 0)
63		return NULL;
64
65	/* check that we have a proper stream. */
66	if (s->tcb.state != TCP_ST_LISTEN) {
67		rwl_release(&s->rx.use);
68		s = NULL;
69	}
70
71	return s;
72}
73
74static inline struct tle_tcp_stream *
75rx_obtain_stream(const struct tle_dev *dev, struct stbl *st,
76	const union pkt_info *pi, uint32_t type)
77{
78	struct tle_tcp_stream *s;
79
80	s = stbl_find_data(st, pi);
81	if (s == NULL) {
82		if (pi->tf.flags == TCP_FLAG_ACK)
83			return rx_obtain_listen_stream(dev, pi, type);
84		return NULL;
85	}
86
87	if (stbl_data_pkt(s) || rwl_acquire(&s->rx.use) < 0)
88		return NULL;
89	/* check that we have a proper stream. */
90	else if (s->tcb.state == TCP_ST_CLOSED) {
91		rwl_release(&s->rx.use);
92		s = NULL;
93	}
94
95	return s;
96}
97
98/*
99 * Consider 2 pkt_info *equal* if their:
100 * - types (IPv4/IPv6)
101 * - TCP flags
102 * - checksum flags
103 * - TCP src and dst ports
104 * - IP src and dst addresses
105 * are equal.
106 */
107static inline int
108pkt_info_bulk_eq(const union pkt_info pi[], uint32_t num)
109{
110	uint32_t i;
111
112	i = 1;
113
114	if (pi[0].tf.type == TLE_V4) {
115		while (i != num && xmm_cmp(&pi[0].raw, &pi[i].raw) == 0)
116			i++;
117
118	} else if (pi[0].tf.type == TLE_V6) {
119		while (i != num &&
120				pi[0].raw.u64[0] == pi[i].raw.u64[0] &&
121				ymm_cmp(&pi[0].addr6->raw,
122				&pi[i].addr6->raw) == 0)
123			i++;
124	}
125
126	return i;
127}
128
129static inline int
130pkt_info_bulk_syneq(const union pkt_info pi[], uint32_t num)
131{
132	uint32_t i;
133
134	i = 1;
135
136	if (pi[0].tf.type == TLE_V4) {
137		while (i != num && pi[0].tf.raw == pi[i].tf.raw &&
138				pi[0].port.dst == pi[i].port.dst &&
139				pi[0].addr4.dst == pi[i].addr4.dst)
140			i++;
141
142	} else if (pi[0].tf.type == TLE_V6) {
143		while (i != num && pi[0].tf.raw == pi[i].tf.raw &&
144				pi[0].port.dst == pi[i].port.dst &&
145				xmm_cmp(&pi[0].addr6->dst,
146				&pi[i].addr6->dst) == 0)
147			i++;
148	}
149
150	return i;
151}
152
153static inline void
154stream_drb_free(struct tle_tcp_stream *s, struct tle_drb *drbs[],
155	uint32_t nb_drb)
156{
157	rte_ring_enqueue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
158}
159
160static inline uint32_t
161stream_drb_alloc(struct tle_tcp_stream *s, struct tle_drb *drbs[],
162	uint32_t nb_drb)
163{
164	return rte_ring_dequeue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
165}
166
167static inline void
168fill_tcph(struct tcp_hdr *l4h, const struct tcb *tcb, union l4_ports port,
169	uint32_t seq, uint8_t hlen, uint8_t flags)
170{
171	uint16_t wnd;
172
173	l4h->src_port = port.dst;
174	l4h->dst_port = port.src;
175
176	wnd = (flags & TCP_FLAG_SYN) ?
177		RTE_MIN(tcb->rcv.wnd, (uint32_t)UINT16_MAX) :
178		tcb->rcv.wnd >> tcb->rcv.wscale;
179
180	/* ??? use sse shuffle to hton all remaining 16 bytes at once. ??? */
181	l4h->sent_seq = rte_cpu_to_be_32(seq);
182	l4h->recv_ack = rte_cpu_to_be_32(tcb->rcv.nxt);
183	l4h->data_off = hlen / TCP_DATA_ALIGN << TCP_DATA_OFFSET;
184	l4h->tcp_flags = flags;
185	l4h->rx_win = rte_cpu_to_be_16(wnd);
186	l4h->cksum = 0;
187	l4h->tcp_urp = 0;
188
189	if (flags & TCP_FLAG_SYN)
190		fill_syn_opts(l4h + 1, &tcb->so);
191	else if ((flags & TCP_FLAG_RST) == 0 && tcb->so.ts.raw != 0)
192		fill_tms_opts(l4h + 1, tcb->snd.ts, tcb->rcv.ts);
193}
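
/*
 * Illustrative note (an addition, not part of the original source):
 * the data_off assignment above packs the header length into the upper
 * nibble of the Data Offset byte. Assuming TCP_DATA_ALIGN == 4 and
 * TCP_DATA_OFFSET == 4 (values defined in the companion headers),
 * a 20-byte base header plus a 12-byte timestamp option gives:
 *
 *	hlen = 32;
 *	data_off = hlen / TCP_DATA_ALIGN << TCP_DATA_OFFSET;	-> 8 << 4 == 0x80
 */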
194
195static inline int
196tcp_fill_mbuf(struct rte_mbuf *m, const struct tle_tcp_stream *s,
197	const struct tle_dest *dst, uint64_t ol_flags,
198	union l4_ports port, uint32_t seq, uint32_t flags,
199	uint32_t pid, uint32_t swcsm)
200{
201	uint32_t l4, len, plen;
202	struct tcp_hdr *l4h;
203	char *l2h;
204
205	len = dst->l2_len + dst->l3_len;
206	plen = m->pkt_len;
207
208	if (flags & TCP_FLAG_SYN)
209		l4 = sizeof(*l4h) + TCP_TX_OPT_LEN_MAX;
210	else if ((flags & TCP_FLAG_RST) == 0 && s->tcb.rcv.ts != 0)
211		l4 = sizeof(*l4h) + TCP_TX_OPT_LEN_TMS;
212	else
213		l4 = sizeof(*l4h);
214
215	/* adjust mbuf to put L2/L3/L4 headers into it. */
216	l2h = rte_pktmbuf_prepend(m, len + l4);
217	if (l2h == NULL)
218		return -EINVAL;
219
220	/* copy L2/L3 header */
221	rte_memcpy(l2h, dst->hdr, len);
222
223	/* setup TCP header & options */
224	l4h = (struct tcp_hdr *)(l2h + len);
225	fill_tcph(l4h, &s->tcb, port, seq, l4, flags);
226
227	/* setup mbuf TX offload related fields. */
228	m->tx_offload = _mbuf_tx_offload(dst->l2_len, dst->l3_len, l4, 0, 0, 0);
229	m->ol_flags |= ol_flags;
230
231	/* update proto specific fields. */
232
233	if (s->s.type == TLE_V4) {
234		struct ipv4_hdr *l3h;
235		l3h = (struct ipv4_hdr *)(l2h + dst->l2_len);
236		l3h->packet_id = rte_cpu_to_be_16(pid);
237		l3h->total_length = rte_cpu_to_be_16(plen + dst->l3_len + l4);
238
239		if ((ol_flags & PKT_TX_TCP_CKSUM) != 0)
240			l4h->cksum = _ipv4x_phdr_cksum(l3h, m->l3_len,
241				ol_flags);
242		else if (swcsm != 0)
243			l4h->cksum = _ipv4_udptcp_mbuf_cksum(m, len, l3h);
244
245		if ((ol_flags & PKT_TX_IP_CKSUM) == 0 && swcsm != 0)
246			l3h->hdr_checksum = _ipv4x_cksum(l3h, m->l3_len);
247	} else {
248		struct ipv6_hdr *l3h;
249		l3h = (struct ipv6_hdr *)(l2h + dst->l2_len);
250		l3h->payload_len = rte_cpu_to_be_16(plen + l4);
251		if ((ol_flags & PKT_TX_TCP_CKSUM) != 0)
252			l4h->cksum = rte_ipv6_phdr_cksum(l3h, ol_flags);
253		else if (swcsm != 0)
254			l4h->cksum = _ipv6_udptcp_mbuf_cksum(m, len, l3h);
255	}
256
257	return 0;
258}
259
260/*
261 * This function is supposed to be used only for data packets.
262 * Assumes that L2/L3/L4 headers and mbuf fields are already set up properly.
263 *  - updates tcp SEG.SEQ, SEG.ACK, TS.VAL, TS.ECR.
264 *  - if no HW cksum offloads are enabled, calculates TCP checksum.
265 */
266static inline void
267tcp_update_mbuf(struct rte_mbuf *m, uint32_t type, const struct tcb *tcb,
268	uint32_t seq, uint32_t pid)
269{
270	struct tcp_hdr *l4h;
271	uint32_t len;
272
273	len = m->l2_len + m->l3_len;
274	l4h = rte_pktmbuf_mtod_offset(m, struct tcp_hdr *, len);
275
276	l4h->sent_seq = rte_cpu_to_be_32(seq);
277	l4h->recv_ack = rte_cpu_to_be_32(tcb->rcv.nxt);
278
279	if (tcb->so.ts.raw != 0)
280		fill_tms_opts(l4h + 1, tcb->snd.ts, tcb->rcv.ts);
281
282	if (type == TLE_V4) {
283		struct ipv4_hdr *l3h;
284		l3h = rte_pktmbuf_mtod_offset(m, struct ipv4_hdr *, m->l2_len);
285		l3h->hdr_checksum = 0;
286		l3h->packet_id = rte_cpu_to_be_16(pid);
287		if ((m->ol_flags & PKT_TX_IP_CKSUM) == 0)
288			l3h->hdr_checksum = _ipv4x_cksum(l3h, m->l3_len);
289	}
290
291	/* have to calculate TCP checksum in SW */
292	if ((m->ol_flags & PKT_TX_TCP_CKSUM) == 0) {
293
294		l4h->cksum = 0;
295
296		if (type == TLE_V4) {
297			struct ipv4_hdr *l3h;
298			l3h = rte_pktmbuf_mtod_offset(m, struct ipv4_hdr *,
299				m->l2_len);
300			l4h->cksum = _ipv4_udptcp_mbuf_cksum(m, len, l3h);
301
302		} else {
303			struct ipv6_hdr *l3h;
304			l3h = rte_pktmbuf_mtod_offset(m, struct ipv6_hdr *,
305				m->l2_len);
306			l4h->cksum = _ipv6_udptcp_mbuf_cksum(m, len, l3h);
307		}
308	}
309}
310
311/* Send data packets that need to be ACK-ed by peer */
312static inline uint32_t
313tx_data_pkts(struct tle_tcp_stream *s, struct rte_mbuf *const m[], uint32_t num)
314{
315	uint32_t bsz, i, nb, nbm;
316	struct tle_dev *dev;
317	struct tle_drb *drb[num];
318
319	/* calculate how many drbs are needed.*/
320	bsz = s->tx.drb.nb_elem;
321	nbm = (num + bsz - 1) / bsz;
322
323	/* allocate drbs, adjust number of packets. */
324	nb = stream_drb_alloc(s, drb, nbm);
325
326	/* drb ring is empty. */
327	if (nb == 0)
328		return 0;
329
330	else if (nb != nbm)
331		num = nb * bsz;
332
333	dev = s->tx.dst.dev;
334
335	/* enqueue pkts for TX. */
336	nbm = nb;
337	i = tle_dring_mp_enqueue(&dev->tx.dr, (const void * const*)m,
338		num, drb, &nb);
339
340	/* free unused drbs. */
341	if (nb != 0)
342		stream_drb_free(s, drb + nbm - nb, nb);
343
344	return i;
345}
346
347static inline uint32_t
348tx_data_bulk(struct tle_tcp_stream *s, union seqlen *sl, struct rte_mbuf *mi[],
349	uint32_t num)
350{
351	uint32_t fail, i, k, n, mss, pid, plen, sz, tn, type;
352	struct tle_dev *dev;
353	struct rte_mbuf *mb;
354	struct rte_mbuf *mo[MAX_PKT_BURST + TCP_MAX_PKT_SEG];
355
356	mss = s->tcb.snd.mss;
357	type = s->s.type;
358
359	dev = s->tx.dst.dev;
360	pid = rte_atomic32_add_return(&dev->tx.packet_id[type], num) - num;
361
362	k = 0;
363	tn = 0;
364	fail = 0;
365	for (i = 0; i != num && sl->len != 0 && fail == 0; i++) {
366
367		mb = mi[i];
368		sz = RTE_MIN(sl->len, mss);
369		plen = PKT_L4_PLEN(mb);
370
371		/* fast path, no need to use indirect mbufs. */
372		if (plen <= sz) {
373
374			/* update pkt TCP header */
375			tcp_update_mbuf(mb, type, &s->tcb, sl->seq, pid + i);
376
377			/* keep mbuf till ACK is received. */
378			rte_pktmbuf_refcnt_update(mb, 1);
379			sl->len -= plen;
380			sl->seq += plen;
381			mo[k++] = mb;
382		/* remaining snd.wnd is less than MSS, send nothing */
383		} else if (sz < mss)
384			break;
385		/* packet indirection needed */
386		else
387			RTE_VERIFY(0);
388
389		if (k >= MAX_PKT_BURST) {
390			n = tx_data_pkts(s, mo, k);
391			fail = k - n;
392			tn += n;
393			k = 0;
394		}
395	}
396
397	if (k != 0) {
398		n = tx_data_pkts(s, mo, k);
399		fail = k - n;
400		tn += n;
401	}
402
403	if (fail != 0) {
404		sz = tcp_mbuf_seq_free(mo + n, fail);
405		sl->seq -= sz;
406		sl->len += sz;
407	}
408
409	return tn;
410}
411
412/*
413 * gets data from the stream send buffer, updates it and
414 * queues it into the TX device queue.
415 * Note that this function is not MT safe.
416 */
417static inline uint32_t
418tx_nxt_data(struct tle_tcp_stream *s, uint32_t tms)
419{
420	uint32_t n, num, tn, wnd;
421	struct rte_mbuf **mi;
422	union seqlen sl;
423
424	tn = 0;
425	wnd = s->tcb.snd.wnd - (uint32_t)(s->tcb.snd.nxt - s->tcb.snd.una);
426	sl.seq = s->tcb.snd.nxt;
427	sl.len = RTE_MIN(wnd, s->tcb.snd.cwnd);
428
429	if (sl.len == 0)
430		return tn;
431
432	/* update send timestamp */
433	s->tcb.snd.ts = tms;
434
435	do {
436		/* get group of packets */
437		mi = tcp_txq_get_nxt_objs(s, &num);
438
439		/* stream send buffer is empty */
440		if (num == 0)
441			break;
442
443		/* queue data packets for TX */
444		n = tx_data_bulk(s, &sl, mi, num);
445		tn += n;
446
447		/* update consumer head */
448		tcp_txq_set_nxt_head(s, n);
449	} while (n == num);
450
451	s->tcb.snd.nxt += sl.seq - (uint32_t)s->tcb.snd.nxt;
452	return tn;
453}
454
455static inline void
456free_una_data(struct tle_tcp_stream *s, uint32_t len)
457{
458	uint32_t i, n, num, plen;
459	struct rte_mbuf **mi;
460
461	n = 0;
462	plen = 0;
463
464	do {
465		/* get group of packets */
466		mi = tcp_txq_get_una_objs(s, &num);
467
468		if (num == 0)
469			break;
470
471		/* free acked data */
472		for (i = 0; i != num && n != len; i++, n = plen) {
473			plen += PKT_L4_PLEN(mi[i]);
474			if (plen > len) {
475				/* keep SND.UNA at the start of the packet */
476				len -= RTE_MIN(len, plen - len);
477				break;
478			}
479			rte_pktmbuf_free(mi[i]);
480		}
481
482		/* update consumer tail */
483		tcp_txq_set_una_tail(s, i);
484	} while (plen < len);
485
486	s->tcb.snd.una += len;
487
488	/*
489	 * that could happen in case of retransmit;
490	 * adjust SND.NXT to SND.UNA.
491	 */
492	if (s->tcb.snd.una > s->tcb.snd.nxt) {
493		tcp_txq_rst_nxt_head(s);
494		s->tcb.snd.nxt = s->tcb.snd.una;
495	}
496}
497
498static inline uint16_t
499calc_smss(uint16_t mss, const struct tle_dest *dst)
500{
501	uint16_t n;
502
503	n = dst->mtu - dst->l2_len - dst->l3_len - TCP_TX_HDR_DACK;
504	mss = RTE_MIN(n, mss);
505	return mss;
506}
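
/*
 * Illustrative example (an addition, with assumed numbers): for a peer
 * advertised MSS of 1460, dst->mtu covering the full 1514-byte frame,
 * l2_len = 14, l3_len = 20 (IPv4) and TCP_TX_HDR_DACK assumed to reserve
 * 32 bytes for the TCP header plus options:
 *
 *	n = 1514 - 14 - 20 - 32;	-> 1448
 *	smss = RTE_MIN(n, 1460);	-> 1448
 */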
507
508/*
509 * RFC 5681 3.1
510 * If SMSS > 2190 bytes:
511 *     IW = 2 * SMSS bytes and MUST NOT be more than 2 segments
512 *  If (SMSS > 1095 bytes) and (SMSS <= 2190 bytes):
513 *     IW = 3 * SMSS bytes and MUST NOT be more than 3 segments
514 *  if SMSS <= 1095 bytes:
515 *     IW = 4 * SMSS bytes and MUST NOT be more than 4 segments
516 */
517static inline uint32_t
518initial_cwnd(uint16_t smss)
519{
520	if (smss > 2190)
521		return 2 * smss;
522	else if (smss > 1095)
523		return 3 * smss;
524	return 4 * smss;
525}
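
/*
 * Illustrative example (an addition): for a typical Ethernet-sized
 * SMSS of 1460 bytes the middle branch applies, so the initial
 * congestion window is IW = 3 * 1460 = 4380 bytes, i.e. at most
 * 3 segments, as required by RFC 5681 3.1.
 */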
526
527/*
528 * queue a standalone packet to the particular output device.
529 * It assumes that:
530 * - L2/L3/L4 headers are already set.
531 * - packet fits into one segment.
532 */
533static inline int
534send_pkt(struct tle_tcp_stream *s, struct tle_dev *dev, struct rte_mbuf *m)
535{
536	uint32_t n, nb;
537	struct tle_drb *drb;
538
539	if (stream_drb_alloc(s, &drb, 1) == 0)
540		return -ENOBUFS;
541
542	/* enqueue pkt for TX. */
543	nb = 1;
544	n = tle_dring_mp_enqueue(&dev->tx.dr, (const void * const*)&m, 1,
545		&drb, &nb);
546
547	/* free unused drbs. */
548	if (nb != 0)
549		stream_drb_free(s, &drb, 1);
550
551	return (n == 1) ? 0 : -ENOBUFS;
552}
553
554static inline int
555send_ctrl_pkt(struct tle_tcp_stream *s, struct rte_mbuf *m, uint32_t seq,
556	uint32_t flags)
557{
558	const struct tle_dest *dst;
559	uint32_t pid, type;
560	int32_t rc;
561
562	dst = &s->tx.dst;
563	type = s->s.type;
564	pid = rte_atomic32_add_return(&dst->dev->tx.packet_id[type], 1) - 1;
565
566	rc = tcp_fill_mbuf(m, s, dst, 0, s->s.port, seq, flags, pid, 1);
567	if (rc == 0)
568		rc = send_pkt(s, dst->dev, m);
569
570	return rc;
571}
572
573static inline int
574send_rst(struct tle_tcp_stream *s, uint32_t seq)
575{
576	struct rte_mbuf *m;
577	int32_t rc;
578
579	m = rte_pktmbuf_alloc(s->tx.dst.head_mp);
580	if (m == NULL)
581		return -ENOMEM;
582
583	rc = send_ctrl_pkt(s, m, seq, TCP_FLAG_RST);
584	if (rc != 0)
585		rte_pktmbuf_free(m);
586
587	return rc;
588}
589
590static inline int
591send_ack(struct tle_tcp_stream *s, uint32_t tms, uint32_t flags)
592{
593	struct rte_mbuf *m;
594	uint32_t seq;
595	int32_t rc;
596
597	m = rte_pktmbuf_alloc(s->tx.dst.head_mp);
598	if (m == NULL)
599		return -ENOMEM;
600
601	seq = s->tcb.snd.nxt - ((flags & (TCP_FLAG_FIN | TCP_FLAG_SYN)) != 0);
602	s->tcb.snd.ts = tms;
603
604	rc = send_ctrl_pkt(s, m, seq, flags);
605	if (rc != 0) {
606		rte_pktmbuf_free(m);
607		return rc;
608	}
609
610	s->tcb.snd.ack = s->tcb.rcv.nxt;
611	return 0;
612}
613
614
615static int
616sync_ack(struct tle_tcp_stream *s, const union pkt_info *pi,
617	const union seg_info *si, uint32_t ts, struct rte_mbuf *m)
618{
619	uint16_t len;
620	int32_t rc;
621	uint32_t pid, seq, type;
622	struct tle_dev *dev;
623	const void *da;
624	struct tle_dest dst;
625	const struct tcp_hdr *th;
626
627	type = s->s.type;
628
629	/* get destination information. */
630	if (type == TLE_V4)
631		da = &pi->addr4.src;
632	else
633		da = &pi->addr6->src;
634
635	rc = stream_get_dest(&s->s, da, &dst);
636	if (rc < 0)
637		return rc;
638
639	th = rte_pktmbuf_mtod_offset(m, const struct tcp_hdr *,
640		m->l2_len + m->l3_len);
641	get_syn_opts(&s->tcb.so, (uintptr_t)(th + 1), m->l4_len - sizeof(*th));
642
643	s->tcb.rcv.nxt = si->seq + 1;
644	seq = sync_gen_seq(pi, s->tcb.rcv.nxt, ts, s->tcb.so.mss,
645				s->s.ctx->prm.hash_alg,
646				&s->s.ctx->prm.secret_key);
647	s->tcb.so.ts.ecr = s->tcb.so.ts.val;
648	s->tcb.so.ts.val = sync_gen_ts(ts, s->tcb.so.wscale);
649	s->tcb.so.wscale = (s->tcb.so.wscale == TCP_WSCALE_NONE) ?
650		TCP_WSCALE_NONE : TCP_WSCALE_DEFAULT;
651	s->tcb.so.mss = calc_smss(dst.mtu, &dst);
652
653	/* reset mbuf's data contents. */
654	len = m->l2_len + m->l3_len + m->l4_len;
655	m->tx_offload = 0;
656	if (rte_pktmbuf_adj(m, len) == NULL)
657		return -EINVAL;
658
659	dev = dst.dev;
660	pid = rte_atomic32_add_return(&dev->tx.packet_id[type], 1) - 1;
661
662	rc = tcp_fill_mbuf(m, s, &dst, 0, pi->port, seq,
663		TCP_FLAG_SYN | TCP_FLAG_ACK, pid, 1);
664	if (rc == 0)
665		rc = send_pkt(s, dev, m);
666
667	return rc;
668}
669
670/*
671 * RFC 793:
672 * There are four cases for the acceptability test for an incoming segment:
673 * Segment Receive  Test
674 * Length  Window
675 * ------- -------  -------------------------------------------
676 *    0       0     SEG.SEQ = RCV.NXT
677 *    0      >0     RCV.NXT =< SEG.SEQ < RCV.NXT+RCV.WND
678 *   >0       0     not acceptable
679 *   >0      >0     RCV.NXT =< SEG.SEQ < RCV.NXT+RCV.WND
680 *                  or RCV.NXT =< SEG.SEQ+SEG.LEN-1 < RCV.NXT+RCV.WND
681 */
682static inline int
683check_seqn(const struct tcb *tcb, uint32_t seqn, uint32_t len)
684{
685	uint32_t n;
686
687	n = seqn + len;
688	if (seqn - tcb->rcv.nxt >= tcb->rcv.wnd &&
689			n - tcb->rcv.nxt > tcb->rcv.wnd)
690		return -ERANGE;
691
692	return 0;
693}
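
/*
 * Illustrative note (an addition): the subtractions above rely on 32-bit
 * unsigned wraparound, so one compare covers both "SEG.SEQ before RCV.NXT"
 * and "SEG.SEQ beyond the window". For example, with RCV.NXT = 1000 and
 * RCV.WND = 4096:
 *	seqn = 2000, len = 0:  2000 - 1000 = 1000 < 4096        -> acceptable
 *	seqn = 999,  len = 0:  999 - 1000 wraps to ~4G >= 4096  -> -ERANGE
 *	seqn = 999,  len = 10: first test fails, but
 *	                       1009 - 1000 = 9 is within window -> acceptable
 */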
694
695static inline union tsopt
696rx_tms_opt(const struct tcb *tcb, const struct rte_mbuf *mb)
697{
698	union tsopt ts;
699	uintptr_t opt;
700	const struct tcp_hdr *th;
701
702	if (tcb->so.ts.val != 0) {
703		opt = rte_pktmbuf_mtod_offset(mb, uintptr_t,
704			mb->l2_len + mb->l3_len + sizeof(*th));
705		ts = get_tms_opts(opt, mb->l4_len - sizeof(*th));
706	} else
707		ts.raw = 0;
708
709	return ts;
710}
711
712/*
713 * PAWS and sequence check.
714 * RFC 1323 4.2.1
715 */
716static inline int
717rx_check_seq(struct tcb *tcb, uint32_t seq, uint32_t len, const union tsopt ts)
718{
719	int32_t rc;
720
721	/* RFC 1323 4.2.1 R2 */
722	rc = check_seqn(tcb, seq, len);
723	if (rc < 0)
724		return rc;
725
726	if (ts.raw != 0) {
727
728		/* RFC 1323 4.2.1 R1 */
729		if (tcp_seq_lt(ts.val, tcb->rcv.ts))
730			return -ERANGE;
731
732		/* RFC 1323 4.2.1 R3 */
733		if (tcp_seq_leq(seq, tcb->snd.ack) &&
734				tcp_seq_lt(tcb->snd.ack, seq + len))
735			tcb->rcv.ts = ts.val;
736	}
737
738	return rc;
739}
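
/*
 * Illustrative note (an addition): per R1 above a segment carrying a
 * timestamp older than the last one recorded is dropped, e.g. with
 * rcv.ts = 5000 an incoming ts.val = 4990 yields -ERANGE, while
 * ts.val = 5010 passes and, per R3, updates rcv.ts once the segment
 * covers the last ACK sent (snd.ack, Last.ACK.sent in RFC 1323 terms).
 */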
740
741static inline int
742rx_check_ack(const struct tcb *tcb, uint32_t ack)
743{
744	uint32_t max;
745
746	max = (uint32_t)RTE_MAX(tcb->snd.nxt, tcb->snd.rcvr);
747
748	if (tcp_seq_leq(tcb->snd.una, ack) && tcp_seq_leq(ack, max))
749		return 0;
750
751	return -ERANGE;
752}
753
754static inline int
755rx_check_seqack(struct tcb *tcb, uint32_t seq, uint32_t ack, uint32_t len,
756	const union tsopt ts)
757{
758	int32_t rc;
759
760	rc = rx_check_seq(tcb, seq, len, ts);
761	rc |= rx_check_ack(tcb, ack);
762	return rc;
763}
764
765static inline int
766restore_syn_opt(struct syn_opts *so, const union pkt_info *pi,
767	const union seg_info *si, uint32_t ts, const struct rte_mbuf *mb,
768	uint32_t hash_alg, rte_xmm_t *secret_key)
769{
770	int32_t rc;
771	uint32_t len;
772	const struct tcp_hdr *th;
773
774	/* check that ACK, etc fields are what we expected. */
775	rc = sync_check_ack(pi, si->seq, si->ack - 1, ts,
776				hash_alg,
777				secret_key);
778	if (rc < 0)
779		return rc;
780
781	so->mss = rc;
782
783	th = rte_pktmbuf_mtod_offset(mb, const struct tcp_hdr *,
784		mb->l2_len + mb->l3_len);
785	len = mb->l4_len - sizeof(*th);
786	sync_get_opts(so, (uintptr_t)(th + 1), len);
787	return 0;
788}
789
790static inline void
791stream_term(struct tle_tcp_stream *s)
792{
793	struct sdr *dr;
794
795	s->tcb.state = TCP_ST_CLOSED;
796	rte_smp_wmb();
797
798	timer_stop(s);
799
800	/* close() was already invoked, schedule final cleanup */
801	if ((s->tcb.uop & TCP_OP_CLOSE) != 0) {
802
803		dr = CTX_TCP_SDR(s->s.ctx);
804		STAILQ_INSERT_TAIL(&dr->be, &s->s, link);
805
806	/* notify user that stream needs to be closed */
807	} else if (s->err.ev != NULL)
808		tle_event_raise(s->err.ev);
809	else if (s->err.cb.func != NULL)
810		s->err.cb.func(s->err.cb.data, &s->s);
811}
812
813static inline int
814stream_fill_dest(struct tle_tcp_stream *s)
815{
816	int32_t rc;
817	const void *da;
818
819	if (s->s.type == TLE_V4)
820		da = &s->s.ipv4.addr.src;
821	else
822		da = &s->s.ipv6.addr.src;
823
824	rc = stream_get_dest(&s->s, da, &s->tx.dst);
825	return (rc < 0) ? rc : 0;
826}
827
828/*
829 * helper function, prepares a new accept stream.
830 */
831static inline int
832accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st,
833	struct tle_tcp_stream *cs, const struct syn_opts *so,
834	uint32_t tms, const union pkt_info *pi, const union seg_info *si)
835{
836	int32_t rc;
837	uint32_t rtt;
838
839	/* some TX still pending for that stream. */
840	if (TCP_STREAM_TX_PENDING(cs))
841		return -EAGAIN;
842
843	/* setup L4 ports and L3 addresses fields. */
844	cs->s.port.raw = pi->port.raw;
845	cs->s.pmsk.raw = UINT32_MAX;
846
847	if (pi->tf.type == TLE_V4) {
848		cs->s.ipv4.addr = pi->addr4;
849		cs->s.ipv4.mask.src = INADDR_NONE;
850		cs->s.ipv4.mask.dst = INADDR_NONE;
851	} else if (pi->tf.type == TLE_V6) {
852		cs->s.ipv6.addr = *pi->addr6;
853		rte_memcpy(&cs->s.ipv6.mask.src, &tle_ipv6_none,
854			sizeof(cs->s.ipv6.mask.src));
855		rte_memcpy(&cs->s.ipv6.mask.dst, &tle_ipv6_none,
856			sizeof(cs->s.ipv6.mask.dst));
857	}
858
859	/* setup TCB */
860	sync_fill_tcb(&cs->tcb, si, so);
861	cs->tcb.rcv.wnd = calc_rx_wnd(cs, cs->tcb.rcv.wscale);
862
863	/*
864	 * estimate the RTO.
865	 * For now RTT is calculated based on the TCP TMS option;
866	 * later add a real-time one.
867	 */
868	if (cs->tcb.so.ts.ecr) {
869		rtt = tms - cs->tcb.so.ts.ecr;
870		rto_estimate(&cs->tcb, rtt);
871	} else
872		cs->tcb.snd.rto = TCP_RTO_DEFAULT;
873
874	/* copy stream's type. */
875	cs->s.type = ps->s.type;
876
877	/* retrieve and cache destination information. */
878	rc = stream_fill_dest(cs);
879	if (rc != 0)
880		return rc;
881
882	/* update snd.mss with SMSS value */
883	cs->tcb.snd.mss = calc_smss(cs->tcb.snd.mss, &cs->tx.dst);
884
885	/* setup congestion variables */
886	cs->tcb.snd.cwnd = initial_cwnd(cs->tcb.snd.mss);
887	cs->tcb.snd.ssthresh = cs->tcb.snd.wnd;
888
889	cs->tcb.state = TCP_ST_ESTABLISHED;
890
891	/* add stream to the table */
892	cs->ste = stbl_add_stream(st, pi, cs);
893	if (cs->ste == NULL)
894		return -ENOBUFS;
895
896	cs->tcb.uop |= TCP_OP_ACCEPT;
897	tcp_stream_up(cs);
898	return 0;
899}
900
901
902/*
903 * ACK for new connection request arrived.
904 * Check that the packet meets all conditions and try to open a new stream.
905 * returns:
906 * < 0  - invalid packet
907 * == 0 - packet is valid and new stream was opened for it.
908 * > 0  - packet is valid, but failed to open new stream.
909 */
910static inline int
911rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st,
912	const union pkt_info *pi, const union seg_info *si,
913	uint32_t tms, struct rte_mbuf *mb, struct tle_tcp_stream **csp)
914{
915	int32_t rc;
916	struct tle_ctx *ctx;
917	struct tle_stream *ts;
918	struct tle_tcp_stream *cs;
919	struct syn_opts so;
920
921	*csp = NULL;
922
923	if (pi->tf.flags != TCP_FLAG_ACK || rx_check_stream(s, pi) != 0)
924		return -EINVAL;
925
926	ctx = s->s.ctx;
927	rc = restore_syn_opt(&so, pi, si, tms, mb, ctx->prm.hash_alg,
928				&ctx->prm.secret_key);
929	if (rc < 0)
930		return rc;
931
932	/* allocate new stream */
933	ts = get_stream(ctx);
934	cs = TCP_STREAM(ts);
935	if (ts == NULL)
936		return ENFILE;
937
938	/* prepare stream to handle new connection */
939	if (accept_prep_stream(s, st, cs, &so, tms, pi, si) == 0) {
940
941		/* put new stream in the accept queue */
942		if (rte_ring_enqueue_burst(s->rx.q,
943				(void * const *)&ts, 1) == 1) {
944			*csp = cs;
945			return 0;
946		}
947
948		/* cleanup on failure */
949		tcp_stream_down(cs);
950		stbl_del_pkt(st, cs->ste, pi);
951		cs->ste = NULL;
952	}
953
954	tcp_stream_reset(ctx, cs);
955	return ENOBUFS;
956}
957
958static inline int
959data_pkt_adjust(const struct tcb *tcb, struct rte_mbuf *mb, uint32_t hlen,
960	uint32_t *seqn, uint32_t *plen)
961{
962	uint32_t len, n, seq;
963
964	seq = *seqn;
965	len = *plen;
966
967	rte_pktmbuf_adj(mb, hlen);
968	if (len == 0)
969		return -ENODATA;
970	/* cut off the start of the packet */
971	else if (tcp_seq_lt(seq, tcb->rcv.nxt)) {
972		n = tcb->rcv.nxt - seq;
973		if (n >= len)
974			return -ENODATA;
975
976		rte_pktmbuf_adj(mb, n);
977		*seqn = seq + n;
978		*plen = len - n;
979	}
980
981	return 0;
982}
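
/*
 * Illustrative example (an addition): if RCV.NXT = 1000 and a 300-byte
 * segment arrives with SEG.SEQ = 900, the first 100 bytes were already
 * received, so after the adjustment above:
 *
 *	n = 1000 - 900;		-> 100 duplicate bytes cut off the mbuf
 *	*seqn = 1000;
 *	*plen = 200;
 */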
983
984static inline uint32_t
985rx_ackdata(struct tle_tcp_stream *s, uint32_t ack)
986{
987	uint32_t k, n;
988
989	n = ack - (uint32_t)s->tcb.snd.una;
990
991	/* some more data was acked. */
992	if (n != 0) {
993
994		/* advance SND.UNA and free related packets. */
995		k = rte_ring_free_count(s->tx.q);
996		free_una_data(s, n);
997
998		/* mark the stream as available for writing */
999		if (rte_ring_free_count(s->tx.q) != 0) {
1000			if (s->tx.ev != NULL)
1001				tle_event_raise(s->tx.ev);
1002			else if (k == 0 && s->tx.cb.func != NULL)
1003				s->tx.cb.func(s->tx.cb.data, &s->s);
1004		}
1005	}
1006
1007	return n;
1008}
1009
1010static void
1011rx_fin_state(struct tle_tcp_stream *s, struct resp_info *rsp)
1012{
1013	uint32_t state;
1014	int32_t ackfin;
1015
1016	s->tcb.rcv.nxt += 1;
1017
1018	ackfin = (s->tcb.snd.una == s->tcb.snd.fss);
1019	state = s->tcb.state;
1020
1021	if (state == TCP_ST_ESTABLISHED) {
1022		s->tcb.state = TCP_ST_CLOSE_WAIT;
1023		/* raise err.ev & err.cb */
1024		if (s->err.ev != NULL)
1025			tle_event_raise(s->err.ev);
1026		else if (s->err.cb.func != NULL)
1027			s->err.cb.func(s->err.cb.data, &s->s);
1028	} else if (state == TCP_ST_FIN_WAIT_1 || state == TCP_ST_CLOSING) {
1029		rsp->flags |= TCP_FLAG_ACK;
1030		if (ackfin != 0) {
1031			s->tcb.state = TCP_ST_TIME_WAIT;
1032			s->tcb.snd.rto = TCP_RTO_2MSL;
1033			timer_reset(s);
1034		} else
1035			s->tcb.state = TCP_ST_CLOSING;
1036	} else if (state == TCP_ST_FIN_WAIT_2) {
1037		rsp->flags |= TCP_FLAG_ACK;
1038		s->tcb.state = TCP_ST_TIME_WAIT;
1039		s->tcb.snd.rto = TCP_RTO_2MSL;
1040		timer_reset(s);
1041	} else if (state == TCP_ST_LAST_ACK && ackfin != 0) {
1042		stream_term(s);
1043	}
1044}
1045
1046/*
1047 * FIN process for ESTABLISHED state
1048 * returns:
1049 * < 0 - error occurred
1050 * 0 - FIN was processed OK, and mbuf can be freed/reused.
1051 * > 0 - FIN was processed OK and mbuf can't be freed/reused.
1052 */
1053static inline int
1054rx_fin(struct tle_tcp_stream *s, uint32_t state,
1055	const union seg_info *si, struct rte_mbuf *mb,
1056	struct resp_info *rsp)
1057{
1058	uint32_t hlen, plen, seq;
1059	int32_t ret;
1060	union tsopt ts;
1061
1062	hlen = PKT_L234_HLEN(mb);
1063	plen = mb->pkt_len - hlen;
1064	seq = si->seq;
1065
1066	ts = rx_tms_opt(&s->tcb, mb);
1067	ret = rx_check_seqack(&s->tcb, seq, si->ack, plen, ts);
1068	if (ret != 0)
1069		return ret;
1070
1071	if (state < TCP_ST_ESTABLISHED)
1072		return -EINVAL;
1073
1074	if (plen != 0) {
1075
1076		ret = data_pkt_adjust(&s->tcb, mb, hlen, &seq, &plen);
1077		if (ret != 0)
1078			return ret;
1079		if (rx_data_enqueue(s, seq, plen, &mb, 1) != 1)
1080			return -ENOBUFS;
1081	}
1082
1083	/* process ack here */
1084	rx_ackdata(s, si->ack);
1085
1086	/* some fragments still missing */
1087	if (seq + plen != s->tcb.rcv.nxt) {
1088		s->tcb.rcv.frs.seq = seq + plen;
1089		s->tcb.rcv.frs.on = 1;
1090	} else
1091		rx_fin_state(s, rsp);
1092
1093	return plen;
1094}
1095
1096static inline int
1097rx_rst(struct tle_tcp_stream *s, uint32_t state, uint32_t flags,
1098	const union seg_info *si)
1099{
1100	int32_t rc;
1101
1102	/*
1103	 * RFC 793: In all states except SYN-SENT, all reset (RST) segments
1104	 * are validated by checking their SEQ-fields.
1105	 * A reset is valid if its sequence number is in the window.
1106	 * In the SYN-SENT state (a RST received in response to an initial SYN),
1107	 * the RST is acceptable if the ACK field acknowledges the SYN.
1108	 */
1109	if (state == TCP_ST_SYN_SENT) {
1110		rc = ((flags & TCP_FLAG_ACK) == 0 ||
1111				si->ack != s->tcb.snd.nxt) ?
1112			-ERANGE : 0;
1113	}
1114
1115	else
1116		rc = check_seqn(&s->tcb, si->seq, 0);
1117
1118	if (rc == 0)
1119		stream_term(s);
1120
1121	return rc;
1122}
1123
1124/*
1125 *  check whether we have a FIN that was received out-of-order.
1126 *  if yes, try to process it now.
1127 */
1128static inline void
1129rx_ofo_fin(struct tle_tcp_stream *s, struct resp_info *rsp)
1130{
1131	if (s->tcb.rcv.frs.on != 0 && s->tcb.rcv.nxt == s->tcb.rcv.frs.seq)
1132		rx_fin_state(s, rsp);
1133}
1134
1135static inline void
1136dack_info_init(struct dack_info *tack, const struct tcb *tcb)
1137{
1138	memset(tack, 0, sizeof(*tack));
1139	tack->ack = tcb->snd.una;
1140	tack->segs.dup = tcb->rcv.dupack;
1141	tack->wu.raw = tcb->snd.wu.raw;
1142	tack->wnd = tcb->snd.wnd >> tcb->snd.wscale;
1143}
1144
1145static inline void
1146ack_window_update(struct tcb *tcb, const struct dack_info *tack)
1147{
1148	tcb->snd.wu.raw = tack->wu.raw;
1149	tcb->snd.wnd = tack->wnd << tcb->snd.wscale;
1150}
1151
1152static inline void
1153ack_cwnd_update(struct tcb *tcb, uint32_t acked, const struct dack_info *tack)
1154{
1155	uint32_t n;
1156
1157	n = tack->segs.ack * tcb->snd.mss;
1158
1159	/* slow start phase, RFC 5681 3.1 (2)  */
1160	if (tcb->snd.cwnd < tcb->snd.ssthresh)
1161		tcb->snd.cwnd += RTE_MIN(acked, n);
1162	/* congestion avoidance phase, RFC 5681 3.1 (3) */
1163	else
1164		tcb->snd.cwnd += RTE_MAX(1U, n * tcb->snd.mss / tcb->snd.cwnd);
1165}
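
/*
 * Illustrative example (an addition): with MSS = 1460 and one segment
 * newly ACKed (acked = 1460, tack->segs.ack = 1):
 * - in slow start (cwnd < ssthresh) cwnd grows by RTE_MIN(1460, 1460),
 *   i.e. one MSS per ACK, roughly doubling cwnd every RTT;
 * - in congestion avoidance it grows by RTE_MAX(1, 1460 * 1460 / cwnd),
 *   i.e. roughly one MSS per RTT.
 */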
1166
1167static inline void
1168rto_ssthresh_update(struct tcb *tcb)
1169{
1170	uint32_t k, n;
1171
1172	/* RFC 5681 3.1 (4)  */
1173	n = (tcb->snd.nxt - tcb->snd.una) / 2;
1174	k = 2 * tcb->snd.mss;
1175	tcb->snd.ssthresh = RTE_MAX(n, k);
1176}
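
/*
 * Illustrative example (an addition): with 14600 bytes in flight
 * (SND.NXT - SND.UNA) and MSS = 1460:
 *
 *	ssthresh = RTE_MAX(14600 / 2, 2 * 1460);	-> 7300 bytes
 */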
1177
1178static inline void
1179rto_cwnd_update(struct tcb *tcb)
1180{
1181
1182	if (tcb->snd.nb_retx == 0)
1183		rto_ssthresh_update(tcb);
1184
1185	/*
1186	 * RFC 5681 3.1: upon a timeout cwnd MUST be set to
1187	 * no more than 1 full-sized segment.
1188	 */
1189	tcb->snd.cwnd = tcb->snd.mss;
1190}
1191
1192static inline void
1193ack_info_update(struct dack_info *tack, const union seg_info *si,
1194	int32_t badseq, uint32_t dlen, const union tsopt ts)
1195{
1196	if (badseq != 0) {
1197		tack->segs.badseq++;
1198		return;
1199	}
1200
1201	/* segment with incoming data */
1202	tack->segs.data += (dlen != 0);
1203
1204	/* segment with newly acked data */
1205	if (tcp_seq_lt(tack->ack, si->ack)) {
1206		tack->segs.dup = 0;
1207		tack->segs.ack++;
1208		tack->ack = si->ack;
1209		tack->ts = ts;
1210
1211	/*
1212	 * RFC 5681: An acknowledgment is considered a "duplicate" when:
1213	 * (a) the receiver of the ACK has outstanding data
1214	 * (b) the incoming acknowledgment carries no data
1215	 * (c) the SYN and FIN bits are both off
1216	 * (d) the acknowledgment number is equal to the TCP.UNA
1217	 * (e) the advertised window in the incoming acknowledgment equals the
1218	 * advertised window in the last incoming acknowledgment.
1219	 *
1220	 * Here we only have to check for (b), (d), (e).
1221	 * (a) will be checked later for the whole bulk of packets,
1222	 * (c) should never happen here.
1223	 */
1224	} else if (dlen == 0 && si->wnd == tack->wnd && ++tack->segs.dup == 3) {
1225		tack->dup3.seg = tack->segs.ack + 1;
1226		tack->dup3.ack = tack->ack;
1227	}
1228
1229	/*
1230	 * RFC 793:
1231	 * If SND.UNA < SEG.ACK =< SND.NXT, the send window should be
1232	 * updated.  If (SND.WL1 < SEG.SEQ or (SND.WL1 = SEG.SEQ and
1233	 * SND.WL2 =< SEG.ACK)), set SND.WND <- SEG.WND, set
1234	 * SND.WL1 <- SEG.SEQ, and set SND.WL2 <- SEG.ACK.
1235	 */
1236	if (tcp_seq_lt(tack->wu.wl1, si->seq) ||
1237			(si->seq == tack->wu.wl1 &&
1238			tcp_seq_leq(tack->wu.wl2, si->ack))) {
1239
1240		tack->wu.wl1 = si->seq;
1241		tack->wu.wl2 = si->ack;
1242		tack->wnd = si->wnd;
1243	}
1244}
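
/*
 * Illustrative note (an addition): three pure ACKs in a row that carry
 * the same acknowledgment number and window and no payload drive
 * tack->segs.dup to 3; at that point dup3.seg/dup3.ack are latched and
 * process_ack() below may switch the stream into fast retransmit
 * (RFC 6582).
 */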
1245
1246static inline uint32_t
1247rx_data_ack(struct tle_tcp_stream *s, struct dack_info *tack,
1248	const union seg_info si[], struct rte_mbuf *mb[], struct rte_mbuf *rp[],
1249	int32_t rc[], uint32_t num)
1250{
1251	uint32_t i, j, k, n, t;
1252	uint32_t hlen, plen, seq, tlen;
1253	int32_t ret;
1254	union tsopt ts;
1255
1256	k = 0;
1257	for (i = 0; i != num; i = j) {
1258
1259		hlen = PKT_L234_HLEN(mb[i]);
1260		plen = mb[i]->pkt_len - hlen;
1261		seq = si[i].seq;
1262
1263		ts = rx_tms_opt(&s->tcb, mb[i]);
1264		ret = rx_check_seqack(&s->tcb, seq, si[i].ack, plen, ts);
1265
1266		/* account segment received */
1267		ack_info_update(tack, &si[i], ret != 0, plen, ts);
1268
1269		if (ret == 0) {
1270			/* skip duplicate data, if any */
1271			ret = data_pkt_adjust(&s->tcb, mb[i], hlen,
1272				&seq, &plen);
1273		}
1274
1275		j = i + 1;
1276		if (ret != 0) {
1277			rp[k] = mb[i];
1278			rc[k] = -ret;
1279			k++;
1280			continue;
1281		}
1282
1283		/* group sequential packets together. */
1284		for (tlen = plen; j != num; tlen += plen, j++) {
1285
1286			hlen = PKT_L234_HLEN(mb[j]);
1287			plen = mb[j]->pkt_len - hlen;
1288
1289			/* not consecutive packet */
1290			if (plen == 0 || seq + tlen != si[j].seq)
1291				break;
1292
1293			/* check SEQ/ACK */
1294			ts = rx_tms_opt(&s->tcb, mb[j]);
1295			ret = rx_check_seqack(&s->tcb, si[j].seq, si[j].ack,
1296				plen, ts);
1297
1298			/* account for segment received */
1299			ack_info_update(tack, &si[j], ret != 0, plen, ts);
1300
1301			if (ret != 0) {
1302				rp[k] = mb[j];
1303				rc[k] = -ret;
1304				k++;
1305				break;
1306			}
1307			rte_pktmbuf_adj(mb[j], hlen);
1308		}
1309
1310		n = j - i;
1311		j += (ret != 0);
1312
1313		/* account for OFO data */
1314		if (seq != s->tcb.rcv.nxt)
1315			tack->segs.ofo += n;
1316
1317		/* enqueue packets */
1318		t = rx_data_enqueue(s, seq, tlen, mb + i, n);
1319
1320		/* if we are out of space in stream recv buffer. */
1321		for (; t != n; t++) {
1322			rp[k] = mb[i + t];
1323			rc[k] = -ENOBUFS;
1324			k++;
1325		}
1326	}
1327
1328	return num - k;
1329}
1330
1331static inline void
1332start_fast_retransmit(struct tle_tcp_stream *s)
1333{
1334	struct tcb *tcb;
1335
1336	tcb = &s->tcb;
1337
1338	/* RFC 6582 3.2.2 */
1339	tcb->snd.rcvr = tcb->snd.nxt;
1340	tcb->snd.fastack = 1;
1341
1342	/* RFC 5681 3.2.2 */
1343	rto_ssthresh_update(tcb);
1344
1345	/* RFC 5681 3.2.3 */
1346	tcp_txq_rst_nxt_head(s);
1347	tcb->snd.nxt = tcb->snd.una;
1348	tcb->snd.cwnd = tcb->snd.ssthresh + 3 * tcb->snd.mss;
1349}
1350
1351static inline void
1352stop_fast_retransmit(struct tle_tcp_stream *s)
1353{
1354	struct tcb *tcb;
1355	uint32_t n;
1356
1357	tcb = &s->tcb;
1358	n = tcb->snd.nxt - tcb->snd.una;
1359	tcb->snd.cwnd = RTE_MIN(tcb->snd.ssthresh,
1360		RTE_MAX(n, tcb->snd.mss) + tcb->snd.mss);
1361	tcb->snd.fastack = 0;
1362}
1363
1364static inline int
1365in_fast_retransmit(struct tle_tcp_stream *s, uint32_t ack_len, uint32_t ack_num,
1366	uint32_t dup_num)
1367{
1368	uint32_t n;
1369	struct tcb *tcb;
1370
1371	tcb = &s->tcb;
1372
1373	/* RFC 6582 3.2.3 partial ACK */
1374	if (ack_len != 0) {
1375
1376		n = ack_num * tcb->snd.mss;
1377		if (ack_len >= n)
1378			tcb->snd.cwnd -= ack_len - n;
1379		else
1380			tcb->snd.cwnd -= ack_len % tcb->snd.mss;
1381
1382		/*
1383		 * For the first partial ACK that arrives
1384		 * during fast recovery, also reset the
1385		 * retransmit timer.
1386		 */
1387		if (tcb->snd.fastack == 1)
1388			timer_reset(s);
1389
1390		tcb->snd.fastack += ack_num;
1391		return 1;
1392
1393	/* RFC 5681 3.2.4 */
1394	} else if (dup_num > 3) {
1395		s->tcb.snd.cwnd += (dup_num - 3) * tcb->snd.mss;
1396		return 1;
1397	}
1398
1399	return 0;
1400}
1401
1402static inline int
1403process_ack(struct tle_tcp_stream *s, uint32_t acked,
1404	const struct dack_info *tack)
1405{
1406	int32_t send;
1407
1408	send = 0;
1409
1410	/* normal mode */
1411	if (s->tcb.snd.fastack == 0) {
1412
1413		send = 1;
1414
1415		/* RFC 6582 3.2.2 switch to fast retransmit mode */
1416		if (tack->dup3.seg != 0 && s->tcb.snd.una != s->tcb.snd.nxt &&
1417				s->tcb.snd.una >= s->tcb.snd.rcvr) {
1418
1419			start_fast_retransmit(s);
1420			in_fast_retransmit(s,
1421				tack->ack - tack->dup3.ack,
1422				tack->segs.ack - tack->dup3.seg - 1,
1423				tack->segs.dup);
1424
1425		/* remain in normal mode */
1426		} else if (acked != 0) {
1427			ack_cwnd_update(&s->tcb, acked, tack);
1428			timer_stop(s);
1429		}
1430
1431	/* fast retransmit mode */
1432	} else {
1433
1434		/* remain in fast retransmit mode */
1435		if (s->tcb.snd.una < s->tcb.snd.rcvr) {
1436
1437			send = in_fast_retransmit(s, acked, tack->segs.ack,
1438				tack->segs.dup);
1439		} else {
1440			/* RFC 6582 3.2.3 full ACK */
1441			stop_fast_retransmit(s);
1442			timer_stop(s);
1443
1444			/* if we have another series of dup ACKs */
1445			if (tack->dup3.seg != 0 &&
1446					s->tcb.snd.una != s->tcb.snd.nxt &&
1447					tcp_seq_leq((uint32_t)s->tcb.snd.rcvr,
1448					tack->dup3.ack)) {
1449
1450				/* restart fast retransmit again. */
1451				start_fast_retransmit(s);
1452				send = in_fast_retransmit(s,
1453					tack->ack - tack->dup3.ack,
1454					tack->segs.ack - tack->dup3.seg - 1,
1455					tack->segs.dup);
1456			}
1457		}
1458	}
1459
1460	return send;
1461}
1462
1463/*
1464 * our FIN was acked, stop rto timer, change stream state,
1465 * and possibly close the stream.
1466 */
1467static inline void
1468rx_ackfin(struct tle_tcp_stream *s)
1469{
1470	uint32_t state;
1471
1472	s->tcb.snd.una = s->tcb.snd.fss;
1473	empty_mbuf_ring(s->tx.q);
1474
1475	state = s->tcb.state;
1476	if (state == TCP_ST_LAST_ACK)
1477		stream_term(s);
1478	else if (state == TCP_ST_FIN_WAIT_1) {
1479		timer_stop(s);
1480		s->tcb.state = TCP_ST_FIN_WAIT_2;
1481	} else if (state == TCP_ST_CLOSING) {
1482		s->tcb.state = TCP_ST_TIME_WAIT;
1483		s->tcb.snd.rto = TCP_RTO_2MSL;
1484		timer_reset(s);
1485	}
1486}
1487
1488static inline void
1489rx_process_ack(struct tle_tcp_stream *s, uint32_t ts,
1490	const struct dack_info *tack)
1491{
1492	int32_t send;
1493	uint32_t n;
1494
1495	s->tcb.rcv.dupack = tack->segs.dup;
1496
1497	n = rx_ackdata(s, tack->ack);
1498	send = process_ack(s, n, tack);
1499
1500	/* try to send more data. */
1501	if ((n != 0 || send != 0) && tcp_txq_nxt_cnt(s) != 0)
1502		txs_enqueue(s->s.ctx, s);
1503
1504	/* restart RTO timer. */
1505	if (s->tcb.snd.nxt != s->tcb.snd.una)
1506		timer_start(s);
1507
1508	/* update rto; if a fresh packet is here then calculate rtt */
1509	if (tack->ts.ecr != 0)
1510		rto_estimate(&s->tcb, ts - tack->ts.ecr);
1511}
1512
1513/*
1514 * process <SYN,ACK>
1515 * returns negative value on failure, or zero on success.
1516 */
1517static inline int
1518rx_synack(struct tle_tcp_stream *s, uint32_t ts, uint32_t state,
1519	const union seg_info *si, struct rte_mbuf *mb,
1520	struct resp_info *rsp)
1521{
1522	struct syn_opts so;
1523	struct tcp_hdr *th;
1524
1525	if (state != TCP_ST_SYN_SENT)
1526		return -EINVAL;
1527
1528	/* invalid SEG.ACK */
1529	if (si->ack != (uint32_t)s->tcb.snd.nxt) {
1530		rsp->flags = TCP_FLAG_RST;
1531		return 0;
1532	}
1533
1534	th = rte_pktmbuf_mtod_offset(mb, struct tcp_hdr *,
1535		mb->l2_len + mb->l3_len);
1536	get_syn_opts(&so, (uintptr_t)(th + 1), mb->l4_len - sizeof(*th));
1537
1538	s->tcb.so = so;
1539
1540	s->tcb.snd.una = s->tcb.snd.nxt;
1541	s->tcb.snd.mss = so.mss;
1542	s->tcb.snd.wnd = si->wnd << so.wscale;
1543	s->tcb.snd.wu.wl1 = si->seq;
1544	s->tcb.snd.wu.wl2 = si->ack;
1545	s->tcb.snd.wscale = so.wscale;
1546
1547	/* setup congestion variables */
1548	s->tcb.snd.cwnd = initial_cwnd(s->tcb.snd.mss);
1549	s->tcb.snd.ssthresh = s->tcb.snd.wnd;
1550
1551	s->tcb.rcv.ts = so.ts.val;
1552	s->tcb.rcv.irs = si->seq;
1553	s->tcb.rcv.nxt = si->seq + 1;
1554
1555	/* if peer doesn't support WSCALE opt, recalculate RCV.WND */
1556	s->tcb.rcv.wscale = (so.wscale == TCP_WSCALE_NONE) ?
1557		TCP_WSCALE_NONE : TCP_WSCALE_DEFAULT;
1558	s->tcb.rcv.wnd = calc_rx_wnd(s, s->tcb.rcv.wscale);
1559
1560	/* calculate initial rto */
1561	rto_estimate(&s->tcb, ts - s->tcb.snd.ts);
1562
1563	rsp->flags |= TCP_FLAG_ACK;
1564
1565	timer_stop(s);
1566	s->tcb.state = TCP_ST_ESTABLISHED;
1567	rte_smp_wmb();
1568
1569	if (s->tx.ev != NULL)
1570		tle_event_raise(s->tx.ev);
1571	else if (s->tx.cb.func != NULL)
1572		s->tx.cb.func(s->tx.cb.data, &s->s);
1573
1574	return 0;
1575}
1576
1577static inline uint32_t
1578rx_stream(struct tle_tcp_stream *s, uint32_t ts,
1579	const union pkt_info *pi, const union seg_info si[],
1580	struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1581	uint32_t num)
1582{
1583	uint32_t i, k, n, state;
1584	int32_t ret;
1585	struct resp_info rsp;
1586	struct dack_info tack;
1587
1588	k = 0;
1589	rsp.flags = 0;
1590
1591	state = s->tcb.state;
1592
1593	/*
1594	 * first check for the states/flags where we don't
1595	 * expect groups of packets.
1596	 */
1597
1598	/* process RST */
1599	if ((pi->tf.flags & TCP_FLAG_RST) != 0) {
1600		for (i = 0;
1601				i != num &&
1602				rx_rst(s, state, pi->tf.flags, &si[i]);
1603				i++)
1604			;
1605		i = 0;
1606
1607	/* RFC 793: if the ACK bit is off drop the segment and return */
1608	} else if ((pi->tf.flags & TCP_FLAG_ACK) == 0) {
1609		i = 0;
1614
1615	/* process <SYN,ACK> */
1616	} else if ((pi->tf.flags & TCP_FLAG_SYN) != 0) {
1617		ret = 0;
1618		for (i = 0; i != num; i++) {
1619			ret = rx_synack(s, ts, state, &si[i], mb[i], &rsp);
1620			if (ret == 0)
1621				break;
1622
1623			rc[k] = -ret;
1624			rp[k] = mb[i];
1625			k++;
1626		}
1627
1628	/* process FIN */
1629	} else if ((pi->tf.flags & TCP_FLAG_FIN) != 0) {
1630		ret = 0;
1631		for (i = 0; i != num; i++) {
1632			ret = rx_fin(s, state, &si[i], mb[i], &rsp);
1633			if (ret >= 0)
1634				break;
1635
1636			rc[k] = -ret;
1637			rp[k] = mb[i];
1638			k++;
1639		}
1640		i += (ret > 0);
1641
1642	/* normal data/ack packets */
1643	} else if (state >= TCP_ST_ESTABLISHED && state <= TCP_ST_LAST_ACK) {
1644
1645		/* process incoming data packets. */
1646		dack_info_init(&tack, &s->tcb);
1647		n = rx_data_ack(s, &tack, si, mb, rp, rc, num);
1648
1649		/* follow up actions based on aggregated information */
1650
1651		/* update SND.WND */
1652		ack_window_update(&s->tcb, &tack);
1653
1654		/*
1655		 * fast-path: all data & FIN was already sent out
1656		 * and now is acknowledged.
1657		 */
1658		if (s->tcb.snd.fss == s->tcb.snd.nxt &&
1659				tack.ack == (uint32_t) s->tcb.snd.nxt)
1660			rx_ackfin(s);
1661		else
1662			rx_process_ack(s, ts, &tack);
1663
1664		/*
1665		 * send an immediate ACK if either:
1666		 * - received segment with invalid seq/ack number
1667		 * - received segment with OFO data
1668		 * - received segment with in-order (INO) data and no TX is scheduled
1669		 *   for that stream.
1670		 */
1671		if (tack.segs.badseq != 0 || tack.segs.ofo != 0 ||
1672				(tack.segs.data != 0 &&
1673				rte_atomic32_read(&s->tx.arm) == 0))
1674			rsp.flags |= TCP_FLAG_ACK;
1675
1676		rx_ofo_fin(s, &rsp);
1677
1678		k += num - n;
1679		i = num;
1680
1681	/* unhandled state, drop all packets. */
1682	} else
1683		i = 0;
1684
1685	/* we have a response packet to send. */
1686	if (rsp.flags == TCP_FLAG_RST) {
1687		send_rst(s, si[i].ack);
1688		stream_term(s);
1689	} else if (rsp.flags != 0) {
1690		send_ack(s, ts, rsp.flags);
1691
1692		/* start the timer for FIN packet */
1693		if ((rsp.flags & TCP_FLAG_FIN) != 0)
1694			timer_reset(s);
1695	}
1696
1697	/* unprocessed packets */
1698	for (; i != num; i++, k++) {
1699		rc[k] = EINVAL;
1700		rp[k] = mb[i];
1701	}
1702
1703	return num - k;
1704}
1705
1706static inline uint32_t
1707rx_new_stream(struct tle_tcp_stream *s, uint32_t ts,
1708	const union pkt_info *pi, const union seg_info si[],
1709	struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1710	uint32_t num)
1711{
1712	uint32_t i;
1713
1714	if (rwl_acquire(&s->rx.use) > 0) {
1715		i = rx_stream(s, ts, pi, si, mb, rp, rc, num);
1716		rwl_release(&s->rx.use);
1717		return i;
1718	}
1719
1720	for (i = 0; i != num; i++) {
1721		rc[i] = ENOENT;
1722		rp[i] = mb[i];
1723	}
1724	return 0;
1725}
1726
1727static inline uint32_t
1728rx_postsyn(struct tle_dev *dev, struct stbl *st, uint32_t type, uint32_t ts,
1729	const union pkt_info pi[], const union seg_info si[],
1730	struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1731	uint32_t num)
1732{
1733	struct tle_tcp_stream *cs, *s;
1734	uint32_t i, k, n, state;
1735	int32_t ret;
1736
1737	s = rx_obtain_stream(dev, st, &pi[0], type);
1738	if (s == NULL) {
1739		for (i = 0; i != num; i++) {
1740			rc[i] = ENOENT;
1741			rp[i] = mb[i];
1742		}
1743		return 0;
1744	}
1745
1746	k = 0;
1747	state = s->tcb.state;
1748
1749	if (state == TCP_ST_LISTEN) {
1750
1751		/* one connection per flow */
1752		cs = NULL;
1753		ret = -EINVAL;
1754		for (i = 0; i != num; i++) {
1755
1756			ret = rx_ack_listen(s, st, pi, &si[i], ts, mb[i], &cs);
1757
1758			/* valid packet encountered */
1759			if (ret >= 0)
1760				break;
1761
1762			/* invalid packet, keep trying to find a proper one */
1763			rc[k] = -ret;
1764			rp[k] = mb[i];
1765			k++;
1766		}
1767
1768		/* packet is valid, but we are out of streams to serve it */
1769		if (ret > 0) {
1770			for (; i != num; i++, k++) {
1771				rc[k] = ret;
1772				rp[k] = mb[i];
1773			}
1774		/* new stream is accepted */
1775		} else if (ret == 0) {
1776
1777			/* inform listen stream about new connections */
1778			if (s->rx.ev != NULL)
1779				tle_event_raise(s->rx.ev);
1780			else if (s->rx.cb.func != NULL &&
1781					rte_ring_count(s->rx.q) == 1)
1782				s->rx.cb.func(s->rx.cb.data, &s->s);
1783
1784			/* if there is no data, drop current packet */
1785			if (PKT_L4_PLEN(mb[i]) == 0) {
1786				rc[k] = ENODATA;
1787				rp[k++] = mb[i++];
1788			}
1789
1790			/*  process remaining packets for that stream */
1791			if (num != i) {
1792				n = rx_new_stream(cs, ts, pi + i, si + i,
1793					mb + i, rp + k, rc + k, num - i);
1794				k += num - n - i;
1795			}
1796		}
1797
1798	} else {
1799		i = rx_stream(s, ts, pi, si, mb, rp, rc, num);
1800		k = num - i;
1801	}
1802
1803	rwl_release(&s->rx.use);
1804	return num - k;
1805}
1806
1807
1808static inline uint32_t
1809rx_syn(struct tle_dev *dev, uint32_t type, uint32_t ts,
1810	const union pkt_info pi[], const union seg_info si[],
1811	struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1812	uint32_t num)
1813{
1814	struct tle_tcp_stream *s;
1815	uint32_t i, k;
1816	int32_t ret;
1817
1818	s = rx_obtain_listen_stream(dev, &pi[0], type);
1819	if (s == NULL) {
1820		for (i = 0; i != num; i++) {
1821			rc[i] = ENOENT;
1822			rp[i] = mb[i];
1823		}
1824		return 0;
1825	}
1826
1827	k = 0;
1828	for (i = 0; i != num; i++) {
1829
1830		/* check that this remote is allowed to connect */
1831		if (rx_check_stream(s, &pi[i]) != 0)
1832			ret = -ENOENT;
1833		else
1834			/* syncookie: reply with <SYN,ACK> */
1835			ret = sync_ack(s, &pi[i], &si[i], ts, mb[i]);
1836
1837		if (ret != 0) {
1838			rc[k] = -ret;
1839			rp[k] = mb[i];
1840			k++;
1841		}
1842	}
1843
1844	rwl_release(&s->rx.use);
1845	return num - k;
1846}
1847
1848uint16_t
1849tle_tcp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[],
1850	struct rte_mbuf *rp[], int32_t rc[], uint16_t num)
1851{
1852	struct stbl *st;
1853	uint32_t i, j, k, n, t, ts;
1854	uint64_t csf;
1855	union pkt_info pi[num];
1856	union seg_info si[num];
1857	union {
1858		uint8_t t[TLE_VNUM];
1859		uint32_t raw;
1860	} stu;
1861
1862	ts = tcp_get_tms();
1863	st = CTX_TCP_STLB(dev->ctx);
1864
1865	stu.raw = 0;
1866
1867	/* extract packet info and check the L3/L4 csums */
1868	for (i = 0; i != num; i++) {
1869
1870		get_pkt_info(pkt[i], &pi[i], &si[i]);
1871
1872		t = pi[i].tf.type;
1873		csf = dev->rx.ol_flags[t] &
1874			(PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD);
1875
1876		/* check csums in SW */
1877		if (pi[i].csf == 0 && csf != 0 && check_pkt_csum(pkt[i], csf,
1878				pi[i].tf.type, IPPROTO_TCP) != 0)
1879			pi[i].csf = csf;
1880
1881		stu.t[t] = 1;
1882	}
1883
1884	if (stu.t[TLE_V4] != 0)
1885		stbl_lock(st, TLE_V4);
1886	if (stu.t[TLE_V6] != 0)
1887		stbl_lock(st, TLE_V6);
1888
1889	k = 0;
1890	for (i = 0; i != num; i += j) {
1891
1892		t = pi[i].tf.type;
1893
1894		/* basic checks for incoming packet */
1895		if (t >= TLE_VNUM || pi[i].csf != 0 || dev->dp[t] == NULL) {
1896			rc[k] = EINVAL;
1897			rp[k] = pkt[i];
1898			j = 1;
1899			k++;
1900		/* process input SYN packets */
1901		} else if (pi[i].tf.flags == TCP_FLAG_SYN) {
1902			j = pkt_info_bulk_syneq(pi + i, num - i);
1903			n = rx_syn(dev, t, ts, pi + i, si + i, pkt + i,
1904				rp + k, rc + k, j);
1905			k += j - n;
1906		} else {
1907			j = pkt_info_bulk_eq(pi + i, num - i);
1908			n = rx_postsyn(dev, st, t, ts, pi + i, si + i, pkt + i,
1909				rp + k, rc + k, j);
1910			k += j - n;
1911		}
1912	}
1913
1914	if (stu.t[TLE_V4] != 0)
1915		stbl_unlock(st, TLE_V4);
1916	if (stu.t[TLE_V6] != 0)
1917		stbl_unlock(st, TLE_V6);
1918
1919	return num - k;
1920}
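
/*
 * Usage sketch (an illustrative addition, not part of the library):
 * a typical BE RX loop feeds raw mbufs from the NIC into
 * tle_tcp_rx_bulk() and frees whatever the library rejects.
 * MAX_PKT_BURST, the port/queue ids and the tle_dev pointer (dev)
 * below are assumptions taken from the surrounding environment.
 *
 *	struct rte_mbuf *pkt[MAX_PKT_BURST], *rp[MAX_PKT_BURST];
 *	int32_t rc[MAX_PKT_BURST];
 *	uint16_t i, k, n;
 *
 *	n = rte_eth_rx_burst(port_id, queue_id, pkt, MAX_PKT_BURST);
 *	if (n != 0) {
 *		k = tle_tcp_rx_bulk(dev, pkt, rp, rc, n);
 *		for (i = 0; i != n - k; i++)
 *			rte_pktmbuf_free(rp[i]);
 *	}
 */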
1921
1922uint16_t
1923tle_tcp_stream_accept(struct tle_stream *ts, struct tle_stream *rs[],
1924	uint32_t num)
1925{
1926	uint32_t n;
1927	struct tle_tcp_stream *s;
1928
1929	s = TCP_STREAM(ts);
1930	n = rte_ring_mc_dequeue_burst(s->rx.q, (void **)rs, num);
1931	if (n == 0)
1932		return 0;
1933
1934	/*
1935	 * if we still have packets to read,
1936	 * then rearm stream RX event.
1937	 */
1938	if (n == num && rte_ring_count(s->rx.q) != 0) {
1939		if (rwl_try_acquire(&s->rx.use) > 0 && s->rx.ev != NULL)
1940			tle_event_raise(s->rx.ev);
1941		rwl_release(&s->rx.use);
1942	}
1943
1944	return n;
1945}
1946
1947uint16_t
1948tle_tcp_tx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], uint16_t num)
1949{
1950	uint32_t i, j, k, n;
1951	struct tle_drb *drb[num];
1952	struct tle_tcp_stream *s;
1953
1954	/* extract packets from device TX queue. */
1955
1956	k = num;
1957	n = tle_dring_sc_dequeue(&dev->tx.dr, (const void **)(uintptr_t)pkt,
1958		num, drb, &k);
1959
1960	if (n == 0)
1961		return 0;
1962
1963	/* free empty drbs and notify related streams. */
1964
1965	for (i = 0; i != k; i = j) {
1966		s = drb[i]->udata;
1967		for (j = i + 1; j != k && s == drb[j]->udata; j++)
1968			;
1969		stream_drb_free(s, drb + i, j - i);
1970	}
1971
1972	return n;
1973}
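
/*
 * Usage sketch (an illustrative addition): the BE TX loop drains the
 * device TX queue that the streams fill and pushes the resulting frames
 * to the NIC; tle_tcp_process() has to be called periodically so that
 * RTO and to-send queues are serviced. Burst size, port/queue ids and
 * the ctx/dev pointers are assumptions.
 *
 *	struct rte_mbuf *pkt[MAX_PKT_BURST];
 *	uint16_t k, n;
 *
 *	tle_tcp_process(ctx, MAX_PKT_BURST);
 *	n = tle_tcp_tx_bulk(dev, pkt, MAX_PKT_BURST);
 *	k = rte_eth_tx_burst(port_id, queue_id, pkt, n);
 *	while (k != n)
 *		rte_pktmbuf_free(pkt[k++]);
 */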
1974
1975static inline void
1976stream_fill_pkt_info(const struct tle_tcp_stream *s, union pkt_info *pi)
1977{
1978	if (s->s.type == TLE_V4)
1979		pi->addr4 = s->s.ipv4.addr;
1980	else
1981		pi->addr6 = &s->s.ipv6.addr;
1982
1983	pi->port = s->s.port;
1984	pi->tf.type = s->s.type;
1985}
1986
1987static int
1988stream_fill_addr(struct tle_tcp_stream *s, const struct sockaddr *addr)
1989{
1990	const struct sockaddr_in *in4;
1991	const struct sockaddr_in6 *in6;
1992	const struct tle_dev_param *prm;
1993	int32_t rc;
1994
1995	rc = 0;
1996	s->s.pmsk.raw = UINT32_MAX;
1997
1998	/* setup L4 src ports and src address fields. */
1999	if (s->s.type == TLE_V4) {
2000		in4 = (const struct sockaddr_in *)addr;
2001		if (in4->sin_addr.s_addr == INADDR_ANY || in4->sin_port == 0)
2002			return -EINVAL;
2003
2004		s->s.port.src = in4->sin_port;
2005		s->s.ipv4.addr.src = in4->sin_addr.s_addr;
2006		s->s.ipv4.mask.src = INADDR_NONE;
2007		s->s.ipv4.mask.dst = INADDR_NONE;
2008
2009	} else if (s->s.type == TLE_V6) {
2010		in6 = (const struct sockaddr_in6 *)addr;
2011		if (memcmp(&in6->sin6_addr, &tle_ipv6_any,
2012				sizeof(tle_ipv6_any)) == 0 ||
2013				in6->sin6_port == 0)
2014			return -EINVAL;
2015
2016		s->s.port.src = in6->sin6_port;
2017		rte_memcpy(&s->s.ipv6.addr.src, &in6->sin6_addr,
2018			sizeof(s->s.ipv6.addr.src));
2019		rte_memcpy(&s->s.ipv6.mask.src, &tle_ipv6_none,
2020			sizeof(s->s.ipv6.mask.src));
2021		rte_memcpy(&s->s.ipv6.mask.dst, &tle_ipv6_none,
2022			sizeof(s->s.ipv6.mask.dst));
2023	}
2024
2025	/* setup the destination device. */
2026	rc = stream_fill_dest(s);
2027	if (rc != 0)
2028		return rc;
2029
2030	/* setup L4 dst address from device param */
2031	prm = &s->tx.dst.dev->prm;
2032	if (s->s.type == TLE_V4) {
2033		if (s->s.ipv4.addr.dst == INADDR_ANY)
2034			s->s.ipv4.addr.dst = prm->local_addr4.s_addr;
2035	} else if (memcmp(&s->s.ipv6.addr.dst, &tle_ipv6_any,
2036			sizeof(tle_ipv6_any)) == 0)
2037		memcpy(&s->s.ipv6.addr.dst, &prm->local_addr6,
2038			sizeof(s->s.ipv6.addr.dst));
2039
2040	return rc;
2041}
2042
2043static inline int
2044tx_syn(struct tle_tcp_stream *s, const struct sockaddr *addr)
2045{
2046	int32_t rc;
2047	uint32_t tms, seq;
2048	union pkt_info pi;
2049	struct stbl *st;
2050	struct stbl_entry *se;
2051
2052	/* fill stream address */
2053	rc = stream_fill_addr(s, addr);
2054	if (rc != 0)
2055		return rc;
2056
2057	/* fill pkt info to generate seq.*/
2058	stream_fill_pkt_info(s, &pi);
2059
2060	tms = tcp_get_tms();
2061	s->tcb.so.ts.val = tms;
2062	s->tcb.so.ts.ecr = 0;
2063	s->tcb.so.wscale = TCP_WSCALE_DEFAULT;
2064	s->tcb.so.mss = calc_smss(s->tx.dst.mtu, &s->tx.dst);
2065
2066	/* note that rcv.nxt is 0 here for sync_gen_seq.*/
2067	seq = sync_gen_seq(&pi, s->tcb.rcv.nxt, tms, s->tcb.so.mss,
2068				s->s.ctx->prm.hash_alg,
2069				&s->s.ctx->prm.secret_key);
2070	s->tcb.snd.iss = seq;
2071	s->tcb.snd.rcvr = seq;
2072	s->tcb.snd.una = seq;
2073	s->tcb.snd.nxt = seq + 1;
2074	s->tcb.snd.rto = TCP_RTO_DEFAULT;
2075	s->tcb.snd.ts = tms;
2076
2077	s->tcb.rcv.mss = s->tcb.so.mss;
2078	s->tcb.rcv.wscale = TCP_WSCALE_DEFAULT;
2079	s->tcb.rcv.wnd = calc_rx_wnd(s, s->tcb.rcv.wscale);
2080	s->tcb.rcv.ts = 0;
2081
2082	/* add the stream in stream table */
2083	st = CTX_TCP_STLB(s->s.ctx);
2084	se = stbl_add_stream_lock(st, s);
2085	if (se == NULL)
2086		return -ENOBUFS;
2087	s->ste = se;
2088
2089	/* put stream into the to-send queue */
2090	txs_enqueue(s->s.ctx, s);
2091
2092	return 0;
2093}
2094
2095int
2096tle_tcp_stream_connect(struct tle_stream *ts, const struct sockaddr *addr)
2097{
2098	struct tle_tcp_stream *s;
2099	uint32_t type;
2100	int32_t rc;
2101
2102	if (ts == NULL || addr == NULL)
2103		return -EINVAL;
2104
2105	s = TCP_STREAM(ts);
2106	type = s->s.type;
2107	if (type >= TLE_VNUM)
2108		return -EINVAL;
2109
2110	if (rwl_try_acquire(&s->tx.use) > 0) {
2111		rc = rte_atomic16_cmpset(&s->tcb.state, TCP_ST_CLOSED,
2112			TCP_ST_SYN_SENT);
2113		rc = (rc == 0) ? -EDEADLK : 0;
2114	} else
2115		rc = -EINVAL;
2116
2117	if (rc != 0) {
2118		rwl_release(&s->tx.use);
2119		return rc;
2120	}
2121
2122	/* fill stream, prepare and transmit syn pkt */
2123	s->tcb.uop |= TCP_OP_CONNECT;
2124	rc = tx_syn(s, addr);
2125	rwl_release(&s->tx.use);
2126
2127	/* error happened, do a cleanup */
2128	if (rc != 0)
2129		tle_tcp_stream_close(ts);
2130
2131	return rc;
2132}
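
/*
 * Usage sketch (an illustrative addition): an active open fills a
 * sockaddr with the remote address and port and lets the BE loop emit
 * the SYN; establishment is signalled through the stream TX event or
 * callback configured at stream open time. Address and port values
 * below are arbitrary examples.
 *
 *	struct sockaddr_in rem = {
 *		.sin_family = AF_INET,
 *		.sin_port = rte_cpu_to_be_16(80),
 *		.sin_addr.s_addr = rte_cpu_to_be_32(0xc0a80101), // 192.168.1.1
 *	};
 *
 *	rc = tle_tcp_stream_connect(ts, (const struct sockaddr *)&rem);
 */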
2133
2134uint16_t
2135tle_tcp_stream_recv(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
2136{
2137	uint32_t n;
2138	struct tle_tcp_stream *s;
2139
2140	s = TCP_STREAM(ts);
2141	n = rte_ring_mc_dequeue_burst(s->rx.q, (void **)pkt, num);
2142	if (n == 0)
2143		return 0;
2144
2145	/*
2146	 * if we still have packets to read,
2147	 * then rearm stream RX event.
2148	 */
2149	if (n == num && rte_ring_count(s->rx.q) != 0) {
2150		if (rwl_try_acquire(&s->rx.use) > 0 && s->rx.ev != NULL)
2151			tle_event_raise(s->rx.ev);
2152		rwl_release(&s->rx.use);
2153	}
2154
2155	return n;
2156}
2157
2158static inline int32_t
2159tx_segments(struct tle_tcp_stream *s, uint64_t ol_flags,
2160	struct rte_mbuf *segs[], uint32_t num)
2161{
2162	uint32_t i;
2163	int32_t rc;
2164
2165	for (i = 0; i != num; i++) {
2166		/* Build L2/L3/L4 header */
2167		rc = tcp_fill_mbuf(segs[i], s, &s->tx.dst, ol_flags, s->s.port,
2168			0, TCP_FLAG_ACK, 0, 0);
2169		if (rc != 0) {
2170			free_segments(segs, num);
2171			break;
2172		}
2173	}
2174
2175	if (i == num) {
2176		/* queue packets for further transmission. */
2177		rc = rte_ring_mp_enqueue_bulk(s->tx.q, (void **)segs, num);
2178		if (rc != 0)
2179			free_segments(segs, num);
2180	}
2181
2182	return rc;
2183}
2184
2185uint16_t
2186tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
2187{
2188	uint32_t i, j, k, mss, n, state, type;
2189	int32_t rc;
2190	uint64_t ol_flags;
2191	struct tle_tcp_stream *s;
2192	struct tle_dev *dev;
2193	struct rte_mbuf *segs[TCP_MAX_PKT_SEG];
2194
2195	s = TCP_STREAM(ts);
2196
2197	/* mark stream as not closable. */
2198	if (rwl_acquire(&s->tx.use) < 0) {
2199		rte_errno = EAGAIN;
2200		return 0;
2201	}
2202
2203	state = s->tcb.state;
2204	if (state != TCP_ST_ESTABLISHED && state != TCP_ST_CLOSE_WAIT) {
2205		rte_errno = ENOTCONN;
2206		rwl_release(&s->tx.use);
2207		return 0;
2208	}
2209
2210	mss = s->tcb.snd.mss;
2211	dev = s->tx.dst.dev;
2212	type = s->s.type;
2213	ol_flags = dev->tx.ol_flags[type];
2214
2215	k = 0;
2216	rc = 0;
2217	while (k != num) {
2218		/* prepare and check for TX */
2219		for (i = k; i != num; i++) {
2220			if (pkt[i]->pkt_len > mss ||
2221					pkt[i]->nb_segs > TCP_MAX_PKT_SEG)
2222				break;
2223			rc = tcp_fill_mbuf(pkt[i], s, &s->tx.dst, ol_flags,
2224				s->s.port, 0, TCP_FLAG_ACK, 0, 0);
2225			if (rc != 0)
2226				break;
2227		}
2228
2229		if (i != k) {
2230			/* queue packets for further transmission. */
2231			n = rte_ring_mp_enqueue_burst(s->tx.q, (void **)pkt + k,
2232				(i - k));
2233			k += n;
2234
2235			/*
2236			 * for unsent, but already modified packets:
2237			 * remove pkt l2/l3 headers, restore ol_flags
2238			 */
2239			if (i != k) {
2240				ol_flags = ~dev->tx.ol_flags[type];
2241				for (j = k; j != i; j++) {
2242					rte_pktmbuf_adj(pkt[j], pkt[j]->l2_len +
2243						pkt[j]->l3_len +
2244						pkt[j]->l4_len);
2245					pkt[j]->ol_flags &= ol_flags;
2246				}
2247				break;
2248			}
2249		}
2250
2251		if (rc != 0) {
2252			rte_errno = -rc;
2253			break;
2254
2255		/* segment large packet and enqueue for sending */
2256		} else if (i != num) {
2257			/* segment the packet. */
2258			rc = tcp_segmentation(pkt[i], segs, RTE_DIM(segs),
2259				&s->tx.dst, mss);
2260			if (rc < 0) {
2261				rte_errno = -rc;
2262				break;
2263			}
2264
2265			rc = tx_segments(s, dev->tx.ol_flags[type], segs, rc);
2266			if (rc == 0) {
2267				/* free the large mbuf */
2268				rte_pktmbuf_free(pkt[i]);
2269				/* set the mbuf as consumed */
2270				k++;
2271			} else
2272				/* no space left in tx queue */
2273				break;
2274		}
2275	}
2276
2277	/* notify BE about more data to send */
2278	if (k != 0)
2279		txs_enqueue(s->s.ctx, s);
2280	/* if possible, re-arm stream write event. */
2281	if (rte_ring_free_count(s->tx.q) != 0 && s->tx.ev != NULL)
2282		tle_event_raise(s->tx.ev);
2283
2284	rwl_release(&s->tx.use);
2285
2286	return k;
2287}
2288
2289/* send data and FIN (if needed) */
2290static inline void
2291tx_data_fin(struct tle_tcp_stream *s, uint32_t tms, uint32_t state)
2292{
2293	/* try to send some data */
2294	tx_nxt_data(s, tms);
2295
2296	/* we also have to send a FIN */
2297	if (state != TCP_ST_ESTABLISHED &&
2298			state != TCP_ST_CLOSE_WAIT &&
2299			tcp_txq_nxt_cnt(s) == 0 &&
2300			s->tcb.snd.fss != s->tcb.snd.nxt) {
2301		s->tcb.snd.fss = ++s->tcb.snd.nxt;
2302		send_ack(s, tms, TCP_FLAG_FIN | TCP_FLAG_ACK);
2303	}
2304}
2305
2306static inline void
2307tx_stream(struct tle_tcp_stream *s, uint32_t tms)
2308{
2309	uint32_t state;
2310
2311	state = s->tcb.state;
2312
2313	if (state == TCP_ST_SYN_SENT) {
2314		/* send the SYN, start the rto timer */
2315		send_ack(s, tms, TCP_FLAG_SYN);
2316		timer_start(s);
2317
2318	} else if (state >= TCP_ST_ESTABLISHED && state <= TCP_ST_LAST_ACK) {
2319
2320		tx_data_fin(s, tms, state);
2321
2322		/* start RTO timer. */
2323		if (s->tcb.snd.nxt != s->tcb.snd.una)
2324			timer_start(s);
2325	}
2326}
2327
2328static inline void
2329rto_stream(struct tle_tcp_stream *s, uint32_t tms)
2330{
2331	uint32_t state;
2332
2333	state = s->tcb.state;
2334
2335	TCP_LOG(DEBUG, "%s(%p, tms=%u): state=%u, "
2336		"retx=%u, retm=%u, "
2337		"rto=%u, snd.ts=%u, tmo=%u, "
2338		"snd.nxt=%lu, snd.una=%lu, flight_size=%lu, "
2339		"snd.rcvr=%lu, snd.fastack=%u, "
2340		"wnd=%u, cwnd=%u, ssthresh=%u, "
2341		"bytes sent=%lu, pkt remain=%u;\n",
2342		__func__, s, tms, s->tcb.state,
2343		s->tcb.snd.nb_retx, s->tcb.snd.nb_retm,
2344		s->tcb.snd.rto, s->tcb.snd.ts, tms - s->tcb.snd.ts,
2345		s->tcb.snd.nxt, s->tcb.snd.una, s->tcb.snd.nxt - s->tcb.snd.una,
2346		s->tcb.snd.rcvr, s->tcb.snd.fastack,
2347		s->tcb.snd.wnd, s->tcb.snd.cwnd, s->tcb.snd.ssthresh,
2348		s->tcb.snd.nxt - s->tcb.snd.iss, tcp_txq_nxt_cnt(s));
2349
2350	if (s->tcb.snd.nb_retx < s->tcb.snd.nb_retm) {
2351
2352		if (state >= TCP_ST_ESTABLISHED && state <= TCP_ST_LAST_ACK) {
2353
2354			/* update SND.CWND and SND.SSTHRESH */
2355			rto_cwnd_update(&s->tcb);
2356
2357			/* RFC 6582 3.2.4 */
2358			s->tcb.snd.rcvr = s->tcb.snd.nxt;
2359			s->tcb.snd.fastack = 0;
2360
2361			/* restart from last acked data */
2362			tcp_txq_rst_nxt_head(s);
2363			s->tcb.snd.nxt = s->tcb.snd.una;
2364
2365			tx_data_fin(s, tms, state);
2366
2367		} else if (state == TCP_ST_SYN_SENT) {
2368			/* resending SYN */
2369			s->tcb.so.ts.val = tms;
2370			send_ack(s, tms, TCP_FLAG_SYN);
2371
2372		} else if (state == TCP_ST_TIME_WAIT) {
2373			stream_term(s);
2374		}
2375
2376		/* RFC6298:5.5 back off the timer */
2377		s->tcb.snd.rto = rto_roundup(2 * s->tcb.snd.rto);
2378		s->tcb.snd.nb_retx++;
2379		timer_restart(s);
2380
2381	} else {
2382		send_rst(s, s->tcb.snd.una);
2383		stream_term(s);
2384	}
2385}
2386
2387int
2388tle_tcp_process(struct tle_ctx *ctx, uint32_t num)
2389{
2390	uint32_t i, k, tms;
2391	struct sdr *dr;
2392	struct tle_timer_wheel *tw;
2393	struct tle_stream *p;
2394	struct tle_tcp_stream *s, *rs[num];
2395
2396	/* process streams with RTO expired */
2397
2398	tw = CTX_TCP_TMWHL(ctx);
2399	tms = tcp_get_tms();
2400	tle_timer_expire(tw, tms);
2401
2402	k = tle_timer_get_expired_bulk(tw, (void **)rs, RTE_DIM(rs));
2403
2404	for (i = 0; i != k; i++) {
2405
2406		s = rs[i];
2407		s->timer.handle = NULL;
2408		if (rwl_try_acquire(&s->tx.use) > 0)
2409			rto_stream(s, tms);
2410		rwl_release(&s->tx.use);
2411	}
2412
2413	/* process streams from to-send queue */
2414
2415	k = txs_dequeue_bulk(ctx, rs, RTE_DIM(rs));
2416
2417	for (i = 0; i != k; i++) {
2418
2419		s = rs[i];
2420		if (rwl_try_acquire(&s->tx.use) > 0 &&
2421				rte_atomic32_read(&s->tx.arm) > 0) {
2422			rte_atomic32_set(&s->tx.arm, 0);
2423			tx_stream(s, tms);
2424		}
2425		rwl_release(&s->tx.use);
2426	}
2427
2428	/* collect streams to close from the death row */
2429
2430	dr = CTX_TCP_SDR(ctx);
2431	for (k = 0, p = STAILQ_FIRST(&dr->be);
2432			k != num && p != NULL;
2433			k++, p = STAILQ_NEXT(p, link))
2434		rs[k] = TCP_STREAM(p);
2435
2436	if (p == NULL)
2437		STAILQ_INIT(&dr->be);
2438	else
2439		STAILQ_FIRST(&dr->be) = p;
2440
2441	/* cleanup closed streams */
2442	for (i = 0; i != k; i++) {
2443		s = rs[i];
2444		tcp_stream_down(s);
2445		tcp_stream_reset(ctx, s);
2446	}
2447
2448	return 0;
2449}
2450