tcp_rxtx.c revision aa97dd1c
1/*
2 * Copyright (c) 2016  Intel Corporation.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#include <rte_errno.h>
17#include <rte_ethdev.h>
18#include <rte_ip.h>
19#include <rte_ip_frag.h>
20#include <rte_tcp.h>
21
22#include "tcp_stream.h"
23#include "tcp_timer.h"
24#include "stream_table.h"
25#include "syncookie.h"
26#include "misc.h"
27#include "tcp_ctl.h"
28#include "tcp_rxq.h"
29#include "tcp_txq.h"
30
31#define	TCP_MAX_PKT_SEG	0x20
32
33/*
34 * checks if input TCP ports and IP addresses match given stream.
35 * returns zero on success.
36 */
37static inline int
38rx_check_stream(const struct tle_tcp_stream *s, const union pkt_info *pi)
39{
40	int32_t rc;
41
42	if (pi->tf.type == TLE_V4)
43		rc = (pi->port.raw & s->s.pmsk.raw) != s->s.port.raw ||
44			(pi->addr4.raw & s->s.ipv4.mask.raw) !=
45			s->s.ipv4.addr.raw;
46	else
47		rc = (pi->port.raw & s->s.pmsk.raw) != s->s.port.raw ||
48			ymm_mask_cmp(&pi->addr6->raw, &s->s.ipv6.addr.raw,
49			&s->s.ipv6.mask.raw) != 0;
50
51	return rc;
52}
53
54static inline struct tle_tcp_stream *
55rx_obtain_listen_stream(const struct tle_dev *dev, const union pkt_info *pi,
56	uint32_t type)
57{
58	struct tle_tcp_stream *s;
59
60	s = (struct tle_tcp_stream *)dev->dp[type]->streams[pi->port.dst];
61	if (s == NULL || rwl_acquire(&s->rx.use) < 0)
62		return NULL;
63
64	/* check that we have a proper stream. */
65	if (s->tcb.state != TCP_ST_LISTEN) {
66		rwl_release(&s->rx.use);
67		s = NULL;
68	}
69
70	return s;
71}
72
73static inline struct tle_tcp_stream *
74rx_obtain_stream(const struct tle_dev *dev, struct stbl *st,
75	const union pkt_info *pi, uint32_t type)
76{
77	struct tle_tcp_stream *s;
78
79	s = stbl_find_data(st, pi);
80	if (s == NULL) {
81		if (pi->tf.flags == TCP_FLAG_ACK)
82			return rx_obtain_listen_stream(dev, pi, type);
83		return NULL;
84	}
85
86	if (stbl_data_pkt(s) || rwl_acquire(&s->rx.use) < 0)
87		return NULL;
88	/* check that we have a proper stream. */
89	else if (s->tcb.state == TCP_ST_CLOSED) {
90		rwl_release(&s->rx.use);
91		s = NULL;
92	}
93
94	return s;
95}
96
97/*
98 * Consider 2 pkt_info *equal* if their:
99 * - types (IPv4/IPv6)
100 * - TCP flags
101 * - checksum flags
102 * - TCP src and dst ports
103 * - IP src and dst addresses
104 * are equal.
105 */
106static inline int
107pkt_info_bulk_eq(const union pkt_info pi[], uint32_t num)
108{
109	uint32_t i;
110
111	i = 1;
112
113	if (pi[0].tf.type == TLE_V4) {
114		while (i != num && xmm_cmp(&pi[0].raw, &pi[i].raw) == 0)
115			i++;
116
117	} else if (pi[0].tf.type == TLE_V6) {
118		while (i != num &&
119				pi[0].raw.u64[0] == pi[i].raw.u64[0] &&
120				ymm_cmp(&pi[0].addr6->raw,
121				&pi[i].addr6->raw) == 0)
122			i++;
123	}
124
125	return i;
126}
127
128static inline int
129pkt_info_bulk_syneq(const union pkt_info pi[], uint32_t num)
130{
131	uint32_t i;
132
133	i = 1;
134
135	if (pi[0].tf.type == TLE_V4) {
136		while (i != num && pi[0].tf.raw == pi[i].tf.raw &&
137				pi[0].port.dst == pi[i].port.dst &&
138				pi[0].addr4.dst == pi[i].addr4.dst)
139			i++;
140
141	} else if (pi[0].tf.type == TLE_V6) {
142		while (i != num && pi[0].tf.raw == pi[i].tf.raw &&
143				pi[0].port.dst == pi[i].port.dst &&
144				xmm_cmp(&pi[0].addr6->dst,
145				&pi[i].addr6->dst) == 0)
146			i++;
147	}
148
149	return i;
150}
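
/*
 * Illustration (hypothetical burst, not taken from real traffic) of how the
 * two helpers above are used: tle_tcp_rx_bulk() splits a burst into runs of
 * packets that can be handed to the same stream in one call.
 *
 *   burst: [A, A, A, B, B, C]      (letters denote distinct flows)
 *   pkt_info_bulk_eq(pi, 6) == 3   -> packets 0..2 are processed together,
 *   the next call starts at index 3, and so on.
 *
 * pkt_info_bulk_syneq() does the same for SYN packets, where only the
 * type/flags, destination port and destination address have to match,
 * since no stream exists yet for that remote peer.
 */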
151
152static inline void
153stream_drb_free(struct tle_tcp_stream *s, struct tle_drb *drbs[],
154	uint32_t nb_drb)
155{
156	rte_ring_enqueue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
157}
158
159static inline uint32_t
160stream_drb_alloc(struct tle_tcp_stream *s, struct tle_drb *drbs[],
161	uint32_t nb_drb)
162{
163	return rte_ring_dequeue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
164}
165
166static inline void
167fill_tcph(struct tcp_hdr *l4h, const struct tcb *tcb, union l4_ports port,
168	uint32_t seq, uint8_t hlen, uint8_t flags)
169{
170	uint16_t wnd;
171
172	l4h->src_port = port.dst;
173	l4h->dst_port = port.src;
174
175	wnd = (flags & TCP_FLAG_SYN) ?
176		RTE_MAX(TCP4_MIN_MSS, tcb->so.mss) :
177		tcb->rcv.wnd >> tcb->rcv.wscale;
178
179	/* ??? use sse shuffle to hton all remaining 16 bytes at once. ??? */
180	l4h->sent_seq = rte_cpu_to_be_32(seq);
181	l4h->recv_ack = rte_cpu_to_be_32(tcb->rcv.nxt);
182	l4h->data_off = hlen / TCP_DATA_ALIGN << TCP_DATA_OFFSET;
183	l4h->tcp_flags = flags;
184	l4h->rx_win = rte_cpu_to_be_16(wnd);
185	l4h->cksum = 0;
186	l4h->tcp_urp = 0;
187
188	if (flags & TCP_FLAG_SYN)
189		fill_syn_opts(l4h + 1, &tcb->so);
190	else if ((flags & TCP_FLAG_RST) == 0 && tcb->so.ts.raw != 0)
191		fill_tms_opts(l4h + 1, tcb->snd.ts, tcb->rcv.ts);
192}
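
/*
 * Worked example for the fields set above (hypothetical values, assuming the
 * usual encoding where TCP_DATA_ALIGN == 4 and TCP_DATA_OFFSET == 4):
 * a header of hlen == 32 bytes (20-byte base header plus 12 bytes of options)
 * yields data_off = 32 / 4 << 4 == 0x80, i.e. a data offset of eight 32-bit
 * words. For a non-SYN segment with rcv.wnd == 116864 and rcv.wscale == 2
 * the advertised window field becomes 116864 >> 2 == 29216.
 */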
193
194static inline int
195tcp_fill_mbuf(struct rte_mbuf *m, const struct tle_tcp_stream *s,
196	const struct tle_dest *dst, uint64_t ol_flags,
197	union l4_ports port, uint32_t seq, uint32_t flags,
198	uint32_t pid, uint32_t swcsm)
199{
200	uint32_t l4, len, plen;
201	struct tcp_hdr *l4h;
202	char *l2h;
203
204	len = dst->l2_len + dst->l3_len;
205	plen = m->pkt_len;
206
207	if (flags & TCP_FLAG_SYN)
208		l4 = sizeof(*l4h) + TCP_TX_OPT_LEN_MAX;
209	else if ((flags & TCP_FLAG_RST) == 0 && s->tcb.rcv.ts != 0)
210		l4 = sizeof(*l4h) + TCP_TX_OPT_LEN_TMS;
211	else
212		l4 = sizeof(*l4h);
213
214	/* adjust mbuf to put L2/L3/L4 headers into it. */
215	l2h = rte_pktmbuf_prepend(m, len + l4);
216	if (l2h == NULL)
217		return -EINVAL;
218
219	/* copy L2/L3 header */
220	rte_memcpy(l2h, dst->hdr, len);
221
222	/* setup TCP header & options */
223	l4h = (struct tcp_hdr *)(l2h + len);
224	fill_tcph(l4h, &s->tcb, port, seq, l4, flags);
225
226	/* setup mbuf TX offload related fields. */
227	m->tx_offload = _mbuf_tx_offload(dst->l2_len, dst->l3_len, l4, 0, 0, 0);
228	m->ol_flags |= ol_flags;
229
230	/* update proto specific fields. */
231
232	if (s->s.type == TLE_V4) {
233		struct ipv4_hdr *l3h;
234		l3h = (struct ipv4_hdr *)(l2h + dst->l2_len);
235		l3h->packet_id = rte_cpu_to_be_16(pid);
236		l3h->total_length = rte_cpu_to_be_16(plen + dst->l3_len + l4);
237
238		if ((ol_flags & PKT_TX_TCP_CKSUM) != 0)
239			l4h->cksum = _ipv4x_phdr_cksum(l3h, m->l3_len,
240				ol_flags);
241		else if (swcsm != 0)
242			l4h->cksum = _ipv4_udptcp_mbuf_cksum(m, len, l3h);
243
244		if ((ol_flags & PKT_TX_IP_CKSUM) == 0 && swcsm != 0)
245			l3h->hdr_checksum = _ipv4x_cksum(l3h, m->l3_len);
246	} else {
247		struct ipv6_hdr *l3h;
248		l3h = (struct ipv6_hdr *)(l2h + dst->l2_len);
249		l3h->payload_len = rte_cpu_to_be_16(plen + l4);
250		if ((ol_flags & PKT_TX_TCP_CKSUM) != 0)
251			l4h->cksum = rte_ipv6_phdr_cksum(l3h, ol_flags);
252		else if (swcsm != 0)
253			l4h->cksum = _ipv6_udptcp_mbuf_cksum(m, len, l3h);
254	}
255
256	return 0;
257}
258
259/*
260 * This function is supposed to be used only for data packets.
261 * It assumes that L2/L3/L4 headers and mbuf fields are already set up properly.
262 *  - updates tcp SEG.SEQ, SEG.ACK, TS.VAL, TS.ECR.
263 *  - if no HW cksum offloads are enabled, calculates TCP checksum.
264 */
265static inline void
266tcp_update_mbuf(struct rte_mbuf *m, uint32_t type, const struct tcb *tcb,
267	uint32_t seq, uint32_t pid)
268{
269	struct tcp_hdr *l4h;
270	uint32_t len;
271
272	len = m->l2_len + m->l3_len;
273	l4h = rte_pktmbuf_mtod_offset(m, struct tcp_hdr *, len);
274
275	l4h->sent_seq = rte_cpu_to_be_32(seq);
276	l4h->recv_ack = rte_cpu_to_be_32(tcb->rcv.nxt);
277
278	if (tcb->so.ts.raw != 0)
279		fill_tms_opts(l4h + 1, tcb->snd.ts, tcb->rcv.ts);
280
281	if (type == TLE_V4) {
282		struct ipv4_hdr *l3h;
283		l3h = rte_pktmbuf_mtod_offset(m, struct ipv4_hdr *, m->l2_len);
284		l3h->hdr_checksum = 0;
285		l3h->packet_id = rte_cpu_to_be_16(pid);
286		if ((m->ol_flags & PKT_TX_IP_CKSUM) == 0)
287			l3h->hdr_checksum = _ipv4x_cksum(l3h, m->l3_len);
288	}
289
290	/* have to calculate TCP checksum in SW */
291	if ((m->ol_flags & PKT_TX_TCP_CKSUM) == 0) {
292
293		l4h->cksum = 0;
294
295		if (type == TLE_V4) {
296			struct ipv4_hdr *l3h;
297			l3h = rte_pktmbuf_mtod_offset(m, struct ipv4_hdr *,
298				m->l2_len);
299			l4h->cksum = _ipv4_udptcp_mbuf_cksum(m, len, l3h);
300
301		} else {
302			struct ipv6_hdr *l3h;
303			l3h = rte_pktmbuf_mtod_offset(m, struct ipv6_hdr *,
304				m->l2_len);
305			l4h->cksum = _ipv6_udptcp_mbuf_cksum(m, len, l3h);
306		}
307	}
308}
309
310/* Send data packets that need to be ACK-ed by peer */
311static inline uint32_t
312tx_data_pkts(struct tle_tcp_stream *s, struct rte_mbuf *const m[], uint32_t num)
313{
314	uint32_t bsz, i, nb, nbm;
315	struct tle_dev *dev;
316	struct tle_drb *drb[num];
317
318	/* calculate how many drbs are needed.*/
319	bsz = s->tx.drb.nb_elem;
320	nbm = (num + bsz - 1) / bsz;
321
322	/* allocate drbs, adjust number of packets. */
323	nb = stream_drb_alloc(s, drb, nbm);
324
325	/* drb ring is empty. */
326	if (nb == 0)
327		return 0;
328
329	else if (nb != nbm)
330		num = nb * bsz;
331
332	dev = s->tx.dst.dev;
333
334	/* enqueue pkts for TX. */
335	nbm = nb;
336	i = tle_dring_mp_enqueue(&dev->tx.dr, (const void * const*)m,
337		num, drb, &nb);
338
339	/* free unused drbs. */
340	if (nb != 0)
341		stream_drb_free(s, drb + nbm - nb, nb);
342
343	return i;
344}
345
346static inline uint32_t
347tx_data_bulk(struct tle_tcp_stream *s, union seqlen *sl, struct rte_mbuf *mi[],
348	uint32_t num)
349{
350	uint32_t fail, i, k, n, mss, pid, plen, sz, tn, type;
351	struct tle_dev *dev;
352	struct rte_mbuf *mb;
353	struct rte_mbuf *mo[MAX_PKT_BURST + TCP_MAX_PKT_SEG];
354
355	mss = s->tcb.snd.mss;
356	type = s->s.type;
357
358	dev = s->tx.dst.dev;
359	pid = rte_atomic32_add_return(&dev->tx.packet_id[type], num) - num;
360
361	k = 0;
362	tn = 0;
363	fail = 0;
364	for (i = 0; i != num && sl->len != 0 && fail == 0; i++) {
365
366		mb = mi[i];
367		sz = RTE_MIN(sl->len, mss);
368		plen = PKT_L4_PLEN(mb);
369
370		/*fast path, no need to use indirect mbufs. */
371		if (plen <= sz) {
372
373			/* update pkt TCP header */
374			tcp_update_mbuf(mb, type, &s->tcb, sl->seq, pid + i);
375
376			/* keep mbuf till ACK is received. */
377			rte_pktmbuf_refcnt_update(mb, 1);
378			sl->len -= plen;
379			sl->seq += plen;
380			mo[k++] = mb;
381		/* remaining snd.wnd is less than MSS, send nothing */
382		} else if (sz < mss)
383			break;
384		/* packet indirection needed */
385		else
386			RTE_VERIFY(0);
387
388		if (k >= MAX_PKT_BURST) {
389			n = tx_data_pkts(s, mo, k);
390			fail = k - n;
391			tn += n;
392			k = 0;
393		}
394	}
395
396	if (k != 0) {
397		n = tx_data_pkts(s, mo, k);
398		fail = k - n;
399		tn += n;
400	}
401
402	if (fail != 0) {
403		sz = tcp_mbuf_seq_free(mo + n, fail);
404		sl->seq -= sz;
405		sl->len += sz;
406	}
407
408	return tn;
409}
410
411/*
412 * gets data from the stream send buffer, updates it and
413 * queues it into the TX device queue.
414 * Note that this function is not MT safe.
415 */
416static inline uint32_t
417tx_nxt_data(struct tle_tcp_stream *s, uint32_t tms)
418{
419	uint32_t n, num, tn, wnd;
420	struct rte_mbuf **mi;
421	union seqlen sl;
422
423	tn = 0;
424	wnd = s->tcb.snd.wnd - (uint32_t)(s->tcb.snd.nxt - s->tcb.snd.una);
425	sl.seq = s->tcb.snd.nxt;
426	sl.len = RTE_MIN(wnd, s->tcb.snd.cwnd);
427
428	if (sl.len == 0)
429		return tn;
430
431	/* update send timestamp */
432	s->tcb.snd.ts = tms;
433
434	do {
435		/* get group of packets */
436		mi = tcp_txq_get_nxt_objs(s, &num);
437
438		/* stream send buffer is empty */
439		if (num == 0)
440			break;
441
442		/* queue data packets for TX */
443		n = tx_data_bulk(s, &sl, mi, num);
444		tn += n;
445
446		/* update consumer head */
447		tcp_txq_set_nxt_head(s, n);
448	} while (n == num);
449
450	s->tcb.snd.nxt += sl.seq - (uint32_t)s->tcb.snd.nxt;
451	return tn;
452}
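
/*
 * Illustration of the send-window computation above (hypothetical values):
 * with snd.wnd == 64000, 10000 bytes already in flight
 * (snd.nxt - snd.una == 10000) and snd.cwnd == 8760, at most
 * min(64000 - 10000, 8760) == 8760 bytes are queued for TX in this call.
 */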
453
454static inline void
455free_una_data(struct tle_tcp_stream *s, uint32_t len)
456{
457	uint32_t i, n, num, plen;
458	struct rte_mbuf **mi;
459
460	n = 0;
461	plen = 0;
462
463	do {
464		/* get group of packets */
465		mi = tcp_txq_get_una_objs(s, &num);
466
467		if (num == 0)
468			break;
469
470		/* free acked data */
471		for (i = 0; i != num && n != len; i++, n = plen) {
472			plen += PKT_L4_PLEN(mi[i]);
473			if (plen > len) {
474				/* keep SND.UNA at the start of the packet */
475				len -= RTE_MIN(len, plen - len);
476				break;
477			}
478			rte_pktmbuf_free(mi[i]);
479		}
480
481		/* update consumer tail */
482		tcp_txq_set_una_tail(s, i);
483	} while (plen < len);
484
485	s->tcb.snd.una += len;
486
487	/*
488	 * this could happen in case of a retransmit,
489	 * adjust SND.NXT to match SND.UNA.
490	 */
491	if (s->tcb.snd.una > s->tcb.snd.nxt) {
492		tcp_txq_rst_nxt_head(s);
493		s->tcb.snd.nxt = s->tcb.snd.una;
494	}
495}
496
497static inline uint16_t
498calc_smss(uint16_t mss, const struct tle_dest *dst)
499{
500	uint16_t n;
501
502	n = dst->mtu - dst->l2_len - dst->l3_len - TCP_TX_HDR_DACK;
503	mss = RTE_MIN(n, mss);
504	return mss;
505}
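
/*
 * Example (hypothetical values): with a 1500-byte MTU, a 14-byte Ethernet
 * header and a 20-byte IPv4 header, and assuming TCP_TX_HDR_DACK reserves
 * room for the TCP header plus the timestamp option (say 32 bytes in total),
 * the result is min(1500 - 14 - 20 - 32, mss) == min(1434, mss).
 */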
506
507/*
508 * RFC 5681 3.1
509 * If SMSS > 2190 bytes:
510 *     IW = 2 * SMSS bytes and MUST NOT be more than 2 segments
511 * If (SMSS > 1095 bytes) and (SMSS <= 2190 bytes):
512 *     IW = 3 * SMSS bytes and MUST NOT be more than 3 segments
513 * If SMSS <= 1095 bytes:
514 *     IW = 4 * SMSS bytes and MUST NOT be more than 4 segments
515 */
516static inline uint32_t
517initial_cwnd(uint16_t smss)
518{
519	if (smss > 2190)
520		return 2 * smss;
521	else if (smss > 1095)
522		return 3 * smss;
523	return 4 * smss;
524}
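
/*
 * Example: for a typical Ethernet-sized SMSS of 1460 bytes the middle branch
 * applies (1095 < 1460 <= 2190), so IW = 3 * 1460 == 4380 bytes.
 */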
525
526/*
527 * queue a standalone packet to the particular output device.
528 * It assumes that:
529 * - L2/L3/L4 headers are already set.
530 * - the packet fits into one segment.
531 */
532static inline int
533send_pkt(struct tle_tcp_stream *s, struct tle_dev *dev, struct rte_mbuf *m)
534{
535	uint32_t n, nb;
536	struct tle_drb *drb;
537
538	if (stream_drb_alloc(s, &drb, 1) == 0)
539		return -ENOBUFS;
540
541	/* enqueue pkt for TX. */
542	nb = 1;
543	n = tle_dring_mp_enqueue(&dev->tx.dr, (const void * const*)&m, 1,
544		&drb, &nb);
545
546	/* free unused drbs. */
547	if (nb != 0)
548		stream_drb_free(s, &drb, 1);
549
550	return (n == 1) ? 0 : -ENOBUFS;
551}
552
553static inline int
554send_ctrl_pkt(struct tle_tcp_stream *s, struct rte_mbuf *m, uint32_t seq,
555	uint32_t flags)
556{
557	const struct tle_dest *dst;
558	uint32_t pid, type;
559	int32_t rc;
560
561	dst = &s->tx.dst;
562	type = s->s.type;
563	pid = rte_atomic32_add_return(&dst->dev->tx.packet_id[type], 1) - 1;
564
565	rc = tcp_fill_mbuf(m, s, dst, 0, s->s.port, seq, flags, pid, 1);
566	if (rc == 0)
567		rc = send_pkt(s, dst->dev, m);
568
569	return rc;
570}
571
572static inline int
573send_rst(struct tle_tcp_stream *s, uint32_t seq)
574{
575	struct rte_mbuf *m;
576	int32_t rc;
577
578	m = rte_pktmbuf_alloc(s->tx.dst.head_mp);
579	if (m == NULL)
580		return -ENOMEM;
581
582	rc = send_ctrl_pkt(s, m, seq, TCP_FLAG_RST);
583	if (rc != 0)
584		rte_pktmbuf_free(m);
585
586	return rc;
587}
588
589static inline int
590send_ack(struct tle_tcp_stream *s, uint32_t tms, uint32_t flags)
591{
592	struct rte_mbuf *m;
593	uint32_t seq;
594	int32_t rc;
595
596	m = rte_pktmbuf_alloc(s->tx.dst.head_mp);
597	if (m == NULL)
598		return -ENOMEM;
599
600	seq = s->tcb.snd.nxt - ((flags & (TCP_FLAG_FIN | TCP_FLAG_SYN)) != 0);
601	s->tcb.snd.ts = tms;
602
603	rc = send_ctrl_pkt(s, m, seq, flags);
604	if (rc != 0) {
605		rte_pktmbuf_free(m);
606		return rc;
607	}
608
609	s->tcb.snd.ack = s->tcb.rcv.nxt;
610	return 0;
611}
612
613
614static int
615sync_ack(struct tle_tcp_stream *s, const union pkt_info *pi,
616	const union seg_info *si, uint32_t ts, struct rte_mbuf *m)
617{
618	uint16_t len;
619	int32_t rc;
620	uint32_t pid, seq, type;
621	struct tle_dev *dev;
622	const void *da;
623	struct tle_dest dst;
624	const struct tcp_hdr *th;
625
626	type = s->s.type;
627
628	/* get destination information. */
629	if (type == TLE_V4)
630		da = &pi->addr4.src;
631	else
632		da = &pi->addr6->src;
633
634	rc = stream_get_dest(&s->s, da, &dst);
635	if (rc < 0)
636		return rc;
637
638	th = rte_pktmbuf_mtod_offset(m, const struct tcp_hdr *,
639		m->l2_len + m->l3_len);
640	get_syn_opts(&s->tcb.so, (uintptr_t)(th + 1), m->l4_len - sizeof(*th));
641
642	s->tcb.rcv.nxt = si->seq + 1;
643	seq = sync_gen_seq(pi, s->tcb.rcv.nxt, ts, s->tcb.so.mss);
644	s->tcb.so.ts.ecr = s->tcb.so.ts.val;
645	s->tcb.so.ts.val = sync_gen_ts(ts, s->tcb.so.wscale);
646	s->tcb.so.wscale = (s->tcb.so.wscale == TCP_WSCALE_NONE) ?
647		TCP_WSCALE_NONE : TCP_WSCALE_DEFAULT;
648	s->tcb.so.mss = calc_smss(dst.mtu, &dst);
649
650	/* reset mbuf's data contents. */
651	len = m->l2_len + m->l3_len + m->l4_len;
652	m->tx_offload = 0;
653	if (rte_pktmbuf_adj(m, len) == NULL)
654		return -EINVAL;
655
656	dev = dst.dev;
657	pid = rte_atomic32_add_return(&dev->tx.packet_id[type], 1) - 1;
658
659	rc = tcp_fill_mbuf(m, s, &dst, 0, pi->port, seq,
660		TCP_FLAG_SYN | TCP_FLAG_ACK, pid, 1);
661	if (rc == 0)
662		rc = send_pkt(s, dev, m);
663
664	return rc;
665}
666
667/*
668 * RFC 793:
669 * There are four cases for the acceptability test for an incoming segment:
670 * Segment Receive  Test
671 * Length  Window
672 * ------- -------  -------------------------------------------
673 *    0       0     SEG.SEQ = RCV.NXT
674 *    0      >0     RCV.NXT =< SEG.SEQ < RCV.NXT+RCV.WND
675 *   >0       0     not acceptable
676 *   >0      >0     RCV.NXT =< SEG.SEQ < RCV.NXT+RCV.WND
677 *                  or RCV.NXT =< SEG.SEQ+SEG.LEN-1 < RCV.NXT+RCV.WND
678 */
679static inline int
680check_seqn(const struct tcb *tcb, uint32_t seqn, uint32_t len)
681{
682	uint32_t n;
683
684	n = seqn + len;
685	if (seqn - tcb->rcv.nxt >= tcb->rcv.wnd &&
686			n - tcb->rcv.nxt > tcb->rcv.wnd)
687		return -ERANGE;
688
689	return 0;
690}
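
/*
 * Note on the arithmetic above: the unsigned subtractions keep the test
 * correct across sequence-number wraparound. Example (hypothetical values):
 * with RCV.NXT == 1000 and RCV.WND == 500, a segment with SEG.SEQ == 1200 and
 * LEN == 100 is accepted (1200 - 1000 == 200 < 500), while a segment with
 * SEG.SEQ == 1600 and LEN == 100 is rejected (1600 - 1000 == 600 >= 500 and
 * 1700 - 1000 == 700 > 500).
 */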
691
692static inline union tsopt
693rx_tms_opt(const struct tcb *tcb, const struct rte_mbuf *mb)
694{
695	union tsopt ts;
696	uintptr_t opt;
697	const struct tcp_hdr *th;
698
699	if (tcb->so.ts.val != 0) {
700		opt = rte_pktmbuf_mtod_offset(mb, uintptr_t,
701			mb->l2_len + mb->l3_len + sizeof(*th));
702		ts = get_tms_opts(opt, mb->l4_len - sizeof(*th));
703	} else
704		ts.raw = 0;
705
706	return ts;
707}
708
709/*
710 * PAWS and sequence check.
711 * RFC 1323 4.2.1
712 */
713static inline int
714rx_check_seq(struct tcb *tcb, uint32_t seq, uint32_t len, const union tsopt ts)
715{
716	int32_t rc;
717
718	/* RFC 1323 4.2.1 R2 */
719	rc = check_seqn(tcb, seq, len);
720	if (rc < 0)
721		return rc;
722
723	if (ts.raw != 0) {
724
725		/* RFC 1323 4.2.1 R1 */
726		if (tcp_seq_lt(ts.val, tcb->rcv.ts))
727			return -ERANGE;
728
729		/* RFC 1323 4.2.1 R3 */
730		if (tcp_seq_leq(seq, tcb->snd.ack) &&
731				tcp_seq_lt(tcb->snd.ack, seq + len))
732			tcb->rcv.ts = ts.val;
733	}
734
735	return rc;
736}
737
738static inline int
739rx_check_ack(const struct tcb *tcb, uint32_t ack)
740{
741	uint32_t max;
742
743	max = (uint32_t)RTE_MAX(tcb->snd.nxt, tcb->snd.rcvr);
744
745	if (tcp_seq_leq(tcb->snd.una, ack) && tcp_seq_leq(ack, max))
746		return 0;
747
748	return -ERANGE;
749}
750
751static inline int
752rx_check_seqack(struct tcb *tcb, uint32_t seq, uint32_t ack, uint32_t len,
753	const union tsopt ts)
754{
755	int32_t rc;
756
757	rc = rx_check_seq(tcb, seq, len, ts);
758	rc |= rx_check_ack(tcb, ack);
759	return rc;
760}
761
762static inline int
763restore_syn_pkt(const union pkt_info *pi, const union seg_info *si,
764	uint32_t ts, struct rte_mbuf *mb)
765{
766	int32_t rc;
767	uint32_t len;
768	struct tcp_hdr *th;
769	struct syn_opts so;
770
771	/* check that ACK, etc fields are what we expected. */
772	rc = sync_check_ack(pi, si->seq, si->ack - 1, ts);
773	if (rc < 0)
774		return rc;
775
776	so.mss = rc;
777
778	th = rte_pktmbuf_mtod_offset(mb, struct tcp_hdr *,
779		mb->l2_len + mb->l3_len);
780	len = mb->l4_len - sizeof(*th);
781	sync_get_opts(&so, (uintptr_t)(th + 1), len);
782
783	/* reconstruct SYN options, extend header size if necessary */
784	if (len < TCP_TX_OPT_LEN_MAX) {
785		len = TCP_TX_OPT_LEN_MAX - len;
786		th->data_off = TCP_TX_OPT_LEN_MAX / TCP_DATA_ALIGN <<
787			TCP_DATA_OFFSET;
788		mb->pkt_len += len;
789		mb->data_len += len;
790		mb->l4_len += len;
791	}
792
793	fill_syn_opts(th + 1, &so);
794	return 0;
795}
796
797static inline int
798rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st,
799	const union pkt_info *pi, const union seg_info *si,
800	uint32_t ts, struct rte_mbuf *mb)
801{
802	int32_t rc;
803	struct stbl_entry *se;
804
805	if (pi->tf.flags != TCP_FLAG_ACK || rx_check_stream(s, pi) != 0)
806		return -EINVAL;
807
808	/* ACK for new connection request. */
809
810	rc = restore_syn_pkt(pi, si, ts, mb);
811	if (rc < 0)
812		return rc;
813
814	se = stbl_add_pkt(st, pi, mb);
815	if (se == NULL)
816		return -ENOBUFS;
817
818	/* put new connection requests into stream listen queue */
819	if (rte_ring_enqueue_burst(s->rx.q,
820			(void * const *)&se, 1) != 1) {
821		stbl_del_pkt(st, se, pi);
822		return -ENOBUFS;
823	}
824
825	return 0;
826}
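
/*
 * Note on the server-side (syncookie) handshake handled above and in
 * sync_ack(): no per-connection state is allocated when the initial SYN
 * arrives; the <SYN,ACK> is generated from the SYN mbuf itself and sent back
 * right away. Only when the final ACK of the 3-way handshake arrives does
 * rx_ack_listen() validate the cookie, reconstruct the SYN options
 * (restore_syn_pkt) and queue the request into the listen stream's rx.q,
 * to be retrieved later via tle_tcp_stream_synreqs()/tle_tcp_stream_accept().
 */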
827
828static inline void
829stream_term(struct tle_tcp_stream *s)
830{
831	struct sdr *dr;
832
833	s->tcb.state = TCP_ST_CLOSED;
834	rte_smp_wmb();
835
836	timer_stop(s);
837
838	/* close() was already invoked, schedule final cleanup */
839	if ((s->tcb.uop & TCP_OP_CLOSE) != 0) {
840
841		dr = CTX_TCP_SDR(s->s.ctx);
842		STAILQ_INSERT_TAIL(&dr->be, &s->s, link);
843
844	/* notify user that the stream needs to be closed */
845	} else if (s->err.ev != NULL)
846		tle_event_raise(s->err.ev);
847	else if (s->err.cb.func != NULL)
848		s->err.cb.func(s->err.cb.data, &s->s);
849}
850
851static inline int
852data_pkt_adjust(const struct tcb *tcb, struct rte_mbuf *mb, uint32_t hlen,
853	uint32_t *seqn, uint32_t *plen)
854{
855	uint32_t len, n, seq;
856
857	seq = *seqn;
858	len = *plen;
859
860	rte_pktmbuf_adj(mb, hlen);
861	if (len == 0)
862		return -ENODATA;
863	/* cut off the start of the packet */
864	else if (tcp_seq_lt(seq, tcb->rcv.nxt)) {
865		n = tcb->rcv.nxt - seq;
866		if (n >= len)
867			return -ENODATA;
868
869		rte_pktmbuf_adj(mb, n);
870		*seqn = seq + n;
871		*plen = len - n;
872	}
873
874	return 0;
875}
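
/*
 * Example of the trimming above (hypothetical values): with RCV.NXT == 2000
 * and an arriving segment covering [1500, 2500) (seq == 1500, len == 1000),
 * the first 500 bytes were already received, so 500 bytes are stripped from
 * the mbuf and *seqn/*plen become 2000/500. A segment that lies entirely
 * below RCV.NXT yields -ENODATA.
 */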
876
877static inline uint32_t
878rx_ackdata(struct tle_tcp_stream *s, uint32_t ack)
879{
880	uint32_t k, n;
881
882	n = ack - (uint32_t)s->tcb.snd.una;
883
884	/* some more data was acked. */
885	if (n != 0) {
886
887		/* advance SND.UNA and free related packets. */
888		k = rte_ring_free_count(s->tx.q);
889		free_una_data(s, n);
890
891		/* mark the stream as available for writing */
892		if (rte_ring_free_count(s->tx.q) != 0) {
893			if (s->tx.ev != NULL)
894				tle_event_raise(s->tx.ev);
895			else if (k == 0 && s->tx.cb.func != NULL)
896				s->tx.cb.func(s->tx.cb.data, &s->s);
897		}
898	}
899
900	return n;
901}
902
903static void
904rx_fin_state(struct tle_tcp_stream *s, struct resp_info *rsp)
905{
906	uint32_t state;
907	int32_t ackfin;
908
909	s->tcb.rcv.nxt += 1;
910
911	ackfin = (s->tcb.snd.una == s->tcb.snd.fss);
912	state = s->tcb.state;
913
914	if (state == TCP_ST_ESTABLISHED) {
915		s->tcb.state = TCP_ST_CLOSE_WAIT;
916		/* raise err.ev & err.cb */
917		if (s->err.ev != NULL)
918			tle_event_raise(s->err.ev);
919		else if (s->err.cb.func != NULL)
920			s->err.cb.func(s->err.cb.data, &s->s);
921	} else if (state == TCP_ST_FIN_WAIT_1 || state == TCP_ST_CLOSING) {
922		rsp->flags |= TCP_FLAG_ACK;
923		if (ackfin != 0) {
924			s->tcb.state = TCP_ST_TIME_WAIT;
925			s->tcb.snd.rto = TCP_RTO_2MSL;
926			timer_reset(s);
927		} else
928			s->tcb.state = TCP_ST_CLOSING;
929	} else if (state == TCP_ST_FIN_WAIT_2) {
930		rsp->flags |= TCP_FLAG_ACK;
931		s->tcb.state = TCP_ST_TIME_WAIT;
932		s->tcb.snd.rto = TCP_RTO_2MSL;
933		timer_reset(s);
934	} else if (state == TCP_ST_LAST_ACK && ackfin != 0) {
935		stream_term(s);
936	}
937}
938
939/*
940 * FIN process for ESTABLISHED state
941 * returns:
942 * < 0 - an error occurred
943 *   0 - FIN was processed OK, and the mbuf can be freed/reused.
944 * > 0 - FIN was processed OK and the mbuf can't be freed/reused.
945 */
946static inline int
947rx_fin(struct tle_tcp_stream *s, uint32_t state,
948	const union seg_info *si, struct rte_mbuf *mb,
949	struct resp_info *rsp)
950{
951	uint32_t hlen, plen, seq;
952	int32_t ret;
953	union tsopt ts;
954
955	hlen = PKT_L234_HLEN(mb);
956	plen = mb->pkt_len - hlen;
957	seq = si->seq;
958
959	ts = rx_tms_opt(&s->tcb, mb);
960	ret = rx_check_seqack(&s->tcb, seq, si->ack, plen, ts);
961	if (ret != 0)
962		return ret;
963
964	if (state < TCP_ST_ESTABLISHED)
965		return -EINVAL;
966
967	if (plen != 0) {
968
969		ret = data_pkt_adjust(&s->tcb, mb, hlen, &seq, &plen);
970		if (ret != 0)
971			return ret;
972		if (rx_data_enqueue(s, seq, plen, &mb, 1) != 1)
973			return -ENOBUFS;
974	}
975
976	/* process ack here */
977	rx_ackdata(s, si->ack);
978
979	/* some fragments still missing */
980	if (seq + plen != s->tcb.rcv.nxt) {
981		s->tcb.rcv.frs.seq = seq + plen;
982		s->tcb.rcv.frs.on = 1;
983	} else
984		rx_fin_state(s, rsp);
985
986	return plen;
987}
988
989static inline int
990rx_rst(struct tle_tcp_stream *s, uint32_t state, uint32_t flags,
991	const union seg_info *si)
992{
993	int32_t rc;
994
995	/*
996	 * RFC 793: In all states except SYN-SENT, all reset (RST) segments
997	 * are validated by checking their SEQ-fields.
998	 * A reset is valid if its sequence number is in the window.
999	 * In the SYN-SENT state (a RST received in response to an initial SYN),
1000	 * the RST is acceptable if the ACK field acknowledges the SYN.
1001	 */
1002	if (state == TCP_ST_SYN_SENT) {
1003		rc = ((flags & TCP_FLAG_ACK) == 0 ||
1004				si->ack != s->tcb.snd.nxt) ?
1005			-ERANGE : 0;
1006	}
1007
1008	else
1009		rc = check_seqn(&s->tcb, si->seq, 0);
1010
1011	if (rc == 0)
1012		stream_term(s);
1013
1014	return rc;
1015}
1016
1017/*
1018 * check whether we have a FIN that was received out-of-order.
1019 * If yes, try to process it now.
1020 */
1021static inline void
1022rx_ofo_fin(struct tle_tcp_stream *s, struct resp_info *rsp)
1023{
1024	if (s->tcb.rcv.frs.on != 0 && s->tcb.rcv.nxt == s->tcb.rcv.frs.seq)
1025		rx_fin_state(s, rsp);
1026}
1027
1028static inline void
1029dack_info_init(struct dack_info *tack, const struct tcb *tcb)
1030{
1031	memset(tack, 0, sizeof(*tack));
1032	tack->ack = tcb->snd.una;
1033	tack->segs.dup = tcb->rcv.dupack;
1034	tack->wu.raw = tcb->snd.wu.raw;
1035	tack->wnd = tcb->snd.wnd >> tcb->snd.wscale;
1036}
1037
1038static inline void
1039ack_window_update(struct tcb *tcb, const struct dack_info *tack)
1040{
1041	tcb->snd.wu.raw = tack->wu.raw;
1042	tcb->snd.wnd = tack->wnd << tcb->snd.wscale;
1043}
1044
1045static inline void
1046ack_cwnd_update(struct tcb *tcb, uint32_t acked, const struct dack_info *tack)
1047{
1048	uint32_t n;
1049
1050	n = tack->segs.ack * tcb->snd.mss;
1051
1052	/* slow start phase, RFC 5681 3.1 (2)  */
1053	if (tcb->snd.cwnd < tcb->snd.ssthresh)
1054		tcb->snd.cwnd += RTE_MIN(acked, n);
1055	/* congestion avoidance phase, RFC 5681 3.1 (3) */
1056	else
1057		tcb->snd.cwnd += RTE_MAX(1U, n * tcb->snd.mss / tcb->snd.cwnd);
1058}
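
/*
 * Worked example (hypothetical values, mss == 1460): in slow start
 * (cwnd == 5840 < ssthresh), an ACK bulk that newly acknowledged 2920 bytes
 * in 2 segments (n == 2920) grows cwnd by min(2920, 2920) == 2920 bytes.
 * In congestion avoidance with cwnd == 29200 the same bulk grows cwnd by
 * only max(1, 2920 * 1460 / 29200) == 146 bytes.
 */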
1059
1060static inline void
1061rto_ssthresh_update(struct tcb *tcb)
1062{
1063	uint32_t k, n;
1064
1065	/* RFC 5681 3.1 (4)  */
1066	n = (tcb->snd.nxt - tcb->snd.una) / 2;
1067	k = 2 * tcb->snd.mss;
1068	tcb->snd.ssthresh = RTE_MAX(n, k);
1069}
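
/*
 * Example (hypothetical values): with 11680 bytes in flight
 * (snd.nxt - snd.una) and mss == 1460, ssthresh becomes
 * max(11680 / 2, 2 * 1460) == 5840 bytes.
 */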
1070
1071static inline void
1072rto_cwnd_update(struct tcb *tcb)
1073{
1074
1075	if (tcb->snd.nb_retx == 0)
1076		rto_ssthresh_update(tcb);
1077
1078	/*
1079	 * RFC 5681 3.1: upon a timeout cwnd MUST be set to
1080	 * no more than 1 full-sized segment.
1081	 */
1082	tcb->snd.cwnd = tcb->snd.mss;
1083}
1084
1085static inline void
1086ack_info_update(struct dack_info *tack, const union seg_info *si,
1087	int32_t badseq, uint32_t dlen, const union tsopt ts)
1088{
1089	if (badseq != 0) {
1090		tack->segs.badseq++;
1091		return;
1092	}
1093
1094	/* segment with incoming data */
1095	tack->segs.data += (dlen != 0);
1096
1097	/* segment with newly acked data */
1098	if (tcp_seq_lt(tack->ack, si->ack)) {
1099		tack->segs.dup = 0;
1100		tack->segs.ack++;
1101		tack->ack = si->ack;
1102		tack->ts = ts;
1103
1104	/*
1105	 * RFC 5681: An acknowledgment is considered a "duplicate" when:
1106	 * (a) the receiver of the ACK has outstanding data
1107	 * (b) the incoming acknowledgment carries no data
1108	 * (c) the SYN and FIN bits are both off
1109	 * (d) the acknowledgment number is equal to the TCP.UNA
1110	 * (e) the advertised window in the incoming acknowledgment equals the
1111	 * advertised window in the last incoming acknowledgment.
1112	 *
1113	 * Here we only have to check for (b), (d) and (e).
1114	 * (a) will be checked later for the whole bulk of packets,
1115	 * (c) should never happen here.
1116	 */
1117	} else if (dlen == 0 && si->wnd == tack->wnd && ++tack->segs.dup == 3) {
1118		tack->dup3.seg = tack->segs.ack + 1;
1119		tack->dup3.ack = tack->ack;
1120	}
1121
1122	/*
1123	 * RFC 793:
1124	 * If SND.UNA < SEG.ACK =< SND.NXT, the send window should be
1125	 * updated.  If (SND.WL1 < SEG.SEQ or (SND.WL1 = SEG.SEQ and
1126	 * SND.WL2 =< SEG.ACK)), set SND.WND <- SEG.WND, set
1127	 * SND.WL1 <- SEG.SEQ, and set SND.WL2 <- SEG.ACK.
1128	 */
1129	if (tcp_seq_lt(tack->wu.wl1, si->seq) ||
1130			(si->seq == tack->wu.wl1 &&
1131			tcp_seq_leq(tack->wu.wl2, si->ack))) {
1132
1133		tack->wu.wl1 = si->seq;
1134		tack->wu.wl2 = si->ack;
1135		tack->wnd = si->wnd;
1136	}
1137}
1138
1139static inline uint32_t
1140rx_data_ack(struct tle_tcp_stream *s, struct dack_info *tack,
1141	const union seg_info si[], struct rte_mbuf *mb[], struct rte_mbuf *rp[],
1142	int32_t rc[], uint32_t num)
1143{
1144	uint32_t i, j, k, n, t;
1145	uint32_t hlen, plen, seq, tlen;
1146	int32_t ret;
1147	union tsopt ts;
1148
1149	k = 0;
1150	for (i = 0; i != num; i = j) {
1151
1152		hlen = PKT_L234_HLEN(mb[i]);
1153		plen = mb[i]->pkt_len - hlen;
1154		seq = si[i].seq;
1155
1156		ts = rx_tms_opt(&s->tcb, mb[i]);
1157		ret = rx_check_seqack(&s->tcb, seq, si[i].ack, plen, ts);
1158
1159		/* account segment received */
1160		ack_info_update(tack, &si[i], ret != 0, plen, ts);
1161
1162		if (ret == 0) {
1163			/* skip duplicate data, if any */
1164			ret = data_pkt_adjust(&s->tcb, mb[i], hlen,
1165				&seq, &plen);
1166		}
1167
1168		j = i + 1;
1169		if (ret != 0) {
1170			rp[k] = mb[i];
1171			rc[k] = -ret;
1172			k++;
1173			continue;
1174		}
1175
1176		/* group sequential packets together. */
1177		for (tlen = plen; j != num; tlen += plen, j++) {
1178
1179			hlen = PKT_L234_HLEN(mb[j]);
1180			plen = mb[j]->pkt_len - hlen;
1181
1182			/* not consecutive packet */
1183			if (plen == 0 || seq + tlen != si[j].seq)
1184				break;
1185
1186			/* check SEQ/ACK */
1187			ts = rx_tms_opt(&s->tcb, mb[j]);
1188			ret = rx_check_seqack(&s->tcb, si[j].seq, si[j].ack,
1189				plen, ts);
1190
1191			/* account for segment received */
1192			ack_info_update(tack, &si[j], ret != 0, plen, ts);
1193
1194			if (ret != 0) {
1195				rp[k] = mb[j];
1196				rc[k] = -ret;
1197				k++;
1198				break;
1199			}
1200			rte_pktmbuf_adj(mb[j], hlen);
1201		}
1202
1203		n = j - i;
1204		j += (ret != 0);
1205
1206		/* account for OFO data */
1207		if (seq != s->tcb.rcv.nxt)
1208			tack->segs.ofo += n;
1209
1210		/* enqueue packets */
1211		t = rx_data_enqueue(s, seq, tlen, mb + i, n);
1212
1213		/* if we are out of space in stream recv buffer. */
1214		for (; t != n; t++) {
1215			rp[k] = mb[i + t];
1216			rc[k] = -ENOBUFS;
1217			k++;
1218		}
1219	}
1220
1221	return num - k;
1222}
1223
1224static inline void
1225start_fast_retransmit(struct tle_tcp_stream *s)
1226{
1227	struct tcb *tcb;
1228
1229	tcb = &s->tcb;
1230
1231	/* RFC 6582 3.2.2 */
1232	tcb->snd.rcvr = tcb->snd.nxt;
1233	tcb->snd.fastack = 1;
1234
1235	/* RFC 5681 3.2.2 */
1236	rto_ssthresh_update(tcb);
1237
1238	/* RFC 5681 3.2.3 */
1239	tcp_txq_rst_nxt_head(s);
1240	tcb->snd.nxt = tcb->snd.una;
1241	tcb->snd.cwnd = tcb->snd.ssthresh + 3 * tcb->snd.mss;
1242}
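
/*
 * Example (hypothetical values): entering fast retransmit with 11680 bytes
 * in flight and mss == 1460 gives ssthresh == 5840 and
 * cwnd == ssthresh + 3 * mss == 5840 + 4380 == 10220, and transmission
 * restarts from SND.UNA.
 */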
1243
1244static inline void
1245stop_fast_retransmit(struct tle_tcp_stream *s)
1246{
1247	struct tcb *tcb;
1248	uint32_t n;
1249
1250	tcb = &s->tcb;
1251	n = tcb->snd.nxt - tcb->snd.una;
1252	tcb->snd.cwnd = RTE_MIN(tcb->snd.ssthresh,
1253		RTE_MAX(n, tcb->snd.mss) + tcb->snd.mss);
1254	tcb->snd.fastack = 0;
1255}
1256
1257static inline int
1258in_fast_retransmit(struct tle_tcp_stream *s, uint32_t ack_len, uint32_t ack_num,
1259	uint32_t dup_num)
1260{
1261	uint32_t n;
1262	struct tcb *tcb;
1263
1264	tcb = &s->tcb;
1265
1266	/* RFC 5682 3.2.3 partial ACK */
1267	if (ack_len != 0) {
1268
1269		n = ack_num * tcb->snd.mss;
1270		if (ack_len >= n)
1271			tcb->snd.cwnd -= ack_len - n;
1272		else
1273			tcb->snd.cwnd -= ack_len % tcb->snd.mss;
1274
1275		/*
1276		 * For the first partial ACK that arrives
1277		 * during fast recovery, also reset the
1278		 * retransmit timer.
1279		 */
1280		if (tcb->snd.fastack == 1)
1281			timer_reset(s);
1282
1283		tcb->snd.fastack += ack_num;
1284		return 1;
1285
1286	/* RFC 5681 3.2.4 */
1287	} else if (dup_num > 3) {
1288		s->tcb.snd.cwnd += (dup_num - 3) * tcb->snd.mss;
1289		return 1;
1290	}
1291
1292	return 0;
1293}
1294
1295static inline int
1296process_ack(struct tle_tcp_stream *s, uint32_t acked,
1297	const struct dack_info *tack)
1298{
1299	int32_t send;
1300
1301	send = 0;
1302
1303	/* normal mode */
1304	if (s->tcb.snd.fastack == 0) {
1305
1306		send = 1;
1307
1308		/* RFC 6582 3.2.2 switch to fast retransmit mode */
1309		if (tack->dup3.seg != 0 && s->tcb.snd.una != s->tcb.snd.nxt &&
1310				s->tcb.snd.una >= s->tcb.snd.rcvr) {
1311
1312			start_fast_retransmit(s);
1313			in_fast_retransmit(s,
1314				tack->ack - tack->dup3.ack,
1315				tack->segs.ack - tack->dup3.seg - 1,
1316				tack->segs.dup);
1317
1318		/* remain in normal mode */
1319		} else if (acked != 0) {
1320			ack_cwnd_update(&s->tcb, acked, tack);
1321			timer_stop(s);
1322		}
1323
1324	/* fast retransmit mode */
1325	} else {
1326
1327		/* remain in fast retransmit mode */
1328		if (s->tcb.snd.una < s->tcb.snd.rcvr) {
1329
1330			send = in_fast_retransmit(s, acked, tack->segs.ack,
1331				tack->segs.dup);
1332		} else {
1333			/* RFC 5682 3.2.3 full ACK */
1334			stop_fast_retransmit(s);
1335			timer_stop(s);
1336
1337			/* if we have another series of dup ACKs */
1338			if (tack->dup3.seg != 0 &&
1339					s->tcb.snd.una != s->tcb.snd.nxt &&
1340					tcp_seq_leq((uint32_t)s->tcb.snd.rcvr,
1341					tack->dup3.ack)) {
1342
1343				/* restart fast retransmit again. */
1344				start_fast_retransmit(s);
1345				send = in_fast_retransmit(s,
1346					tack->ack - tack->dup3.ack,
1347					tack->segs.ack - tack->dup3.seg - 1,
1348					tack->segs.dup);
1349			}
1350		}
1351	}
1352
1353	return send;
1354}
1355
1356/*
1357 * our FIN was acked, stop rto timer, change stream state,
1358 * and possibly close the stream.
1359 */
1360static inline void
1361rx_ackfin(struct tle_tcp_stream *s)
1362{
1363	uint32_t state;
1364
1365	s->tcb.snd.una = s->tcb.snd.fss;
1366	empty_mbuf_ring(s->tx.q);
1367
1368	state = s->tcb.state;
1369	if (state == TCP_ST_LAST_ACK)
1370		stream_term(s);
1371	else if (state == TCP_ST_FIN_WAIT_1) {
1372		timer_stop(s);
1373		s->tcb.state = TCP_ST_FIN_WAIT_2;
1374	} else if (state == TCP_ST_CLOSING) {
1375		s->tcb.state = TCP_ST_TIME_WAIT;
1376		s->tcb.snd.rto = TCP_RTO_2MSL;
1377		timer_reset(s);
1378	}
1379}
1380
1381static inline void
1382rx_process_ack(struct tle_tcp_stream *s, uint32_t ts,
1383	const struct dack_info *tack)
1384{
1385	int32_t send;
1386	uint32_t n;
1387
1388	s->tcb.rcv.dupack = tack->segs.dup;
1389
1390	n = rx_ackdata(s, tack->ack);
1391	send = process_ack(s, n, tack);
1392
1393	/* try to send more data. */
1394	if ((n != 0 || send != 0) && tcp_txq_nxt_cnt(s) != 0)
1395		txs_enqueue(s->s.ctx, s);
1396
1397	/* restart RTO timer. */
1398	if (s->tcb.snd.nxt != s->tcb.snd.una)
1399		timer_start(s);
1400
1401	/* update the RTO: if a fresh packet is here, then calculate the RTT */
1402	if (tack->ts.ecr != 0)
1403		rto_estimate(&s->tcb, ts - tack->ts.ecr);
1404}
1405
1406/*
1407 * process <SYN,ACK>
1408 * returns negative value on failure, or zero on success.
1409 */
1410static inline int
1411rx_synack(struct tle_tcp_stream *s, uint32_t ts, uint32_t state,
1412	const union seg_info *si, struct rte_mbuf *mb,
1413	struct resp_info *rsp)
1414{
1415	struct syn_opts so;
1416	struct tcp_hdr *th;
1417
1418	if (state != TCP_ST_SYN_SENT)
1419		return -EINVAL;
1420
1421	/* invalid SEG.ACK */
1422	if (si->ack != (uint32_t)s->tcb.snd.nxt) {
1423		rsp->flags = TCP_FLAG_RST;
1424		return 0;
1425	}
1426
1427	th = rte_pktmbuf_mtod_offset(mb, struct tcp_hdr *,
1428		mb->l2_len + mb->l3_len);
1429	get_syn_opts(&so, (uintptr_t)(th + 1), mb->l4_len - sizeof(*th));
1430
1431	s->tcb.so = so;
1432
1433	s->tcb.snd.una = s->tcb.snd.nxt;
1434	s->tcb.snd.mss = so.mss;
1435	s->tcb.snd.wnd = si->wnd << so.wscale;
1436	s->tcb.snd.wu.wl1 = si->seq;
1437	s->tcb.snd.wu.wl2 = si->ack;
1438	s->tcb.snd.wscale = so.wscale;
1439
1440	/* setup congestion variables */
1441	s->tcb.snd.cwnd = initial_cwnd(s->tcb.snd.mss);
1442	s->tcb.snd.ssthresh = s->tcb.snd.wnd;
1443
1444	s->tcb.rcv.ts = so.ts.val;
1445	s->tcb.rcv.irs = si->seq;
1446	s->tcb.rcv.nxt = si->seq + 1;
1447
1448	/* calculate initial rto */
1449	rto_estimate(&s->tcb, ts - s->tcb.snd.ts);
1450
1451	rsp->flags |= TCP_FLAG_ACK;
1452
1453	timer_stop(s);
1454	s->tcb.state = TCP_ST_ESTABLISHED;
1455	rte_smp_wmb();
1456
1457	if (s->tx.ev != NULL)
1458		tle_event_raise(s->tx.ev);
1459	else if (s->tx.cb.func != NULL)
1460		s->tx.cb.func(s->tx.cb.data, &s->s);
1461
1462	return 0;
1463}
1464
1465static inline uint32_t
1466rx_stream(struct tle_tcp_stream *s, uint32_t ts,
1467	const union pkt_info *pi, const union seg_info si[],
1468	struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1469	uint32_t num)
1470{
1471	uint32_t i, k, n, state;
1472	int32_t ret;
1473	struct resp_info rsp;
1474	struct dack_info tack;
1475
1476	k = 0;
1477	rsp.flags = 0;
1478
1479	state = s->tcb.state;
1480
1481	/*
1482	 * first check for the states/flags where we don't
1483	 * expect groups of packets.
1484	 */
1485
1486	/* process RST */
1487	if ((pi->tf.flags & TCP_FLAG_RST) != 0) {
1488		for (i = 0;
1489				i != num &&
1490				rx_rst(s, state, pi->tf.flags, &si[i]);
1491				i++)
1492			;
1493		i = 0;
1494
1495	/* RFC 793: if the ACK bit is off drop the segment and return */
1496	} else if ((pi->tf.flags & TCP_FLAG_ACK) == 0) {
1497		i = 0;
1502
1503	/* process <SYN,ACK> */
1504	} else if ((pi->tf.flags & TCP_FLAG_SYN) != 0) {
1505		ret = 0;
1506		for (i = 0; i != num; i++) {
1507			ret = rx_synack(s, ts, state, &si[i], mb[i], &rsp);
1508			if (ret == 0)
1509				break;
1510
1511			rc[k] = -ret;
1512			rp[k] = mb[i];
1513			k++;
1514		}
1515
1516	/* process FIN */
1517	} else if ((pi->tf.flags & TCP_FLAG_FIN) != 0) {
1518		ret = 0;
1519		for (i = 0; i != num; i++) {
1520			ret = rx_fin(s, state, &si[i], mb[i], &rsp);
1521			if (ret >= 0)
1522				break;
1523
1524			rc[k] = -ret;
1525			rp[k] = mb[i];
1526			k++;
1527		}
1528		i += (ret > 0);
1529
1530	/* normal data/ack packets */
1531	} else if (state >= TCP_ST_ESTABLISHED && state <= TCP_ST_LAST_ACK) {
1532
1533		/* process incoming data packets. */
1534		dack_info_init(&tack, &s->tcb);
1535		n = rx_data_ack(s, &tack, si, mb, rp, rc, num);
1536
1537		/* follow up actions based on aggregated information */
1538
1539		/* update SND.WND */
1540		ack_window_update(&s->tcb, &tack);
1541
1542		/*
1543		 * fast-path: all data & FIN was already sent out
1544		 * and now is acknowledged.
1545		 */
1546		if (s->tcb.snd.fss == s->tcb.snd.nxt &&
1547				tack.ack == (uint32_t) s->tcb.snd.nxt)
1548			rx_ackfin(s);
1549		else
1550			rx_process_ack(s, ts, &tack);
1551
1552		/*
1553		 * send an immediate ACK if either:
1554		 * - received segment with invalid seq/ack number
1555		 * - received segment with OFO data
1556		 * - received segment with INO data and no TX is scheduled
1557		 *   for that stream.
1558		 */
1559		if (tack.segs.badseq != 0 || tack.segs.ofo != 0 ||
1560				(tack.segs.data != 0 &&
1561				rte_atomic32_read(&s->tx.arm) == 0))
1562			rsp.flags |= TCP_FLAG_ACK;
1563
1564		rx_ofo_fin(s, &rsp);
1565
1566		k += num - n;
1567		i = num;
1568
1569	/* unhandled state, drop all packets. */
1570	} else
1571		i = 0;
1572
1573	/* we have a response packet to send. */
1574	if (rsp.flags == TCP_FLAG_RST) {
1575		send_rst(s, si[i].ack);
1576		stream_term(s);
1577	} else if (rsp.flags != 0) {
1578		send_ack(s, ts, rsp.flags);
1579
1580		/* start the timer for FIN packet */
1581		if ((rsp.flags & TCP_FLAG_FIN) != 0)
1582			timer_reset(s);
1583	}
1584
1585	/* unprocessed packets */
1586	for (; i != num; i++, k++) {
1587		rc[k] = EINVAL;
1588		rp[k] = mb[i];
1589	}
1590
1591	return num - k;
1592}
1593
1594static inline uint32_t
1595rx_postsyn(struct tle_dev *dev, struct stbl *st, uint32_t type, uint32_t ts,
1596	const union pkt_info pi[], const union seg_info si[],
1597	struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1598	uint32_t num)
1599{
1600	struct tle_tcp_stream *s;
1601	uint32_t i, k, state;
1602	int32_t ret;
1603
1604	s = rx_obtain_stream(dev, st, &pi[0], type);
1605	if (s == NULL) {
1606		for (i = 0; i != num; i++) {
1607			rc[i] = ENOENT;
1608			rp[i] = mb[i];
1609		}
1610		return 0;
1611	}
1612
1613	k = 0;
1614	state = s->tcb.state;
1615
1616	if (state == TCP_ST_LISTEN) {
1617
1618		/* one connection per flow */
1619		ret = EINVAL;
1620		for (i = 0; i != num && ret != 0; i++) {
1621			ret = rx_ack_listen(s, st, pi, &si[i], ts, mb[i]);
1622			if (ret != 0) {
1623				rc[k] = -ret;
1624				rp[k] = mb[i];
1625				k++;
1626			}
1627		}
1628		/* duplicate SYN requests */
1629		for (; i != num; i++, k++) {
1630			rc[k] = EINVAL;
1631			rp[k] = mb[i];
1632		}
1633
1634		if (k != num && s->rx.ev != NULL)
1635			tle_event_raise(s->rx.ev);
1636		else if (s->rx.cb.func != NULL && rte_ring_count(s->rx.q) == 1)
1637			s->rx.cb.func(s->rx.cb.data, &s->s);
1638
1639	} else {
1640		i = rx_stream(s, ts, pi, si, mb, rp, rc, num);
1641		k = num - i;
1642	}
1643
1644	rwl_release(&s->rx.use);
1645	return num - k;
1646}
1647
1648
1649static inline uint32_t
1650rx_syn(struct tle_dev *dev, uint32_t type, uint32_t ts,
1651	const union pkt_info pi[], const union seg_info si[],
1652	struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1653	uint32_t num)
1654{
1655	struct tle_tcp_stream *s;
1656	uint32_t i, k;
1657	int32_t ret;
1658
1659	s = rx_obtain_listen_stream(dev, &pi[0], type);
1660	if (s == NULL) {
1661		for (i = 0; i != num; i++) {
1662			rc[i] = ENOENT;
1663			rp[i] = mb[i];
1664		}
1665		return 0;
1666	}
1667
1668	k = 0;
1669	for (i = 0; i != num; i++) {
1670
1671		/* check that this remote is allowed to connect */
1672		if (rx_check_stream(s, &pi[i]) != 0)
1673			ret = -ENOENT;
1674		else
1675			/* syncookie: reply with <SYN,ACK> */
1676			ret = sync_ack(s, &pi[i], &si[i], ts, mb[i]);
1677
1678		if (ret != 0) {
1679			rc[k] = -ret;
1680			rp[k] = mb[i];
1681			k++;
1682		}
1683	}
1684
1685	rwl_release(&s->rx.use);
1686	return num - k;
1687}
1688
1689uint16_t
1690tle_tcp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[],
1691	struct rte_mbuf *rp[], int32_t rc[], uint16_t num)
1692{
1693	struct stbl *st;
1694	uint32_t i, j, k, n, t, ts;
1695	uint64_t csf;
1696	union pkt_info pi[num];
1697	union seg_info si[num];
1698	union {
1699		uint8_t t[TLE_VNUM];
1700		uint32_t raw;
1701	} stu;
1702
1703	ts = tcp_get_tms();
1704	st = CTX_TCP_STLB(dev->ctx);
1705
1706	stu.raw = 0;
1707
1708	/* extract packet info and check the L3/L4 csums */
1709	for (i = 0; i != num; i++) {
1710
1711		get_pkt_info(pkt[i], &pi[i], &si[i]);
1712
1713		t = pi[i].tf.type;
1714		csf = dev->rx.ol_flags[t] &
1715			(PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD);
1716
1717		/* check csums in SW */
1718		if (pi[i].csf == 0 && csf != 0 && check_pkt_csum(pkt[i], csf,
1719				pi[i].tf.type, IPPROTO_TCP) != 0)
1720			pi[i].csf = csf;
1721
1722		stu.t[t] = 1;
1723	}
1724
1725	if (stu.t[TLE_V4] != 0)
1726		stbl_lock(st, TLE_V4);
1727	if (stu.t[TLE_V6] != 0)
1728		stbl_lock(st, TLE_V6);
1729
1730	k = 0;
1731	for (i = 0; i != num; i += j) {
1732
1733		t = pi[i].tf.type;
1734
1735		/* basic checks for the incoming packet */
1736		if (t >= TLE_VNUM || pi[i].csf != 0 || dev->dp[t] == NULL) {
1737			rc[k] = EINVAL;
1738			rp[k] = pkt[i];
1739			j = 1;
1740			k++;
1741		/* process input SYN packets */
1742		} else if (pi[i].tf.flags == TCP_FLAG_SYN) {
1743			j = pkt_info_bulk_syneq(pi + i, num - i);
1744			n = rx_syn(dev, t, ts, pi + i, si + i, pkt + i,
1745				rp + k, rc + k, j);
1746			k += j - n;
1747		} else {
1748			j = pkt_info_bulk_eq(pi + i, num - i);
1749			n = rx_postsyn(dev, st, t, ts, pi + i, si + i, pkt + i,
1750				rp + k, rc + k, j);
1751			k += j - n;
1752		}
1753	}
1754
1755	if (stu.t[TLE_V4] != 0)
1756		stbl_unlock(st, TLE_V4);
1757	if (stu.t[TLE_V6] != 0)
1758		stbl_unlock(st, TLE_V6);
1759
1760	return num - k;
1761}
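
/*
 * A rough usage sketch for the receive path (illustrative only; 'port',
 * 'queue' and 'dev' are assumed to be set up by the application):
 *
 *	struct rte_mbuf *pkt[MAX_PKT_BURST], *rp[MAX_PKT_BURST];
 *	int32_t rc[MAX_PKT_BURST];
 *	uint16_t i, k, n;
 *
 *	n = rte_eth_rx_burst(port, queue, pkt, MAX_PKT_BURST);
 *	k = tle_tcp_rx_bulk(dev, pkt, rp, rc, n);
 *
 *	(the n - k rejected packets are returned in rp[], rc[] holds the errors)
 *	for (i = 0; i != n - k; i++)
 *		rte_pktmbuf_free(rp[i]);
 */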
1762
1763uint16_t
1764tle_tcp_stream_synreqs(struct tle_stream *ts, struct tle_syn_req rq[],
1765	uint32_t num)
1766{
1767	uint32_t i, n;
1768	struct tle_tcp_stream *s;
1769	struct stbl_entry *se[num];
1770
1771	s = TCP_STREAM(ts);
1772	n = rte_ring_mc_dequeue_burst(s->rx.q, (void **)se, num);
1773	if (n == 0)
1774		return 0;
1775
1776	for (i = 0; i != n; i++) {
1777		rq[i].pkt = stbl_get_pkt(se[i]);
1778		rq[i].opaque = se[i];
1779	}
1780
1781	/*
1782	 * if we still have packets to read,
1783	 * then rearm stream RX event.
1784	 */
1785	if (n == num && rte_ring_count(s->rx.q) != 0) {
1786		if (rwl_try_acquire(&s->rx.use) > 0 && s->rx.ev != NULL)
1787			tle_event_raise(s->rx.ev);
1788		rwl_release(&s->rx.use);
1789	}
1790
1791	return n;
1792}
1793
1794static inline int
1795stream_fill_dest(struct tle_tcp_stream *s)
1796{
1797	int32_t rc;
1798	const void *da;
1799
1800	if (s->s.type == TLE_V4)
1801		da = &s->s.ipv4.addr.src;
1802	else
1803		da = &s->s.ipv6.addr.src;
1804
1805	rc = stream_get_dest(&s->s, da, &s->tx.dst);
1806	return (rc < 0) ? rc : 0;
1807}
1808
1809/*
1810 * helper function, prepares an accepted stream.
1811 */
1812static int
1813accept_fill_stream(struct tle_tcp_stream *ps, struct tle_tcp_stream *cs,
1814	const struct tle_tcp_accept_param *prm, uint32_t tms,
1815	const union pkt_info *pi, const union seg_info *si)
1816{
1817	int32_t rc;
1818	uint32_t rtt;
1819
1820	/* some TX still pending for that stream. */
1821	if (TCP_STREAM_TX_PENDING(cs))
1822		return -EAGAIN;
1823
1824	/* setup L4 ports and L3 addresses fields. */
1825	cs->s.port.raw = pi->port.raw;
1826	cs->s.pmsk.raw = UINT32_MAX;
1827
1828	if (pi->tf.type == TLE_V4) {
1829		cs->s.ipv4.addr = pi->addr4;
1830		cs->s.ipv4.mask.src = INADDR_NONE;
1831		cs->s.ipv4.mask.dst = INADDR_NONE;
1832	} else if (pi->tf.type == TLE_V6) {
1833		cs->s.ipv6.addr = *pi->addr6;
1834		rte_memcpy(&cs->s.ipv6.mask.src, &tle_ipv6_none,
1835			sizeof(cs->s.ipv6.mask.src));
1836		rte_memcpy(&cs->s.ipv6.mask.dst, &tle_ipv6_none,
1837			sizeof(cs->s.ipv6.mask.dst));
1838	}
1839
1840	/* setup TCB */
1841	sync_fill_tcb(&cs->tcb, si, prm->syn.pkt);
1842	cs->tcb.rcv.wnd = cs->rx.q->prod.mask << cs->tcb.rcv.wscale;
1843
1844	/* setup stream notification mechanism */
1845	cs->rx.ev = prm->cfg.recv_ev;
1846	cs->rx.cb = prm->cfg.recv_cb;
1847	cs->tx.ev = prm->cfg.send_ev;
1848	cs->tx.cb = prm->cfg.send_cb;
1849	cs->err.ev = prm->cfg.err_ev;
1850	cs->err.cb = prm->cfg.err_cb;
1851
1852	/* store other params */
1853	cs->tcb.snd.nb_retm = (prm->cfg.nb_retries != 0) ? prm->cfg.nb_retries :
1854		TLE_TCP_DEFAULT_RETRIES;
1855
1856	/*
1857	 * estimate the RTO.
1858	 * For now the RTT is calculated based on the TCP TMS option;
1859	 * a real-time estimate should be added later.
1860	 */
1861	if (cs->tcb.so.ts.ecr) {
1862		rtt = tms - cs->tcb.so.ts.ecr;
1863		rto_estimate(&cs->tcb, rtt);
1864	} else
1865		cs->tcb.snd.rto = TCP_RTO_DEFAULT;
1866
1867	tcp_stream_up(cs);
1868
1869	/* copy streams type. */
1870	cs->s.type = ps->s.type;
1871
1872	/* retrieve and cache destination information. */
1873	rc = stream_fill_dest(cs);
1874	if (rc != 0)
1875		return rc;
1876
1877	/* update snd.mss with SMSS value */
1878	cs->tcb.snd.mss = calc_smss(cs->tcb.snd.mss, &cs->tx.dst);
1879
1880	/* setup congestion variables */
1881	cs->tcb.snd.cwnd = initial_cwnd(cs->tcb.snd.mss);
1882	cs->tcb.snd.ssthresh = cs->tcb.snd.wnd;
1883
1884	cs->tcb.state = TCP_ST_ESTABLISHED;
1885	cs->tcb.uop |= TCP_OP_ACCEPT;
1886
1887	/* add stream to the table */
1888	cs->ste = prm->syn.opaque;
1889	rte_smp_wmb();
1890	cs->ste->data = cs;
1891	return 0;
1892}
1893
1894/*
1895 * !!!
1896 * Right now new stream rcv.wnd is set to zero.
1897 * That simplifies handling of new connection establishment
1898 * (as no data segments could be received),
1899 * but has to be addressed.
1900 * possible ways:
1901 *  - send ack after accept creates new stream with new rcv.wnd value.
1902 *    the problem with that approach that single ack is not delivered
1903 *    reliably (could be lost), plus it might slow down connection establishment
1904 *    (an extra packet per connection that the client has to wait for).
1905 *  - allocate the new stream at the ACK receive stage.
1906 *    As a drawback - whole new stream allocation/connection establishment
1907 *    will be done in BE.
1908 * !!!
1909 */
1910int
1911tle_tcp_stream_accept(struct tle_stream *ts,
1912	const struct tle_tcp_accept_param prm[], struct tle_stream *rs[],
1913	uint32_t num)
1914{
1915	struct tle_tcp_stream *cs, *s;
1916	struct tle_ctx *ctx;
1917	uint32_t i, j, n, tms;
1918	int32_t rc;
1919	union pkt_info pi[num];
1920	union seg_info si[num];
1921
1922	tms = tcp_get_tms();
1923	s = TCP_STREAM(ts);
1924
1925	for (i = 0; i != num; i++)
1926		get_pkt_info(prm[i].syn.pkt, &pi[i], &si[i]);
1927
1928	/* mark stream as not closable */
1929	if (rwl_acquire(&s->rx.use) < 0)
1930		return -EINVAL;
1931
1932	ctx = s->s.ctx;
1933	n = get_streams(ctx, rs, num);
1934
1935	rc = 0;
1936	for (i = 0; i != n; i++) {
1937
1938		/* prepare new stream */
1939		cs = TCP_STREAM(rs[i]);
1940		rc = accept_fill_stream(s, cs, prm + i, tms, pi + i, si + i);
1941		if (rc != 0)
1942			break;
1943	}
1944
1945	rwl_release(&s->rx.use);
1946
1947	/* free 'SYN' mbufs. */
1948	for (j = 0; j != i; j++)
1949		rte_pktmbuf_free(prm[j].syn.pkt);
1950
1951	/* close failed stream, put unused streams back to the free list. */
1952	if (rc != 0) {
1953		tle_tcp_stream_close(rs[i]);
1954		for (j = i + 1; j != n; j++) {
1955			cs = TCP_STREAM(rs[j]);
1956			put_stream(ctx, rs[j], TCP_STREAM_TX_PENDING(cs));
1957		}
1958		rte_errno = -rc;
1959
1960	/* not enough streams are available */
1961	} else if (n != num)
1962		rte_errno = ENFILE;
1963
1964	return i;
1965}
1966
1967/*
1968 * !!! implement a proper one, or delete !!!
1969 * Need to make sure there are no race conditions with stream table add/lookup.
1970 */
1971void
1972tle_tcp_reject(struct tle_stream *s, const struct tle_syn_req rq[],
1973	uint32_t num)
1974{
1975	uint32_t i;
1976	struct rte_mbuf *mb;
1977	struct stbl *st;
1978	union pkt_info pi;
1979	union seg_info si;
1980
1981	st = CTX_TCP_STLB(s->ctx);
1982
1983	for (i = 0; i != num; i++) {
1984		mb = rq[i].pkt;
1985		get_pkt_info(mb, &pi, &si);
1986		if (pi.tf.type < TLE_VNUM)
1987			stbl_del_pkt_lock(st, rq[i].opaque, &pi);
1988
1989		/* !!! send RST pkt to the peer !!! */
1990		rte_pktmbuf_free(mb);
1991	}
1992}
1993
1994uint16_t
1995tle_tcp_tx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], uint16_t num)
1996{
1997	uint32_t i, j, k, n;
1998	struct tle_drb *drb[num];
1999	struct tle_tcp_stream *s;
2000
2001	/* extract packets from device TX queue. */
2002
2003	k = num;
2004	n = tle_dring_sc_dequeue(&dev->tx.dr, (const void **)(uintptr_t)pkt,
2005		num, drb, &k);
2006
2007	if (n == 0)
2008		return 0;
2009
2010	/* free empty drbs and notify related streams. */
2011
2012	for (i = 0; i != k; i = j) {
2013		s = drb[i]->udata;
2014		for (j = i + 1; j != k && s == drb[j]->udata; j++)
2015			;
2016		stream_drb_free(s, drb + i, j - i);
2017	}
2018
2019	return n;
2020}
2021
2022static inline void
2023stream_fill_pkt_info(const struct tle_tcp_stream *s, union pkt_info *pi)
2024{
2025	if (s->s.type == TLE_V4)
2026		pi->addr4 = s->s.ipv4.addr;
2027	else
2028		pi->addr6 = &s->s.ipv6.addr;
2029
2030	pi->port = s->s.port;
2031	pi->tf.type = s->s.type;
2032}
2033
2034static int
2035stream_fill_addr(struct tle_tcp_stream *s, const struct sockaddr *addr)
2036{
2037	const struct sockaddr_in *in4;
2038	const struct sockaddr_in6 *in6;
2039	const struct tle_dev_param *prm;
2040	int32_t rc;
2041
2042	rc = 0;
2043	s->s.pmsk.raw = UINT32_MAX;
2044
2045	/* setup L4 src ports and src address fields. */
2046	if (s->s.type == TLE_V4) {
2047		in4 = (const struct sockaddr_in *)addr;
2048		if (in4->sin_addr.s_addr == INADDR_ANY || in4->sin_port == 0)
2049			return -EINVAL;
2050
2051		s->s.port.src = in4->sin_port;
2052		s->s.ipv4.addr.src = in4->sin_addr.s_addr;
2053		s->s.ipv4.mask.src = INADDR_NONE;
2054		s->s.ipv4.mask.dst = INADDR_NONE;
2055
2056	} else if (s->s.type == TLE_V6) {
2057		in6 = (const struct sockaddr_in6 *)addr;
2058		if (memcmp(&in6->sin6_addr, &tle_ipv6_any,
2059				sizeof(tle_ipv6_any)) == 0 ||
2060				in6->sin6_port == 0)
2061			return -EINVAL;
2062
2063		s->s.port.src = in6->sin6_port;
2064		rte_memcpy(&s->s.ipv6.addr.src, &in6->sin6_addr,
2065			sizeof(s->s.ipv6.addr.src));
2066		rte_memcpy(&s->s.ipv6.mask.src, &tle_ipv6_none,
2067			sizeof(s->s.ipv6.mask.src));
2068		rte_memcpy(&s->s.ipv6.mask.dst, &tle_ipv6_none,
2069			sizeof(s->s.ipv6.mask.dst));
2070	}
2071
2072	/* setup the destination device. */
2073	rc = stream_fill_dest(s);
2074	if (rc != 0)
2075		return rc;
2076
2077	/* setup L4 dst address from device param */
2078	prm = &s->tx.dst.dev->prm;
2079	if (s->s.type == TLE_V4) {
2080		if (s->s.ipv4.addr.dst == INADDR_ANY)
2081			s->s.ipv4.addr.dst = prm->local_addr4.s_addr;
2082	} else if (memcmp(&s->s.ipv6.addr.dst, &tle_ipv6_any,
2083			sizeof(tle_ipv6_any)) == 0)
2084		memcpy(&s->s.ipv6.addr.dst, &prm->local_addr6,
2085			sizeof(s->s.ipv6.addr.dst));
2086
2087	return rc;
2088}
2089
2090static inline int
2091tx_syn(struct tle_tcp_stream *s, const struct sockaddr *addr)
2092{
2093	int32_t rc;
2094	uint32_t tms, seq;
2095	union pkt_info pi;
2096	struct stbl *st;
2097	struct stbl_entry *se;
2098
2099	/* fill stream address */
2100	rc = stream_fill_addr(s, addr);
2101	if (rc != 0)
2102		return rc;
2103
2104	/* fill pkt info to generate seq.*/
2105	stream_fill_pkt_info(s, &pi);
2106
2107	tms = tcp_get_tms();
2108	s->tcb.so.ts.val = tms;
2109	s->tcb.so.ts.ecr = 0;
2110	s->tcb.so.wscale = TCP_WSCALE_DEFAULT;
2111	s->tcb.so.mss = calc_smss(s->tx.dst.mtu, &s->tx.dst);
2112
2113	/* note that rcv.nxt is 0 here for sync_gen_seq.*/
2114	seq = sync_gen_seq(&pi, s->tcb.rcv.nxt, tms, s->tcb.so.mss);
2115	s->tcb.snd.iss = seq;
2116	s->tcb.snd.rcvr = seq;
2117	s->tcb.snd.una = seq;
2118	s->tcb.snd.nxt = seq + 1;
2119	s->tcb.snd.rto = TCP_RTO_DEFAULT;
2120	s->tcb.snd.ts = tms;
2121
2122	s->tcb.rcv.mss = s->tcb.so.mss;
2123	s->tcb.rcv.wscale = TCP_WSCALE_DEFAULT;
2124	s->tcb.rcv.wnd = s->rx.q->prod.mask << s->tcb.rcv.wscale;
2125	s->tcb.rcv.ts = 0;
2126
2127	/* add the stream in stream table */
2128	st = CTX_TCP_STLB(s->s.ctx);
2129	se = stbl_add_stream_lock(st, s);
2130	if (se == NULL)
2131		return -ENOBUFS;
2132	s->ste = se;
2133
2134	/* put stream into the to-send queue */
2135	txs_enqueue(s->s.ctx, s);
2136
2137	return 0;
2138}
2139
2140int
2141tle_tcp_stream_connect(struct tle_stream *ts, const struct sockaddr *addr)
2142{
2143	struct tle_tcp_stream *s;
2144	uint32_t type;
2145	int32_t rc;
2146
2147	if (ts == NULL || addr == NULL)
2148		return -EINVAL;
2149
2150	s = TCP_STREAM(ts);
2151	type = s->s.type;
2152	if (type >= TLE_VNUM)
2153		return -EINVAL;
2154
2155	if (rwl_try_acquire(&s->tx.use) > 0) {
2156		rc = rte_atomic16_cmpset(&s->tcb.state, TCP_ST_CLOSED,
2157			TCP_ST_SYN_SENT);
2158		rc = (rc == 0) ? -EDEADLK : 0;
2159	} else
2160		rc = -EINVAL;
2161
2162	if (rc != 0) {
2163		rwl_release(&s->tx.use);
2164		return rc;
2165	}
2166
2167	/* fill stream, prepare and transmit syn pkt */
2168	s->tcb.uop |= TCP_OP_CONNECT;
2169	rc = tx_syn(s, addr);
2170	rwl_release(&s->tx.use);
2171
2172	/* error happened, do a cleanup */
2173	if (rc != 0)
2174		tle_tcp_stream_close(ts);
2175
2176	return rc;
2177}
2178
2179uint16_t
2180tle_tcp_stream_recv(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
2181{
2182	uint32_t n;
2183	struct tle_tcp_stream *s;
2184
2185	s = TCP_STREAM(ts);
2186	n = rte_ring_mc_dequeue_burst(s->rx.q, (void **)pkt, num);
2187	if (n == 0)
2188		return 0;
2189
2190	/*
2191	 * if we still have packets to read,
2192	 * then rearm stream RX event.
2193	 */
2194	if (n == num && rte_ring_count(s->rx.q) != 0) {
2195		if (rwl_try_acquire(&s->rx.use) > 0 && s->rx.ev != NULL)
2196			tle_event_raise(s->rx.ev);
2197		rwl_release(&s->rx.use);
2198	}
2199
2200	return n;
2201}
2202
2203uint16_t
2204tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
2205{
2206	uint32_t i, j, mss, n, state, type;
2207	uint64_t ol_flags;
2208	struct tle_tcp_stream *s;
2209	struct tle_dev *dev;
2210
2211	s = TCP_STREAM(ts);
2212
2213	/* mark stream as not closable. */
2214	if (rwl_acquire(&s->tx.use) < 0) {
2215		rte_errno = EAGAIN;
2216		return 0;
2217	}
2218
2219	state = s->tcb.state;
2220	if (state != TCP_ST_ESTABLISHED && state != TCP_ST_CLOSE_WAIT) {
2221		rte_errno = ENOTCONN;
2222		n = 0;
2223	} else {
2224		mss = s->tcb.snd.mss;
2225		dev = s->tx.dst.dev;
2226		type = s->s.type;
2227		ol_flags = dev->tx.ol_flags[type];
2228
2229		/* prepare and check for TX */
2230		for (i = 0; i != num; i++) {
2231
2232			/* !!! need to be modified !!! */
2233			if (pkt[i]->pkt_len > mss ||
2234					pkt[i]->nb_segs > TCP_MAX_PKT_SEG) {
2235				rte_errno = EBADMSG;
2236				break;
2237			} else if (tcp_fill_mbuf(pkt[i], s, &s->tx.dst,
2238					ol_flags, s->s.port, 0, TCP_FLAG_ACK,
2239					0, 0) != 0)
2240				break;
2241		}
2242
2243		/* queue packets for further transmission. */
2244		n = rte_ring_mp_enqueue_burst(s->tx.q, (void **)pkt, i);
2245
2246		/* notify BE about more data to send */
2247		if (n != 0)
2248			txs_enqueue(s->s.ctx, s);
2249
2250		/*
2251		 * for unsent, but already modified packets:
2252		 * remove pkt l2/l3 headers, restore ol_flags
2253		 */
2254		if (n != i) {
2255			ol_flags = ~dev->tx.ol_flags[type];
2256			for (j = n; j != i; j++) {
2257				rte_pktmbuf_adj(pkt[j], pkt[j]->l2_len +
2258					pkt[j]->l3_len + pkt[j]->l4_len);
2259				pkt[j]->ol_flags &= ol_flags;
2260			}
2261		/* if possible, rearm stream write event. */
2262		} else if (rte_ring_free_count(s->tx.q) != 0 &&
2263				s->tx.ev != NULL)
2264			tle_event_raise(s->tx.ev);
2265	}
2266
2267	rwl_release(&s->tx.use);
2268	return n;
2269}
2270
2271/* send data and FIN (if needed) */
2272static inline void
2273tx_data_fin(struct tle_tcp_stream *s, uint32_t tms, uint32_t state)
2274{
2275	/* try to send some data */
2276	tx_nxt_data(s, tms);
2277
2278	/* we also have to send a FIN */
2279	if (state != TCP_ST_ESTABLISHED &&
2280			state != TCP_ST_CLOSE_WAIT &&
2281			tcp_txq_nxt_cnt(s) == 0 &&
2282			s->tcb.snd.fss != s->tcb.snd.nxt) {
2283		s->tcb.snd.fss = ++s->tcb.snd.nxt;
2284		send_ack(s, tms, TCP_FLAG_FIN | TCP_FLAG_ACK);
2285	}
2286}
2287
2288static inline void
2289tx_stream(struct tle_tcp_stream *s, uint32_t tms)
2290{
2291	uint32_t state;
2292
2293	state = s->tcb.state;
2294
2295	if (state == TCP_ST_SYN_SENT) {
2296		/* send the SYN, start the rto timer */
2297		send_ack(s, tms, TCP_FLAG_SYN);
2298		timer_start(s);
2299
2300	} else if (state >= TCP_ST_ESTABLISHED && state <= TCP_ST_LAST_ACK) {
2301
2302		tx_data_fin(s, tms, state);
2303
2304		/* start RTO timer. */
2305		if (s->tcb.snd.nxt != s->tcb.snd.una)
2306			timer_start(s);
2307	}
2308}
2309
2310static inline void
2311rto_stream(struct tle_tcp_stream *s, uint32_t tms)
2312{
2313	uint32_t state;
2314
2315	state = s->tcb.state;
2316
2317	TCP_LOG(DEBUG, "%s(%p, tms=%u): state=%u, "
2318		"retx=%u, retm=%u, "
2319		"rto=%u, snd.ts=%u, tmo=%u, "
2320		"snd.nxt=%lu, snd.una=%lu, flight_size=%lu, "
2321		"snd.rcvr=%lu, snd.fastack=%u, "
2322		"wnd=%u, cwnd=%u, ssthresh=%u, "
2323		"bytes sent=%lu, pkt remain=%u;\n",
2324		__func__, s, tms, s->tcb.state,
2325		s->tcb.snd.nb_retx, s->tcb.snd.nb_retm,
2326		s->tcb.snd.rto, s->tcb.snd.ts, tms - s->tcb.snd.ts,
2327		s->tcb.snd.nxt, s->tcb.snd.una, s->tcb.snd.nxt - s->tcb.snd.una,
2328		s->tcb.snd.rcvr, s->tcb.snd.fastack,
2329		s->tcb.snd.wnd, s->tcb.snd.cwnd, s->tcb.snd.ssthresh,
2330		s->tcb.snd.nxt - s->tcb.snd.iss, tcp_txq_nxt_cnt(s));
2331
2332	if (s->tcb.snd.nb_retx < s->tcb.snd.nb_retm) {
2333
2334		if (state >= TCP_ST_ESTABLISHED && state <= TCP_ST_LAST_ACK) {
2335
2336			/* update SND.CWND and SND.SSTHRESH */
2337			rto_cwnd_update(&s->tcb);
2338
2339			/* RFC 6582 3.2.4 */
2340			s->tcb.snd.rcvr = s->tcb.snd.nxt;
2341			s->tcb.snd.fastack = 0;
2342
2343			/* restart from last acked data */
2344			tcp_txq_rst_nxt_head(s);
2345			s->tcb.snd.nxt = s->tcb.snd.una;
2346
2347			tx_data_fin(s, tms, state);
2348
2349		} else if (state == TCP_ST_SYN_SENT) {
2350			/* resending SYN */
2351			s->tcb.so.ts.val = tms;
2352			send_ack(s, tms, TCP_FLAG_SYN);
2353
2354		} else if (state == TCP_ST_TIME_WAIT) {
2355			stream_term(s);
2356		}
2357
2358		/* RFC6298:5.5 back off the timer */
2359		s->tcb.snd.rto = rto_roundup(2 * s->tcb.snd.rto);
2360		s->tcb.snd.nb_retx++;
2361		timer_restart(s);
2362
2363	} else {
2364		send_rst(s, s->tcb.snd.una);
2365		stream_term(s);
2366	}
2367}
2368
2369int
2370tle_tcp_process(struct tle_ctx *ctx, uint32_t num)
2371{
2372	uint32_t i, k, tms;
2373	struct sdr *dr;
2374	struct tle_timer_wheel *tw;
2375	struct tle_stream *p;
2376	struct tle_tcp_stream *s, *rs[num];
2377
2378	/* process streams with RTO expired */
2379
2380	tw = CTX_TCP_TMWHL(ctx);
2381	tms = tcp_get_tms();
2382	tle_timer_expire(tw, tms);
2383
2384	k = tle_timer_get_expired_bulk(tw, (void **)rs, RTE_DIM(rs));
2385
2386	for (i = 0; i != k; i++) {
2387
2388		s = rs[i];
2389		s->timer.handle = NULL;
2390		if (rwl_try_acquire(&s->tx.use) > 0)
2391			rto_stream(s, tms);
2392		rwl_release(&s->tx.use);
2393	}
2394
2395	/* process streams from to-send queue */
2396
2397	k = txs_dequeue_bulk(ctx, rs, RTE_DIM(rs));
2398
2399	for (i = 0; i != k; i++) {
2400
2401		s = rs[i];
2402		if (rwl_try_acquire(&s->tx.use) > 0 &&
2403				rte_atomic32_read(&s->tx.arm) > 0) {
2404			rte_atomic32_set(&s->tx.arm, 0);
2405			tx_stream(s, tms);
2406		}
2407		rwl_release(&s->tx.use);
2408	}
2409
2410	/* collect streams to close from the death row */
2411
2412	dr = CTX_TCP_SDR(ctx);
2413	for (k = 0, p = STAILQ_FIRST(&dr->be);
2414			k != num && p != NULL;
2415			k++, p = STAILQ_NEXT(p, link))
2416		rs[k] = TCP_STREAM(p);
2417
2418	if (p == NULL)
2419		STAILQ_INIT(&dr->be);
2420	else
2421		STAILQ_FIRST(&dr->be) = p;
2422
2423	/* cleanup closed streams */
2424	for (i = 0; i != k; i++) {
2425		s = rs[i];
2426		tcp_stream_down(s);
2427		tcp_stream_reset(ctx, s);
2428	}
2429
2430	return 0;
2431}
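
/*
 * A rough sketch of a back-end maintenance loop that ties the API above
 * together (illustrative only; 'ctx', 'dev', 'port' and 'queue' are assumed
 * to be set up by the application, and unsent mbufs still have to be
 * retried or freed by the caller):
 *
 *	struct rte_mbuf *pkt[MAX_PKT_BURST];
 *	uint16_t n;
 *
 *	tle_tcp_process(ctx, MAX_PKT_BURST);
 *	n = tle_tcp_tx_bulk(dev, pkt, MAX_PKT_BURST);
 *	if (n != 0)
 *		rte_eth_tx_burst(port, queue, pkt, n);
 */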
2432