tcp_rxtx.c revision 7e18fa1b
1/*
2 * Copyright (c) 2016-2017  Intel Corporation.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#include <rte_errno.h>
17#include <rte_ethdev.h>
18#include <rte_ip.h>
19#include <rte_ip_frag.h>
20#include <rte_tcp.h>
21
22#include "tcp_stream.h"
23#include "tcp_timer.h"
24#include "stream_table.h"
25#include "syncookie.h"
26#include "misc.h"
27#include "tcp_ctl.h"
28#include "tcp_rxq.h"
29#include "tcp_txq.h"
30#include "tcp_tx_seg.h"
31
32#define	TCP_MAX_PKT_SEG	0x20
33
34/*
35 * checks if input TCP ports and IP addresses match the given stream.
36 * returns zero on success.
37 */
38static inline int
39rx_check_stream(const struct tle_tcp_stream *s, const union pkt_info *pi)
40{
41	int32_t rc;
42
43	if (pi->tf.type == TLE_V4)
44		rc = (pi->port.raw & s->s.pmsk.raw) != s->s.port.raw ||
45			(pi->addr4.raw & s->s.ipv4.mask.raw) !=
46			s->s.ipv4.addr.raw;
47	else
48		rc = (pi->port.raw & s->s.pmsk.raw) != s->s.port.raw ||
49			ymm_mask_cmp(&pi->addr6->raw, &s->s.ipv6.addr.raw,
50			&s->s.ipv6.mask.raw) != 0;
51
52	return rc;
53}
54
55static inline struct tle_tcp_stream *
56rx_obtain_listen_stream(const struct tle_dev *dev, const union pkt_info *pi,
57	uint32_t type)
58{
59	struct tle_tcp_stream *s;
60
61	s = (struct tle_tcp_stream *)dev->dp[type]->streams[pi->port.dst];
62	if (s == NULL || tcp_stream_acquire(s) < 0)
63		return NULL;
64
65	/* check that we have a proper stream. */
66	if (s->tcb.state != TCP_ST_LISTEN) {
67		tcp_stream_release(s);
68		s = NULL;
69	}
70
71	return s;
72}
73
74static inline struct tle_tcp_stream *
75rx_obtain_stream(const struct tle_dev *dev, struct stbl *st,
76	const union pkt_info *pi, uint32_t type)
77{
78	struct tle_tcp_stream *s;
79
80	s = stbl_find_data(st, pi);
81	if (s == NULL) {
82		if (pi->tf.flags == TCP_FLAG_ACK)
83			return rx_obtain_listen_stream(dev, pi, type);
84		return NULL;
85	}
86
87	if (tcp_stream_acquire(s) < 0)
88		return NULL;
89	/* check that we have a proper stream. */
90	else if (s->tcb.state == TCP_ST_CLOSED) {
91		tcp_stream_release(s);
92		s = NULL;
93	}
94
95	return s;
96}
97
98/*
99 * Consider 2 pkt_info *equal* if their:
100 * - types (IPv4/IPv6)
101 * - TCP flags
102 * - checksum flags
103 * - TCP src and dst ports
104 * - IP src and dst addresses
105 * are equal.
106 */
107static inline int
108pkt_info_bulk_eq(const union pkt_info pi[], uint32_t num)
109{
110	uint32_t i;
111
112	i = 1;
113
114	if (pi[0].tf.type == TLE_V4) {
115		while (i != num && xmm_cmp(&pi[0].raw, &pi[i].raw) == 0)
116			i++;
117
118	} else if (pi[0].tf.type == TLE_V6) {
119		while (i != num &&
120				pi[0].raw.u64[0] == pi[i].raw.u64[0] &&
121				ymm_cmp(&pi[0].addr6->raw,
122				&pi[i].addr6->raw) == 0)
123			i++;
124	}
125
126	return i;
127}
128
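/*
 * Same grouping idea as pkt_info_bulk_eq() above, but for SYN packets only
 * the type/flags, destination port and destination address have to match
 * (different remote peers may be targeting the same listener).
 */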
129static inline int
130pkt_info_bulk_syneq(const union pkt_info pi[], uint32_t num)
131{
132	uint32_t i;
133
134	i = 1;
135
136	if (pi[0].tf.type == TLE_V4) {
137		while (i != num && pi[0].tf.raw == pi[i].tf.raw &&
138				pi[0].port.dst == pi[i].port.dst &&
139				pi[0].addr4.dst == pi[i].addr4.dst)
140			i++;
141
142	} else if (pi[0].tf.type == TLE_V6) {
143		while (i != num && pi[0].tf.raw == pi[i].tf.raw &&
144				pi[0].port.dst == pi[i].port.dst &&
145				xmm_cmp(&pi[0].addr6->dst,
146				&pi[i].addr6->dst) == 0)
147			i++;
148	}
149
150	return i;
151}
152
153static inline void
154stream_drb_free(struct tle_tcp_stream *s, struct tle_drb *drbs[],
155	uint32_t nb_drb)
156{
157	_rte_ring_enqueue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
158}
159
160static inline uint32_t
161stream_drb_alloc(struct tle_tcp_stream *s, struct tle_drb *drbs[],
162	uint32_t nb_drb)
163{
164	return _rte_ring_dequeue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
165}
166
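/*
 * Reserve a block of <num> consecutive IP identification values for the
 * given address family: an atomic add is used in multi-threaded mode
 * (st == 0), a plain read/modify/write when the context is single-threaded.
 */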
167static inline uint32_t
168get_ip_pid(struct tle_dev *dev, uint32_t num, uint32_t type, uint32_t st)
169{
170	uint32_t pid;
171	rte_atomic32_t *pa;
172
173	pa = &dev->tx.packet_id[type];
174
175	if (st == 0) {
176		pid = rte_atomic32_add_return(pa, num);
177		return pid - num;
178	} else {
179		pid = rte_atomic32_read(pa);
180		rte_atomic32_set(pa, pid + num);
181		return pid;
182	}
183}
184
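/*
 * Fill in the TCP header of an outgoing segment.
 * Note that <port> is kept in RX order, so src/dst are swapped here;
 * the advertised window is scaled, except for SYN segments where the
 * window scale option is not yet in effect.
 */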
185static inline void
186fill_tcph(struct tcp_hdr *l4h, const struct tcb *tcb, union l4_ports port,
187	uint32_t seq, uint8_t hlen, uint8_t flags)
188{
189	uint16_t wnd;
190
191	l4h->src_port = port.dst;
192	l4h->dst_port = port.src;
193
194	wnd = (flags & TCP_FLAG_SYN) ?
195		RTE_MIN(tcb->rcv.wnd, (uint32_t)UINT16_MAX) :
196		tcb->rcv.wnd >> tcb->rcv.wscale;
197
198	/* ??? use sse shuffle to hton all remaining 16 bytes at once. ??? */
199	l4h->sent_seq = rte_cpu_to_be_32(seq);
200	l4h->recv_ack = rte_cpu_to_be_32(tcb->rcv.nxt);
201	l4h->data_off = hlen / TCP_DATA_ALIGN << TCP_DATA_OFFSET;
202	l4h->tcp_flags = flags;
203	l4h->rx_win = rte_cpu_to_be_16(wnd);
204	l4h->cksum = 0;
205	l4h->tcp_urp = 0;
206
207	if (flags & TCP_FLAG_SYN)
208		fill_syn_opts(l4h + 1, &tcb->so);
209	else if ((flags & TCP_FLAG_RST) == 0 && tcb->so.ts.raw != 0)
210		fill_tms_opts(l4h + 1, tcb->snd.ts, tcb->rcv.ts);
211}
212
213static inline int
214tcp_fill_mbuf(struct rte_mbuf *m, const struct tle_tcp_stream *s,
215	const struct tle_dest *dst, uint64_t ol_flags,
216	union l4_ports port, uint32_t seq, uint32_t flags,
217	uint32_t pid, uint32_t swcsm)
218{
219	uint32_t l4, len, plen;
220	struct tcp_hdr *l4h;
221	char *l2h;
222
223	len = dst->l2_len + dst->l3_len;
224	plen = m->pkt_len;
225
226	if (flags & TCP_FLAG_SYN)
227		l4 = sizeof(*l4h) + TCP_TX_OPT_LEN_MAX;
228	else if ((flags & TCP_FLAG_RST) == 0 && s->tcb.rcv.ts != 0)
229		l4 = sizeof(*l4h) + TCP_TX_OPT_LEN_TMS;
230	else
231		l4 = sizeof(*l4h);
232
233	/* adjust mbuf to put L2/L3/L4 headers into it. */
234	l2h = rte_pktmbuf_prepend(m, len + l4);
235	if (l2h == NULL)
236		return -EINVAL;
237
238	/* copy L2/L3 header */
239	rte_memcpy(l2h, dst->hdr, len);
240
241	/* setup TCP header & options */
242	l4h = (struct tcp_hdr *)(l2h + len);
243	fill_tcph(l4h, &s->tcb, port, seq, l4, flags);
244
245	/* setup mbuf TX offload related fields. */
246	m->tx_offload = _mbuf_tx_offload(dst->l2_len, dst->l3_len, l4, 0, 0, 0);
247	m->ol_flags |= ol_flags;
248
249	/* update proto specific fields. */
250
251	if (s->s.type == TLE_V4) {
252		struct ipv4_hdr *l3h;
253		l3h = (struct ipv4_hdr *)(l2h + dst->l2_len);
254		l3h->packet_id = rte_cpu_to_be_16(pid);
255		l3h->total_length = rte_cpu_to_be_16(plen + dst->l3_len + l4);
256
257		if ((ol_flags & PKT_TX_TCP_CKSUM) != 0)
258			l4h->cksum = _ipv4x_phdr_cksum(l3h, m->l3_len,
259				ol_flags);
260		else if (swcsm != 0)
261			l4h->cksum = _ipv4_udptcp_mbuf_cksum(m, len, l3h);
262
263		if ((ol_flags & PKT_TX_IP_CKSUM) == 0 && swcsm != 0)
264			l3h->hdr_checksum = _ipv4x_cksum(l3h, m->l3_len);
265	} else {
266		struct ipv6_hdr *l3h;
267		l3h = (struct ipv6_hdr *)(l2h + dst->l2_len);
268		l3h->payload_len = rte_cpu_to_be_16(plen + l4);
269		if ((ol_flags & PKT_TX_TCP_CKSUM) != 0)
270			l4h->cksum = rte_ipv6_phdr_cksum(l3h, ol_flags);
271		else if (swcsm != 0)
272			l4h->cksum = _ipv6_udptcp_mbuf_cksum(m, len, l3h);
273	}
274
275	return 0;
276}
277
278/*
279 * This function is supposed to be used only for data packets.
280 * Assumes that L2/L3/L4 headers and mbuf fields are already set up properly.
281 *  - updates tcp SEG.SEQ, SEG.ACK, TS.VAL, TS.ECR.
282 *  - if no HW cksum offloads are enabled, calculates TCP checksum.
283 */
284static inline void
285tcp_update_mbuf(struct rte_mbuf *m, uint32_t type, const struct tcb *tcb,
286	uint32_t seq, uint32_t pid)
287{
288	struct tcp_hdr *l4h;
289	uint32_t len;
290
291	len = m->l2_len + m->l3_len;
292	l4h = rte_pktmbuf_mtod_offset(m, struct tcp_hdr *, len);
293
294	l4h->sent_seq = rte_cpu_to_be_32(seq);
295	l4h->recv_ack = rte_cpu_to_be_32(tcb->rcv.nxt);
296
297	if (tcb->so.ts.raw != 0)
298		fill_tms_opts(l4h + 1, tcb->snd.ts, tcb->rcv.ts);
299
300	if (type == TLE_V4) {
301		struct ipv4_hdr *l3h;
302		l3h = rte_pktmbuf_mtod_offset(m, struct ipv4_hdr *, m->l2_len);
303		l3h->hdr_checksum = 0;
304		l3h->packet_id = rte_cpu_to_be_16(pid);
305		if ((m->ol_flags & PKT_TX_IP_CKSUM) == 0)
306			l3h->hdr_checksum = _ipv4x_cksum(l3h, m->l3_len);
307	}
308
309	/* have to calculate TCP checksum in SW */
310	if ((m->ol_flags & PKT_TX_TCP_CKSUM) == 0) {
311
312		l4h->cksum = 0;
313
314		if (type == TLE_V4) {
315			struct ipv4_hdr *l3h;
316			l3h = rte_pktmbuf_mtod_offset(m, struct ipv4_hdr *,
317				m->l2_len);
318			l4h->cksum = _ipv4_udptcp_mbuf_cksum(m, len, l3h);
319
320		} else {
321			struct ipv6_hdr *l3h;
322			l3h = rte_pktmbuf_mtod_offset(m, struct ipv6_hdr *,
323				m->l2_len);
324			l4h->cksum = _ipv6_udptcp_mbuf_cksum(m, len, l3h);
325		}
326	}
327}
328
329/* Send data packets that need to be ACK-ed by peer */
330static inline uint32_t
331tx_data_pkts(struct tle_tcp_stream *s, struct rte_mbuf *const m[], uint32_t num)
332{
333	uint32_t bsz, i, nb, nbm;
334	struct tle_dev *dev;
335	struct tle_drb *drb[num];
336
337	/* calculate how many drbs are needed.*/
338	bsz = s->tx.drb.nb_elem;
339	nbm = (num + bsz - 1) / bsz;
340
341	/* allocate drbs, adjust number of packets. */
342	nb = stream_drb_alloc(s, drb, nbm);
343
344	/* drb ring is empty. */
345	if (nb == 0)
346		return 0;
347
348	else if (nb != nbm)
349		num = nb * bsz;
350
351	dev = s->tx.dst.dev;
352
353	/* enqueue pkts for TX. */
354	nbm = nb;
355	i = tle_dring_mp_enqueue(&dev->tx.dr, (const void * const*)m,
356		num, drb, &nb);
357
358	/* free unused drbs. */
359	if (nb != 0)
360		stream_drb_free(s, drb + nbm - nb, nb);
361
362	return i;
363}
364
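/*
 * Prepare and queue a bulk of data packets for transmission:
 * for each mbuf that fits into the remaining send window (sl), update its
 * TCP header (SEQ/ACK/TS), bump its refcnt so it is kept until ACKed,
 * and push it to the device TX ring via tx_data_pkts().
 * Returns the number of packets actually queued.
 */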
365static inline uint32_t
366tx_data_bulk(struct tle_tcp_stream *s, union seqlen *sl, struct rte_mbuf *mi[],
367	uint32_t num)
368{
369	uint32_t fail, i, k, n, mss, pid, plen, sz, tn, type;
370	struct tle_dev *dev;
371	struct rte_mbuf *mb;
372	struct rte_mbuf *mo[MAX_PKT_BURST + TCP_MAX_PKT_SEG];
373
374	mss = s->tcb.snd.mss;
375	type = s->s.type;
376
377	dev = s->tx.dst.dev;
378	pid = get_ip_pid(dev, num, type, (s->flags & TLE_CTX_FLAG_ST) != 0);
379
380	k = 0;
381	tn = 0;
382	fail = 0;
383	for (i = 0; i != num && sl->len != 0 && fail == 0; i++) {
384
385		mb = mi[i];
386		sz = RTE_MIN(sl->len, mss);
387		plen = PKT_L4_PLEN(mb);
388
389		/* fast path, no need to use indirect mbufs. */
390		if (plen <= sz) {
391
392			/* update pkt TCP header */
393			tcp_update_mbuf(mb, type, &s->tcb, sl->seq, pid + i);
394
395			/* keep mbuf till ACK is received. */
396			rte_pktmbuf_refcnt_update(mb, 1);
397			sl->len -= plen;
398			sl->seq += plen;
399			mo[k++] = mb;
400		/* remaining snd.wnd is less than MSS, send nothing */
401		} else if (sz < mss)
402			break;
403		/* packet indirection needed */
404		else
405			RTE_VERIFY(0);
406
407		if (k >= MAX_PKT_BURST) {
408			n = tx_data_pkts(s, mo, k);
409			fail = k - n;
410			tn += n;
411			k = 0;
412		}
413	}
414
415	if (k != 0) {
416		n = tx_data_pkts(s, mo, k);
417		fail = k - n;
418		tn += n;
419	}
420
421	if (fail != 0) {
422		sz = tcp_mbuf_seq_free(mo + n, fail);
423		sl->seq -= sz;
424		sl->len += sz;
425	}
426
427	return tn;
428}
429
430/*
431 * gets data from the stream send buffer, updates it and
432 * queues it into the TX device queue.
433 * Note that this function is not MT safe.
434 */
435static inline uint32_t
436tx_nxt_data(struct tle_tcp_stream *s, uint32_t tms)
437{
438	uint32_t n, num, tn, wnd;
439	struct rte_mbuf **mi;
440	union seqlen sl;
441
442	tn = 0;
443	wnd = s->tcb.snd.wnd - (uint32_t)(s->tcb.snd.nxt - s->tcb.snd.una);
444	sl.seq = s->tcb.snd.nxt;
445	sl.len = RTE_MIN(wnd, s->tcb.snd.cwnd);
446
447	if (sl.len == 0)
448		return tn;
449
450	/* update send timestamp */
451	s->tcb.snd.ts = tms;
452
453	do {
454		/* get group of packets */
455		mi = tcp_txq_get_nxt_objs(s, &num);
456
457		/* stream send buffer is empty */
458		if (num == 0)
459			break;
460
461		/* queue data packets for TX */
462		n = tx_data_bulk(s, &sl, mi, num);
463		tn += n;
464
465		/* update consumer head */
466		tcp_txq_set_nxt_head(s, n);
467	} while (n == num);
468
469	s->tcb.snd.nxt += sl.seq - (uint32_t)s->tcb.snd.nxt;
470	return tn;
471}
472
473static inline void
474free_una_data(struct tle_tcp_stream *s, uint32_t len)
475{
476	uint32_t i, n, num, plen;
477	struct rte_mbuf **mi;
478
479	n = 0;
480	plen = 0;
481
482	do {
483		/* get group of packets */
484		mi = tcp_txq_get_una_objs(s, &num);
485
486		if (num == 0)
487			break;
488
489		/* free acked data */
490		for (i = 0; i != num && n != len; i++, n = plen) {
491			plen += PKT_L4_PLEN(mi[i]);
492			if (plen > len) {
493				/* keep SND.UNA at the start of the packet */
494				len -= RTE_MIN(len, plen - len);
495				break;
496			}
497			rte_pktmbuf_free(mi[i]);
498		}
499
500		/* update consumer tail */
501		tcp_txq_set_una_tail(s, i);
502	} while (plen < len);
503
504	s->tcb.snd.una += len;
505
506	/*
507	 * that could happen in case of retransmit,
508	 * adjust SND.NXT with SND.UNA.
509	 */
510	if (s->tcb.snd.una > s->tcb.snd.nxt) {
511		tcp_txq_rst_nxt_head(s);
512		s->tcb.snd.nxt = s->tcb.snd.una;
513	}
514}
515
516static inline uint16_t
517calc_smss(uint16_t mss, const struct tle_dest *dst)
518{
519	uint16_t n;
520
521	n = dst->mtu - dst->l2_len - dst->l3_len - TCP_TX_HDR_DACK;
522	mss = RTE_MIN(n, mss);
523	return mss;
524}
525
526/*
527 * RFC 6928 2
528 * min (10*MSS, max (2*MSS, 14600))
529 *
530 * or using user provided initial congestion window (icw)
531 * min (10*MSS, max (2*MSS, icw))
532 */
533static inline uint32_t
534initial_cwnd(uint32_t smss, uint32_t icw)
535{
536	return RTE_MIN(10 * smss, RTE_MAX(2 * smss, icw));
537}
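/*
 * Example (assuming SMSS = 1460 bytes and icw left at the RFC 6928 default
 * of 14600 bytes): min(10 * 1460, max(2 * 1460, 14600)) = 14600, i.e. an
 * initial congestion window of ten full-sized segments.
 */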
538
539/*
540 * queue a standalone packet to the particular output device.
541 * It assumes that:
542 * - L2/L3/L4 headers should be already set.
543 * - packet fits into one segment.
544 */
545static inline int
546send_pkt(struct tle_tcp_stream *s, struct tle_dev *dev, struct rte_mbuf *m)
547{
548	uint32_t n, nb;
549	struct tle_drb *drb;
550
551	if (stream_drb_alloc(s, &drb, 1) == 0)
552		return -ENOBUFS;
553
554	/* enqueue pkt for TX. */
555	nb = 1;
556	n = tle_dring_mp_enqueue(&dev->tx.dr, (const void * const*)&m, 1,
557		&drb, &nb);
558
559	/* free unused drbs. */
560	if (nb != 0)
561		stream_drb_free(s, &drb, 1);
562
563	return (n == 1) ? 0 : -ENOBUFS;
564}
565
566static inline int
567send_ctrl_pkt(struct tle_tcp_stream *s, struct rte_mbuf *m, uint32_t seq,
568	uint32_t flags)
569{
570	const struct tle_dest *dst;
571	uint32_t pid, type;
572	int32_t rc;
573
574	dst = &s->tx.dst;
575	type = s->s.type;
576	pid = get_ip_pid(dst->dev, 1, type, (s->flags & TLE_CTX_FLAG_ST) != 0);
577
578	rc = tcp_fill_mbuf(m, s, dst, 0, s->s.port, seq, flags, pid, 1);
579	if (rc == 0)
580		rc = send_pkt(s, dst->dev, m);
581
582	return rc;
583}
584
585static inline int
586send_rst(struct tle_tcp_stream *s, uint32_t seq)
587{
588	struct rte_mbuf *m;
589	int32_t rc;
590
591	m = rte_pktmbuf_alloc(s->tx.dst.head_mp);
592	if (m == NULL)
593		return -ENOMEM;
594
595	rc = send_ctrl_pkt(s, m, seq, TCP_FLAG_RST);
596	if (rc != 0)
597		rte_pktmbuf_free(m);
598
599	return rc;
600}
601
602static inline int
603send_ack(struct tle_tcp_stream *s, uint32_t tms, uint32_t flags)
604{
605	struct rte_mbuf *m;
606	uint32_t seq;
607	int32_t rc;
608
609	m = rte_pktmbuf_alloc(s->tx.dst.head_mp);
610	if (m == NULL)
611		return -ENOMEM;
612
613	seq = s->tcb.snd.nxt - ((flags & (TCP_FLAG_FIN | TCP_FLAG_SYN)) != 0);
614	s->tcb.snd.ts = tms;
615
616	rc = send_ctrl_pkt(s, m, seq, flags);
617	if (rc != 0) {
618		rte_pktmbuf_free(m);
619		return rc;
620	}
621
622	s->tcb.snd.ack = s->tcb.rcv.nxt;
623	return 0;
624}
625
626
627static int
628sync_ack(struct tle_tcp_stream *s, const union pkt_info *pi,
629	const union seg_info *si, uint32_t ts, struct rte_mbuf *m)
630{
631	uint16_t len;
632	int32_t rc;
633	uint32_t pid, seq, type;
634	struct tle_dev *dev;
635	const void *da;
636	struct tle_dest dst;
637	const struct tcp_hdr *th;
638
639	type = s->s.type;
640
641	/* get destination information. */
642	if (type == TLE_V4)
643		da = &pi->addr4.src;
644	else
645		da = &pi->addr6->src;
646
647	rc = stream_get_dest(&s->s, da, &dst);
648	if (rc < 0)
649		return rc;
650
651	th = rte_pktmbuf_mtod_offset(m, const struct tcp_hdr *,
652		m->l2_len + m->l3_len);
653	get_syn_opts(&s->tcb.so, (uintptr_t)(th + 1), m->l4_len - sizeof(*th));
654
655	s->tcb.rcv.nxt = si->seq + 1;
656	seq = sync_gen_seq(pi, s->tcb.rcv.nxt, ts, s->tcb.so.mss,
657				s->s.ctx->prm.hash_alg,
658				&s->s.ctx->prm.secret_key);
659	s->tcb.so.ts.ecr = s->tcb.so.ts.val;
660	s->tcb.so.ts.val = sync_gen_ts(ts, s->tcb.so.wscale);
661	s->tcb.so.wscale = (s->tcb.so.wscale == TCP_WSCALE_NONE) ?
662		TCP_WSCALE_NONE : TCP_WSCALE_DEFAULT;
663	s->tcb.so.mss = calc_smss(dst.mtu, &dst);
664
665	/* reset mbuf's data contents. */
666	len = m->l2_len + m->l3_len + m->l4_len;
667	m->tx_offload = 0;
668	if (rte_pktmbuf_adj(m, len) == NULL)
669		return -EINVAL;
670
671	dev = dst.dev;
672	pid = get_ip_pid(dev, 1, type, (s->flags & TLE_CTX_FLAG_ST) != 0);
673
674	rc = tcp_fill_mbuf(m, s, &dst, 0, pi->port, seq,
675		TCP_FLAG_SYN | TCP_FLAG_ACK, pid, 1);
676	if (rc == 0)
677		rc = send_pkt(s, dev, m);
678
679	return rc;
680}
681
682/*
683 * RFC 793:
684 * There are four cases for the acceptability test for an incoming segment:
685 * Segment Receive  Test
686 * Length  Window
687 * ------- -------  -------------------------------------------
688 *    0       0     SEG.SEQ = RCV.NXT
689 *    0      >0     RCV.NXT =< SEG.SEQ < RCV.NXT+RCV.WND
690 *   >0       0     not acceptable
691 *   >0      >0     RCV.NXT =< SEG.SEQ < RCV.NXT+RCV.WND
692 *                  or RCV.NXT =< SEG.SEQ+SEG.LEN-1 < RCV.NXT+RCV.WND
693 */
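/*
 * Both comparisons below rely on unsigned (modulo 2^32) arithmetic:
 * the segment is rejected only when neither its first nor its last byte
 * falls inside [RCV.NXT, RCV.NXT + RCV.WND).
 */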
694static inline int
695check_seqn(const struct tcb *tcb, uint32_t seqn, uint32_t len)
696{
697	uint32_t n;
698
699	n = seqn + len;
700	if (seqn - tcb->rcv.nxt >= tcb->rcv.wnd &&
701			n - tcb->rcv.nxt > tcb->rcv.wnd)
702		return -ERANGE;
703
704	return 0;
705}
706
707static inline union tsopt
708rx_tms_opt(const struct tcb *tcb, const struct rte_mbuf *mb)
709{
710	union tsopt ts;
711	uintptr_t opt;
712	const struct tcp_hdr *th;
713
714	if (tcb->so.ts.val != 0) {
715		opt = rte_pktmbuf_mtod_offset(mb, uintptr_t,
716			mb->l2_len + mb->l3_len + sizeof(*th));
717		ts = get_tms_opts(opt, mb->l4_len - sizeof(*th));
718	} else
719		ts.raw = 0;
720
721	return ts;
722}
723
724/*
725 * PAWS and sequence check.
726 * RFC 1323 4.2.1
727 */
728static inline int
729rx_check_seq(struct tcb *tcb, uint32_t seq, uint32_t len, const union tsopt ts)
730{
731	int32_t rc;
732
733	/* RFC 1323 4.2.1 R2 */
734	rc = check_seqn(tcb, seq, len);
735	if (rc < 0)
736		return rc;
737
738	if (ts.raw != 0) {
739
740		/* RFC 1323 4.2.1 R1 */
741		if (tcp_seq_lt(ts.val, tcb->rcv.ts))
742			return -ERANGE;
743
744		/* RFC 1323 4.2.1 R3 */
745		if (tcp_seq_leq(seq, tcb->snd.ack) &&
746				tcp_seq_lt(tcb->snd.ack, seq + len))
747			tcb->rcv.ts = ts.val;
748	}
749
750	return rc;
751}
752
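/*
 * ACK acceptability test: accept only if
 * SND.UNA <= SEG.ACK <= max(SND.NXT, SND.RCVR),
 * where SND.RCVR allows ACKs up to the fast-recovery point.
 */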
753static inline int
754rx_check_ack(const struct tcb *tcb, uint32_t ack)
755{
756	uint32_t max;
757
758	max = (uint32_t)RTE_MAX(tcb->snd.nxt, tcb->snd.rcvr);
759
760	if (tcp_seq_leq(tcb->snd.una, ack) && tcp_seq_leq(ack, max))
761		return 0;
762
763	return -ERANGE;
764}
765
766static inline int
767rx_check_seqack(struct tcb *tcb, uint32_t seq, uint32_t ack, uint32_t len,
768	const union tsopt ts)
769{
770	int32_t rc;
771
772	rc = rx_check_seq(tcb, seq, len, ts);
773	rc |= rx_check_ack(tcb, ack);
774	return rc;
775}
776
777static inline int
778restore_syn_opt(union seg_info *si, union tsopt *to,
779	const union pkt_info *pi, uint32_t ts, const struct rte_mbuf *mb,
780	uint32_t hash_alg, rte_xmm_t *secret_key)
781{
782	int32_t rc;
783	uint32_t len;
784	const struct tcp_hdr *th;
785
786	/* check that ACK, etc fields are what we expected. */
787	rc = sync_check_ack(pi, si->seq, si->ack - 1, ts,
788				hash_alg,
789				secret_key);
790	if (rc < 0)
791		return rc;
792
793	si->mss = rc;
794
795	th = rte_pktmbuf_mtod_offset(mb, const struct tcp_hdr *,
796		mb->l2_len + mb->l3_len);
797	len = mb->l4_len - sizeof(*th);
798	to[0] = get_tms_opts((uintptr_t)(th + 1), len);
799	return 0;
800}
801
802static inline void
803stream_term(struct tle_tcp_stream *s)
804{
805	struct sdr *dr;
806
807	s->tcb.state = TCP_ST_CLOSED;
808	rte_smp_wmb();
809
810	timer_stop(s);
811
812	/* close() was already invoked, schedule final cleanup */
813	if ((s->tcb.uop & TCP_OP_CLOSE) != 0) {
814
815		dr = CTX_TCP_SDR(s->s.ctx);
816		STAILQ_INSERT_TAIL(&dr->be, &s->s, link);
817
818	/* notify user that stream need to be closed */
819	} else if (s->err.ev != NULL)
820		tle_event_raise(s->err.ev);
821	else if (s->err.cb.func != NULL)
822		s->err.cb.func(s->err.cb.data, &s->s);
823}
824
825static inline int
826stream_fill_dest(struct tle_tcp_stream *s)
827{
828	int32_t rc;
829	uint32_t type;
830	const void *da;
831
832	type = s->s.type;
833	if (type == TLE_V4)
834		da = &s->s.ipv4.addr.src;
835	else
836		da = &s->s.ipv6.addr.src;
837
838	rc = stream_get_dest(&s->s, da, &s->tx.dst);
839	return (rc < 0) ? rc : 0;
840}
841
842/*
843 * helper function, prepares a new accept stream.
844 */
845static inline int
846accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st,
847	struct tle_tcp_stream *cs, const union tsopt *to,
848	uint32_t tms, const union pkt_info *pi, const union seg_info *si)
849{
850	int32_t rc;
851	uint32_t rtt;
852
853	/* some TX still pending for that stream. */
854	if (TCP_STREAM_TX_PENDING(cs))
855		return -EAGAIN;
856
857	/* setup L4 ports and L3 addresses fields. */
858	cs->s.port.raw = pi->port.raw;
859	cs->s.pmsk.raw = UINT32_MAX;
860
861	if (pi->tf.type == TLE_V4) {
862		cs->s.ipv4.addr = pi->addr4;
863		cs->s.ipv4.mask.src = INADDR_NONE;
864		cs->s.ipv4.mask.dst = INADDR_NONE;
865	} else if (pi->tf.type == TLE_V6) {
866		cs->s.ipv6.addr = *pi->addr6;
867		rte_memcpy(&cs->s.ipv6.mask.src, &tle_ipv6_none,
868			sizeof(cs->s.ipv6.mask.src));
869		rte_memcpy(&cs->s.ipv6.mask.dst, &tle_ipv6_none,
870			sizeof(cs->s.ipv6.mask.dst));
871	}
872
873	/* setup TCB */
874	sync_fill_tcb(&cs->tcb, si, to);
875	cs->tcb.rcv.wnd = calc_rx_wnd(cs, cs->tcb.rcv.wscale);
876
877	/*
878	 * estimate the rto
879	 * for now rtt is calculated based on the tcp TMS option,
880	 * later add real-time one
881	 */
882	if (cs->tcb.so.ts.ecr) {
883		rtt = tms - cs->tcb.so.ts.ecr;
884		rto_estimate(&cs->tcb, rtt);
885	} else
886		cs->tcb.snd.rto = TCP_RTO_DEFAULT;
887
888	/* copy streams type & flags. */
889	cs->s.type = ps->s.type;
890	cs->flags = ps->flags;
891
892	/* retrieve and cache destination information. */
893	rc = stream_fill_dest(cs);
894	if (rc != 0)
895		return rc;
896
897	/* update snd.mss with SMSS value */
898	cs->tcb.snd.mss = calc_smss(cs->tcb.snd.mss, &cs->tx.dst);
899
900	/* setup congestion variables */
901	cs->tcb.snd.cwnd = initial_cwnd(cs->tcb.snd.mss, ps->tcb.snd.cwnd);
902	cs->tcb.snd.ssthresh = cs->tcb.snd.wnd;
903	cs->tcb.snd.rto_tw = ps->tcb.snd.rto_tw;
904
905	cs->tcb.state = TCP_ST_ESTABLISHED;
906
907	/* add stream to the table */
908	cs->ste = stbl_add_stream(st, pi, cs);
909	if (cs->ste == NULL)
910		return -ENOBUFS;
911
912	cs->tcb.uop |= TCP_OP_ACCEPT;
913	tcp_stream_up(cs);
914	return 0;
915}
916
917
918/*
919 * ACK for new connection request arrived.
920 * Check that the packet meets all conditions and try to open a new stream.
921 * returns:
922 * < 0  - invalid packet
923 * == 0 - packet is valid and new stream was opened for it.
924 * > 0  - packet is valid, but failed to open new stream.
925 */
926static inline int
927rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st,
928	const union pkt_info *pi, union seg_info *si,
929	uint32_t tms, struct rte_mbuf *mb, struct tle_tcp_stream **csp)
930{
931	int32_t rc;
932	struct tle_ctx *ctx;
933	struct tle_stream *ts;
934	struct tle_tcp_stream *cs;
935	union tsopt to;
936
937	*csp = NULL;
938
939	if (pi->tf.flags != TCP_FLAG_ACK || rx_check_stream(s, pi) != 0)
940		return -EINVAL;
941
942	ctx = s->s.ctx;
943	rc = restore_syn_opt(si, &to, pi, tms, mb, ctx->prm.hash_alg,
944				&ctx->prm.secret_key);
945	if (rc < 0)
946		return rc;
947
948	/* allocate new stream */
949	ts = get_stream(ctx);
950	cs = TCP_STREAM(ts);
951	if (ts == NULL)
952		return ENFILE;
953
954	/* prepare stream to handle new connection */
955	if (accept_prep_stream(s, st, cs, &to, tms, pi, si) == 0) {
956
957		/* put new stream in the accept queue */
958		if (_rte_ring_enqueue_burst(s->rx.q,
959				(void * const *)&ts, 1) == 1) {
960			*csp = cs;
961			return 0;
962		}
963
964		/* cleanup on failure */
965		tcp_stream_down(cs);
966		stbl_del_stream(st, cs->ste, cs, 0);
967		cs->ste = NULL;
968	}
969
970	tcp_stream_reset(ctx, cs);
971	return ENOBUFS;
972}
973
974static inline int
975data_pkt_adjust(const struct tcb *tcb, struct rte_mbuf *mb, uint32_t hlen,
976	uint32_t *seqn, uint32_t *plen)
977{
978	uint32_t len, n, seq;
979
980	seq = *seqn;
981	len = *plen;
982
983	rte_pktmbuf_adj(mb, hlen);
984	if (len == 0)
985		return -ENODATA;
986	/* cut off the start of the packet */
987	else if (tcp_seq_lt(seq, tcb->rcv.nxt)) {
988		n = tcb->rcv.nxt - seq;
989		if (n >= len)
990			return -ENODATA;
991
992		rte_pktmbuf_adj(mb, n);
993		*seqn = seq + n;
994		*plen = len - n;
995	}
996
997	return 0;
998}
999
1000static inline uint32_t
1001rx_ackdata(struct tle_tcp_stream *s, uint32_t ack)
1002{
1003	uint32_t k, n;
1004
1005	n = ack - (uint32_t)s->tcb.snd.una;
1006
1007	/* some more data was acked. */
1008	if (n != 0) {
1009
1010		/* advance SND.UNA and free related packets. */
1011		k = rte_ring_free_count(s->tx.q);
1012		free_una_data(s, n);
1013
1014		/* mark the stream as available for writing */
1015		if (rte_ring_free_count(s->tx.q) != 0) {
1016			if (s->tx.ev != NULL)
1017				tle_event_raise(s->tx.ev);
1018			else if (k == 0 && s->tx.cb.func != NULL)
1019				s->tx.cb.func(s->tx.cb.data, &s->s);
1020		}
1021	}
1022
1023	return n;
1024}
1025
1026static void
1027stream_timewait(struct tle_tcp_stream *s, uint32_t rto)
1028{
1029	if (rto != 0) {
1030		s->tcb.state = TCP_ST_TIME_WAIT;
1031		s->tcb.snd.rto = rto;
1032		timer_reset(s);
1033	} else
1034		stream_term(s);
1035}
1036
1037static void
1038rx_fin_state(struct tle_tcp_stream *s, struct resp_info *rsp)
1039{
1040	uint32_t state;
1041	int32_t ackfin;
1042
1043	s->tcb.rcv.nxt += 1;
1044
1045	ackfin = (s->tcb.snd.una == s->tcb.snd.fss);
1046	state = s->tcb.state;
1047
1048	if (state == TCP_ST_ESTABLISHED) {
1049		s->tcb.state = TCP_ST_CLOSE_WAIT;
1050		/* raise err.ev & err.cb */
1051		if (s->err.ev != NULL)
1052			tle_event_raise(s->err.ev);
1053		else if (s->err.cb.func != NULL)
1054			s->err.cb.func(s->err.cb.data, &s->s);
1055	} else if (state == TCP_ST_FIN_WAIT_1 || state == TCP_ST_CLOSING) {
1056		rsp->flags |= TCP_FLAG_ACK;
1057		if (ackfin != 0)
1058			stream_timewait(s, s->tcb.snd.rto_tw);
1059		else
1060			s->tcb.state = TCP_ST_CLOSING;
1061	} else if (state == TCP_ST_FIN_WAIT_2) {
1062		rsp->flags |= TCP_FLAG_ACK;
1063		stream_timewait(s, s->tcb.snd.rto_tw);
1064	} else if (state == TCP_ST_LAST_ACK && ackfin != 0) {
1065		stream_term(s);
1066	}
1067}
1068
1069/*
1070 * FIN process for ESTABLISHED state
1071 * returns:
1072 * < 0 - error occurred
1073 * 0 - FIN was processed OK, and mbuf can be freed/reused.
1074 * > 0 - FIN was processed OK and mbuf can't be freed/reused.
1075 */
1076static inline int
1077rx_fin(struct tle_tcp_stream *s, uint32_t state,
1078	const union seg_info *si, struct rte_mbuf *mb,
1079	struct resp_info *rsp)
1080{
1081	uint32_t hlen, plen, seq;
1082	int32_t ret;
1083	union tsopt ts;
1084
1085	hlen = PKT_L234_HLEN(mb);
1086	plen = mb->pkt_len - hlen;
1087	seq = si->seq;
1088
1089	ts = rx_tms_opt(&s->tcb, mb);
1090	ret = rx_check_seqack(&s->tcb, seq, si->ack, plen, ts);
1091	if (ret != 0)
1092		return ret;
1093
1094	if (state < TCP_ST_ESTABLISHED)
1095		return -EINVAL;
1096
1097	if (plen != 0) {
1098
1099		ret = data_pkt_adjust(&s->tcb, mb, hlen, &seq, &plen);
1100		if (ret != 0)
1101			return ret;
1102		if (rx_data_enqueue(s, seq, plen, &mb, 1) != 1)
1103			return -ENOBUFS;
1104	}
1105
1106	/*
1107	 * fast-path: all data & FIN was already sent out
1108	 * and now is acknowledged.
1109	 */
1110	if (s->tcb.snd.fss == s->tcb.snd.nxt &&
1111			si->ack == (uint32_t)s->tcb.snd.nxt) {
1112		s->tcb.snd.una = s->tcb.snd.fss;
1113		empty_tq(s);
1114	/* conventional ACK processing */
1115	} else
1116		rx_ackdata(s, si->ack);
1117
1118	/* some fragments still missing */
1119	if (seq + plen != s->tcb.rcv.nxt) {
1120		s->tcb.rcv.frs.seq = seq + plen;
1121		s->tcb.rcv.frs.on = 1;
1122	} else
1123		rx_fin_state(s, rsp);
1124
1125	return plen;
1126}
1127
1128static inline int
1129rx_rst(struct tle_tcp_stream *s, uint32_t state, uint32_t flags,
1130	const union seg_info *si)
1131{
1132	int32_t rc;
1133
1134	/*
1135	 * RFC 793: In all states except SYN-SENT, all reset (RST) segments
1136	 * are validated by checking their SEQ-fields.
1137	 * A reset is valid if its sequence number is in the window.
1138	 * In the SYN-SENT state (a RST received in response to an initial SYN),
1139	 * the RST is acceptable if the ACK field acknowledges the SYN.
1140	 */
1141	if (state == TCP_ST_SYN_SENT) {
1142		rc = ((flags & TCP_FLAG_ACK) == 0 ||
1143				si->ack != s->tcb.snd.nxt) ?
1144			-ERANGE : 0;
1145	}
1146
1147	else
1148		rc = check_seqn(&s->tcb, si->seq, 0);
1149
1150	if (rc == 0)
1151		stream_term(s);
1152
1153	return rc;
1154}
1155
1156/*
1157 *  check whether we have a FIN that was received out-of-order.
1158 *  if yes, try to process it now.
1159 */
1160static inline void
1161rx_ofo_fin(struct tle_tcp_stream *s, struct resp_info *rsp)
1162{
1163	if (s->tcb.rcv.frs.on != 0 && s->tcb.rcv.nxt == s->tcb.rcv.frs.seq)
1164		rx_fin_state(s, rsp);
1165}
1166
1167static inline void
1168dack_info_init(struct dack_info *tack, const struct tcb *tcb)
1169{
1170	static const struct dack_info zero_dack;
1171
1172	tack[0] = zero_dack;
1173	tack->ack = tcb->snd.una;
1174	tack->segs.dup = tcb->rcv.dupack;
1175	tack->wu.raw = tcb->snd.wu.raw;
1176	tack->wnd = tcb->snd.wnd >> tcb->snd.wscale;
1177}
1178
1179static inline void
1180ack_window_update(struct tcb *tcb, const struct dack_info *tack)
1181{
1182	tcb->snd.wu.raw = tack->wu.raw;
1183	tcb->snd.wnd = tack->wnd << tcb->snd.wscale;
1184}
1185
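/*
 * Congestion window growth per RFC 5681 3.1:
 * in slow start cwnd grows by the amount of newly acked data (capped by
 * the number of acking segments * MSS); in congestion avoidance it grows
 * by roughly one MSS per RTT.
 */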
1186static inline void
1187ack_cwnd_update(struct tcb *tcb, uint32_t acked, const struct dack_info *tack)
1188{
1189	uint32_t n;
1190
1191	n = tack->segs.ack * tcb->snd.mss;
1192
1193	/* slow start phase, RFC 5681 3.1 (2)  */
1194	if (tcb->snd.cwnd < tcb->snd.ssthresh)
1195		tcb->snd.cwnd += RTE_MIN(acked, n);
1196	/* congestion avoidance phase, RFC 5681 3.1 (3) */
1197	else
1198		tcb->snd.cwnd += RTE_MAX(1U, n * tcb->snd.mss / tcb->snd.cwnd);
1199}
1200
1201static inline void
1202rto_ssthresh_update(struct tcb *tcb)
1203{
1204	uint32_t k, n;
1205
1206	/* RFC 5681 3.1 (4)  */
1207	n = (tcb->snd.nxt - tcb->snd.una) / 2;
1208	k = 2 * tcb->snd.mss;
1209	tcb->snd.ssthresh = RTE_MAX(n, k);
1210}
1211
1212static inline void
1213rto_cwnd_update(struct tcb *tcb)
1214{
1215
1216	if (tcb->snd.nb_retx == 0)
1217		rto_ssthresh_update(tcb);
1218
1219	/*
1220	 * RFC 5681 3.1: upon a timeout cwnd MUST be set to
1221	 * no more than 1 full-sized segment.
1222	 */
1223	tcb->snd.cwnd = tcb->snd.mss;
1224}
1225
1226static inline void
1227ack_info_update(struct dack_info *tack, const union seg_info *si,
1228	int32_t badseq, uint32_t dlen, const union tsopt ts)
1229{
1230	if (badseq != 0) {
1231		tack->segs.badseq++;
1232		return;
1233	}
1234
1235	/* segment with incoming data */
1236	tack->segs.data += (dlen != 0);
1237
1238	/* segment with newly acked data */
1239	if (tcp_seq_lt(tack->ack, si->ack)) {
1240		tack->segs.dup = 0;
1241		tack->segs.ack++;
1242		tack->ack = si->ack;
1243		tack->ts = ts;
1244
1245	/*
1246	 * RFC 5681: An acknowledgment is considered a "duplicate" when:
1247	 * (a) the receiver of the ACK has outstanding data
1248	 * (b) the incoming acknowledgment carries no data
1249	 * (c) the SYN and FIN bits are both off
1250	 * (d) the acknowledgment number is equal to the TCP.UNA
1251	 * (e) the advertised window in the incoming acknowledgment equals the
1252	 * advertised window in the last incoming acknowledgment.
1253	 *
1254	 * Here we only have to check for (b), (d), (e).
1255	 * (a) will be checked later for the whole bulk of packets,
1256	 * (c) should never happen here.
1257	 */
1258	} else if (dlen == 0 && si->wnd == tack->wnd && ++tack->segs.dup == 3) {
1259		tack->dup3.seg = tack->segs.ack + 1;
1260		tack->dup3.ack = tack->ack;
1261	}
1262
1263	/*
1264	 * RFC 793:
1265	 * If SND.UNA < SEG.ACK =< SND.NXT, the send window should be
1266	 * updated.  If (SND.WL1 < SEG.SEQ or (SND.WL1 = SEG.SEQ and
1267	 * SND.WL2 =< SEG.ACK)), set SND.WND <- SEG.WND, set
1268	 * SND.WL1 <- SEG.SEQ, and set SND.WL2 <- SEG.ACK.
1269	 */
1270	if (tcp_seq_lt(tack->wu.wl1, si->seq) ||
1271			(si->seq == tack->wu.wl1 &&
1272			tcp_seq_leq(tack->wu.wl2, si->ack))) {
1273
1274		tack->wu.wl1 = si->seq;
1275		tack->wu.wl2 = si->ack;
1276		tack->wnd = si->wnd;
1277	}
1278}
1279
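/*
 * Process a bulk of data/ACK segments for one stream:
 * validate SEQ/ACK (plus PAWS) for each segment, collect aggregated ACK
 * information into <tack>, group consecutive in-order segments together and
 * enqueue their data into the receive buffer.
 * Segments that fail validation or don't fit are returned via rp[]/rc[].
 * Returns the number of segments accepted.
 */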
1280static inline uint32_t
1281rx_data_ack(struct tle_tcp_stream *s, struct dack_info *tack,
1282	const union seg_info si[], struct rte_mbuf *mb[], struct rte_mbuf *rp[],
1283	int32_t rc[], uint32_t num)
1284{
1285	uint32_t i, j, k, n, t;
1286	uint32_t hlen, plen, seq, tlen;
1287	int32_t ret;
1288	union tsopt ts;
1289
1290	k = 0;
1291	for (i = 0; i != num; i = j) {
1292
1293		hlen = PKT_L234_HLEN(mb[i]);
1294		plen = mb[i]->pkt_len - hlen;
1295		seq = si[i].seq;
1296
1297		ts = rx_tms_opt(&s->tcb, mb[i]);
1298		ret = rx_check_seqack(&s->tcb, seq, si[i].ack, plen, ts);
1299
1300		/* account segment received */
1301		ack_info_update(tack, &si[i], ret != 0, plen, ts);
1302
1303		if (ret == 0) {
1304			/* skip duplicate data, if any */
1305			ret = data_pkt_adjust(&s->tcb, mb[i], hlen,
1306				&seq, &plen);
1307		}
1308
1309		j = i + 1;
1310		if (ret != 0) {
1311			rp[k] = mb[i];
1312			rc[k] = -ret;
1313			k++;
1314			continue;
1315		}
1316
1317		/* group sequential packets together. */
1318		for (tlen = plen; j != num; tlen += plen, j++) {
1319
1320			hlen = PKT_L234_HLEN(mb[j]);
1321			plen = mb[j]->pkt_len - hlen;
1322
1323			/* not consecutive packet */
1324			if (plen == 0 || seq + tlen != si[j].seq)
1325				break;
1326
1327			/* check SEQ/ACK */
1328			ts = rx_tms_opt(&s->tcb, mb[j]);
1329			ret = rx_check_seqack(&s->tcb, si[j].seq, si[j].ack,
1330				plen, ts);
1331
1332			/* account for segment received */
1333			ack_info_update(tack, &si[j], ret != 0, plen, ts);
1334
1335			if (ret != 0) {
1336				rp[k] = mb[j];
1337				rc[k] = -ret;
1338				k++;
1339				break;
1340			}
1341			rte_pktmbuf_adj(mb[j], hlen);
1342		}
1343
1344		n = j - i;
1345		j += (ret != 0);
1346
1347		/* account for OFO data */
1348		if (seq != s->tcb.rcv.nxt)
1349			tack->segs.ofo += n;
1350
1351		/* enqueue packets */
1352		t = rx_data_enqueue(s, seq, tlen, mb + i, n);
1353
1354		/* if we are out of space in stream recv buffer. */
1355		for (; t != n; t++) {
1356			rp[k] = mb[i + t];
1357			rc[k] = -ENOBUFS;
1358			k++;
1359		}
1360	}
1361
1362	return num - k;
1363}
1364
1365static inline void
1366start_fast_retransmit(struct tle_tcp_stream *s)
1367{
1368	struct tcb *tcb;
1369
1370	tcb = &s->tcb;
1371
1372	/* RFC 6582 3.2.2 */
1373	tcb->snd.rcvr = tcb->snd.nxt;
1374	tcb->snd.fastack = 1;
1375
1376	/* RFC 5681 3.2.2 */
1377	rto_ssthresh_update(tcb);
1378
1379	/* RFC 5681 3.2.3 */
1380	tcp_txq_rst_nxt_head(s);
1381	tcb->snd.nxt = tcb->snd.una;
1382	tcb->snd.cwnd = tcb->snd.ssthresh + 3 * tcb->snd.mss;
1383}
1384
1385static inline void
1386stop_fast_retransmit(struct tle_tcp_stream *s)
1387{
1388	struct tcb *tcb;
1389	uint32_t n;
1390
1391	tcb = &s->tcb;
1392	n = tcb->snd.nxt - tcb->snd.una;
1393	tcb->snd.cwnd = RTE_MIN(tcb->snd.ssthresh,
1394		RTE_MAX(n, tcb->snd.mss) + tcb->snd.mss);
1395	tcb->snd.fastack = 0;
1396}
1397
1398static inline int
1399in_fast_retransmit(struct tle_tcp_stream *s, uint32_t ack_len, uint32_t ack_num,
1400	uint32_t dup_num)
1401{
1402	uint32_t n;
1403	struct tcb *tcb;
1404
1405	tcb = &s->tcb;
1406
1407	/* RFC 5682 3.2.3 partial ACK */
1408	if (ack_len != 0) {
1409
1410		n = ack_num * tcb->snd.mss;
1411		if (ack_len >= n)
1412			tcb->snd.cwnd -= ack_len - n;
1413		else
1414			tcb->snd.cwnd -= ack_len % tcb->snd.mss;
1415
1416		/*
1417		 * For the first partial ACK that arrives
1418		 * during fast recovery, also reset the
1419		 * retransmit timer.
1420		 */
1421		if (tcb->snd.fastack == 1)
1422			timer_reset(s);
1423
1424		tcb->snd.fastack += ack_num;
1425		return 1;
1426
1427	/* RFC 5681 3.2.4 */
1428	} else if (dup_num > 3) {
1429		s->tcb.snd.cwnd += (dup_num - 3) * tcb->snd.mss;
1430		return 1;
1431	}
1432
1433	return 0;
1434}
1435
1436static inline int
1437process_ack(struct tle_tcp_stream *s, uint32_t acked,
1438	const struct dack_info *tack)
1439{
1440	int32_t send;
1441
1442	send = 0;
1443
1444	/* normal mode */
1445	if (s->tcb.snd.fastack == 0) {
1446
1447		send = 1;
1448
1449		/* RFC 6582 3.2.2 switch to fast retransmit mode */
1450		if (tack->dup3.seg != 0 && s->tcb.snd.una != s->tcb.snd.nxt &&
1451				s->tcb.snd.una >= s->tcb.snd.rcvr) {
1452
1453			start_fast_retransmit(s);
1454			in_fast_retransmit(s,
1455				tack->ack - tack->dup3.ack,
1456				tack->segs.ack - tack->dup3.seg - 1,
1457				tack->segs.dup);
1458
1459		/* remain in normal mode */
1460		} else if (acked != 0) {
1461			ack_cwnd_update(&s->tcb, acked, tack);
1462			timer_stop(s);
1463		}
1464
1465	/* fast retransmit mode */
1466	} else {
1467
1468		/* remain in fast retransmit mode */
1469		if (s->tcb.snd.una < s->tcb.snd.rcvr) {
1470
1471			send = in_fast_retransmit(s, acked, tack->segs.ack,
1472				tack->segs.dup);
1473		} else {
1474			/* RFC 5682 3.2.3 full ACK */
1475			stop_fast_retransmit(s);
1476			timer_stop(s);
1477
1478			/* if we have another series of dup ACKs */
1479			if (tack->dup3.seg != 0 &&
1480					s->tcb.snd.una != s->tcb.snd.nxt &&
1481					tcp_seq_leq((uint32_t)s->tcb.snd.rcvr,
1482					tack->dup3.ack)) {
1483
1484				/* restart fast retransmit again. */
1485				start_fast_retransmit(s);
1486				send = in_fast_retransmit(s,
1487					tack->ack - tack->dup3.ack,
1488					tack->segs.ack - tack->dup3.seg - 1,
1489					tack->segs.dup);
1490			}
1491		}
1492	}
1493
1494	return send;
1495}
1496
1497/*
1498 * our FIN was acked, stop rto timer, change stream state,
1499 * and possibly close the stream.
1500 */
1501static inline void
1502rx_ackfin(struct tle_tcp_stream *s)
1503{
1504	uint32_t state;
1505
1506	s->tcb.snd.una = s->tcb.snd.fss;
1507	empty_tq(s);
1508
1509	state = s->tcb.state;
1510	if (state == TCP_ST_LAST_ACK)
1511		stream_term(s);
1512	else if (state == TCP_ST_FIN_WAIT_1) {
1513		timer_stop(s);
1514		s->tcb.state = TCP_ST_FIN_WAIT_2;
1515	} else if (state == TCP_ST_CLOSING) {
1516		stream_timewait(s, s->tcb.snd.rto_tw);
1517	}
1518}
1519
1520static inline void
1521rx_process_ack(struct tle_tcp_stream *s, uint32_t ts,
1522	const struct dack_info *tack)
1523{
1524	int32_t send;
1525	uint32_t n;
1526
1527	s->tcb.rcv.dupack = tack->segs.dup;
1528
1529	n = rx_ackdata(s, tack->ack);
1530	send = process_ack(s, n, tack);
1531
1532	/* try to send more data. */
1533	if ((n != 0 || send != 0) && tcp_txq_nxt_cnt(s) != 0)
1534		txs_enqueue(s->s.ctx, s);
1535
1536	/* restart RTO timer. */
1537	if (s->tcb.snd.nxt != s->tcb.snd.una)
1538		timer_start(s);
1539
1540	/* update rto, if fresh packet is here then calculate rtt */
1541	if (tack->ts.ecr != 0)
1542		rto_estimate(&s->tcb, ts - tack->ts.ecr);
1543}
1544
1545/*
1546 * process <SYN,ACK>
1547 * returns negative value on failure, or zero on success.
1548 */
1549static inline int
1550rx_synack(struct tle_tcp_stream *s, uint32_t ts, uint32_t state,
1551	const union seg_info *si, struct rte_mbuf *mb,
1552	struct resp_info *rsp)
1553{
1554	struct syn_opts so;
1555	struct tcp_hdr *th;
1556
1557	if (state != TCP_ST_SYN_SENT)
1558		return -EINVAL;
1559
1560	/* invalid SEG.ACK */
1561	if (si->ack != (uint32_t)s->tcb.snd.nxt) {
1562		rsp->flags = TCP_FLAG_RST;
1563		return 0;
1564	}
1565
1566	th = rte_pktmbuf_mtod_offset(mb, struct tcp_hdr *,
1567		mb->l2_len + mb->l3_len);
1568	get_syn_opts(&so, (uintptr_t)(th + 1), mb->l4_len - sizeof(*th));
1569
1570	s->tcb.so = so;
1571
1572	s->tcb.snd.una = s->tcb.snd.nxt;
1573	s->tcb.snd.mss = calc_smss(so.mss, &s->tx.dst);
1574	s->tcb.snd.wnd = si->wnd << so.wscale;
1575	s->tcb.snd.wu.wl1 = si->seq;
1576	s->tcb.snd.wu.wl2 = si->ack;
1577	s->tcb.snd.wscale = so.wscale;
1578
1579	/* setup congestion variables */
1580	s->tcb.snd.cwnd = initial_cwnd(s->tcb.snd.mss, s->tcb.snd.cwnd);
1581	s->tcb.snd.ssthresh = s->tcb.snd.wnd;
1582
1583	s->tcb.rcv.ts = so.ts.val;
1584	s->tcb.rcv.irs = si->seq;
1585	s->tcb.rcv.nxt = si->seq + 1;
1586
1587	/* if peer doesn't support WSCALE opt, recalculate RCV.WND */
1588	s->tcb.rcv.wscale = (so.wscale == TCP_WSCALE_NONE) ?
1589		TCP_WSCALE_NONE : TCP_WSCALE_DEFAULT;
1590	s->tcb.rcv.wnd = calc_rx_wnd(s, s->tcb.rcv.wscale);
1591
1592	/* calculate initial rto */
1593	rto_estimate(&s->tcb, ts - s->tcb.snd.ts);
1594
1595	rsp->flags |= TCP_FLAG_ACK;
1596
1597	timer_stop(s);
1598	s->tcb.state = TCP_ST_ESTABLISHED;
1599	rte_smp_wmb();
1600
1601	if (s->tx.ev != NULL)
1602		tle_event_raise(s->tx.ev);
1603	else if (s->tx.cb.func != NULL)
1604		s->tx.cb.func(s->tx.cb.data, &s->s);
1605
1606	return 0;
1607}
1608
1609static inline uint32_t
1610rx_stream(struct tle_tcp_stream *s, uint32_t ts,
1611	const union pkt_info *pi, const union seg_info si[],
1612	struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1613	uint32_t num)
1614{
1615	uint32_t i, k, n, state;
1616	int32_t ret;
1617	struct resp_info rsp;
1618	struct dack_info tack;
1619
1620	k = 0;
1621	rsp.flags = 0;
1622
1623	state = s->tcb.state;
1624
1625	/*
1626	 * first check for the states/flags where we don't
1627	 * expect groups of packets.
1628	 */
1629
1630	/* process RST */
1631	if ((pi->tf.flags & TCP_FLAG_RST) != 0) {
1632		for (i = 0;
1633				i != num &&
1634				rx_rst(s, state, pi->tf.flags, &si[i]);
1635				i++)
1636			;
1637		i = 0;
1638
1639	/* RFC 793: if the ACK bit is off drop the segment and return */
1640	} else if ((pi->tf.flags & TCP_FLAG_ACK) == 0) {
1641		i = 0;
1646
1647	/* process <SYN,ACK> */
1648	} else if ((pi->tf.flags & TCP_FLAG_SYN) != 0) {
1649		ret = 0;
1650		for (i = 0; i != num; i++) {
1651			ret = rx_synack(s, ts, state, &si[i], mb[i], &rsp);
1652			if (ret == 0)
1653				break;
1654
1655			rc[k] = -ret;
1656			rp[k] = mb[i];
1657			k++;
1658		}
1659
1660	/* process FIN */
1661	} else if ((pi->tf.flags & TCP_FLAG_FIN) != 0) {
1662		ret = 0;
1663		for (i = 0; i != num; i++) {
1664			ret = rx_fin(s, state, &si[i], mb[i], &rsp);
1665			if (ret >= 0)
1666				break;
1667
1668			rc[k] = -ret;
1669			rp[k] = mb[i];
1670			k++;
1671		}
1672		i += (ret > 0);
1673
1674	/* normal data/ack packets */
1675	} else if (state >= TCP_ST_ESTABLISHED && state <= TCP_ST_LAST_ACK) {
1676
1677		/* process incoming data packets. */
1678		dack_info_init(&tack, &s->tcb);
1679		n = rx_data_ack(s, &tack, si, mb, rp, rc, num);
1680
1681		/* follow up actions based on aggregated information */
1682
1683		/* update SND.WND */
1684		ack_window_update(&s->tcb, &tack);
1685
1686		/*
1687		 * fast-path: all data & FIN was already sent out
1688		 * and now is acknowledged.
1689		 */
1690		if (s->tcb.snd.fss == s->tcb.snd.nxt &&
1691				tack.ack == (uint32_t)s->tcb.snd.nxt)
1692			rx_ackfin(s);
1693		else
1694			rx_process_ack(s, ts, &tack);
1695
1696		/*
1697		 * send an immediate ACK if either:
1698		 * - received segment with invalid seq/ack number
1699		 * - received segment with OFO data
1700		 * - received segment with INO data and no TX is scheduled
1701		 *   for that stream.
1702		 */
1703		if (tack.segs.badseq != 0 || tack.segs.ofo != 0 ||
1704				(tack.segs.data != 0 &&
1705				rte_atomic32_read(&s->tx.arm) == 0))
1706			rsp.flags |= TCP_FLAG_ACK;
1707
1708		rx_ofo_fin(s, &rsp);
1709
1710		k += num - n;
1711		i = num;
1712
1713	/* unhandled state, drop all packets. */
1714	} else
1715		i = 0;
1716
1717	/* we have a response packet to send. */
1718	if (rsp.flags == TCP_FLAG_RST) {
1719		send_rst(s, si[i].ack);
1720		stream_term(s);
1721	} else if (rsp.flags != 0) {
1722		send_ack(s, ts, rsp.flags);
1723
1724		/* start the timer for FIN packet */
1725		if ((rsp.flags & TCP_FLAG_FIN) != 0)
1726			timer_reset(s);
1727	}
1728
1729	/* unprocessed packets */
1730	for (; i != num; i++, k++) {
1731		rc[k] = EINVAL;
1732		rp[k] = mb[i];
1733	}
1734
1735	return num - k;
1736}
1737
1738static inline uint32_t
1739rx_new_stream(struct tle_tcp_stream *s, uint32_t ts,
1740	const union pkt_info *pi, const union seg_info si[],
1741	struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1742	uint32_t num)
1743{
1744	uint32_t i;
1745
1746	if (tcp_stream_acquire(s) > 0) {
1747		i = rx_stream(s, ts, pi, si, mb, rp, rc, num);
1748		tcp_stream_release(s);
1749		return i;
1750	}
1751
1752	for (i = 0; i != num; i++) {
1753		rc[i] = ENOENT;
1754		rp[i] = mb[i];
1755	}
1756	return 0;
1757}
1758
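/*
 * Process a bulk of packets that are not pure SYNs:
 * for a stream in LISTEN state this handles the final ACK of the 3-way
 * handshake (creating and queueing a new child stream); otherwise the
 * packets are passed to the regular per-stream RX path (rx_stream()).
 */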
1759static inline uint32_t
1760rx_postsyn(struct tle_dev *dev, struct stbl *st, uint32_t type, uint32_t ts,
1761	const union pkt_info pi[], union seg_info si[],
1762	struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1763	uint32_t num)
1764{
1765	struct tle_tcp_stream *cs, *s;
1766	uint32_t i, k, n, state;
1767	int32_t ret;
1768
1769	s = rx_obtain_stream(dev, st, &pi[0], type);
1770	if (s == NULL) {
1771		for (i = 0; i != num; i++) {
1772			rc[i] = ENOENT;
1773			rp[i] = mb[i];
1774		}
1775		return 0;
1776	}
1777
1778	k = 0;
1779	state = s->tcb.state;
1780
1781	if (state == TCP_ST_LISTEN) {
1782
1783		/* one connection per flow */
1784		cs = NULL;
1785		ret = -EINVAL;
1786		for (i = 0; i != num; i++) {
1787
1788			ret = rx_ack_listen(s, st, pi, &si[i], ts, mb[i], &cs);
1789
1790			/* valid packet encountered */
1791			if (ret >= 0)
1792				break;
1793
1794			/* invalid packet, keep trying to find a proper one */
1795			rc[k] = -ret;
1796			rp[k] = mb[i];
1797			k++;
1798		}
1799
1800		/* packet is valid, but we are out of streams to serve it */
1801		if (ret > 0) {
1802			for (; i != num; i++, k++) {
1803				rc[k] = ret;
1804				rp[k] = mb[i];
1805			}
1806		/* new stream is accepted */
1807		} else if (ret == 0) {
1808
1809			/* inform listen stream about new connections */
1810			if (s->rx.ev != NULL)
1811				tle_event_raise(s->rx.ev);
1812			else if (s->rx.cb.func != NULL &&
1813					rte_ring_count(s->rx.q) == 1)
1814				s->rx.cb.func(s->rx.cb.data, &s->s);
1815
1816			/* if there is no data, drop current packet */
1817			if (PKT_L4_PLEN(mb[i]) == 0) {
1818				rc[k] = ENODATA;
1819				rp[k++] = mb[i++];
1820			}
1821
1822			/*  process remaining packets for that stream */
1823			if (num != i) {
1824				n = rx_new_stream(cs, ts, pi + i, si + i,
1825					mb + i, rp + k, rc + k, num - i);
1826				k += num - n - i;
1827			}
1828		}
1829
1830	} else {
1831		i = rx_stream(s, ts, pi, si, mb, rp, rc, num);
1832		k = num - i;
1833	}
1834
1835	tcp_stream_release(s);
1836	return num - k;
1837}
1838
1839
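/*
 * Process a bulk of incoming SYN segments for a listen stream:
 * each valid SYN is answered with a syncookie based <SYN,ACK>;
 * no new stream is allocated until the final ACK of the handshake arrives.
 */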
1840static inline uint32_t
1841rx_syn(struct tle_dev *dev, uint32_t type, uint32_t ts,
1842	const union pkt_info pi[], const union seg_info si[],
1843	struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1844	uint32_t num)
1845{
1846	struct tle_tcp_stream *s;
1847	uint32_t i, k;
1848	int32_t ret;
1849
1850	s = rx_obtain_listen_stream(dev, &pi[0], type);
1851	if (s == NULL) {
1852		for (i = 0; i != num; i++) {
1853			rc[i] = ENOENT;
1854			rp[i] = mb[i];
1855		}
1856		return 0;
1857	}
1858
1859	k = 0;
1860	for (i = 0; i != num; i++) {
1861
1862		/* check that this remote is allowed to connect */
1863		if (rx_check_stream(s, &pi[i]) != 0)
1864			ret = -ENOENT;
1865		else
1866			/* syncookie: reply with <SYN,ACK> */
1867			ret = sync_ack(s, &pi[i], &si[i], ts, mb[i]);
1868
1869		if (ret != 0) {
1870			rc[k] = -ret;
1871			rp[k] = mb[i];
1872			k++;
1873		}
1874	}
1875
1876	tcp_stream_release(s);
1877	return num - k;
1878}
1879
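/*
 * Device RX entry point: extract per-packet info, verify L3/L4 checksums
 * in SW when the device hasn't done it, group packets that belong to the
 * same flow, and dispatch pure SYNs to rx_syn() and everything else to
 * rx_postsyn(). Packets that can't be consumed are returned via rp[]/rc[].
 */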
1880uint16_t
1881tle_tcp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[],
1882	struct rte_mbuf *rp[], int32_t rc[], uint16_t num)
1883{
1884	struct stbl *st;
1885	struct tle_ctx *ctx;
1886	uint32_t i, j, k, mt, n, t, ts;
1887	uint64_t csf;
1888	union pkt_info pi[num];
1889	union seg_info si[num];
1890	union {
1891		uint8_t t[TLE_VNUM];
1892		uint32_t raw;
1893	} stu;
1894
1895	ctx = dev->ctx;
1896	ts = tcp_get_tms(ctx->cycles_ms_shift);
1897	st = CTX_TCP_STLB(ctx);
1898	mt = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0);
1899
1900	stu.raw = 0;
1901
1902	/* extract packet info and check the L3/L4 csums */
1903	for (i = 0; i != num; i++) {
1904
1905		get_pkt_info(pkt[i], &pi[i], &si[i]);
1906
1907		t = pi[i].tf.type;
1908		csf = dev->rx.ol_flags[t] &
1909			(PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD);
1910
1911		/* check csums in SW */
1912		if (pi[i].csf == 0 && csf != 0 && check_pkt_csum(pkt[i], csf,
1913				pi[i].tf.type, IPPROTO_TCP) != 0)
1914			pi[i].csf = csf;
1915
1916		stu.t[t] = mt;
1917	}
1918
1919	if (stu.t[TLE_V4] != 0)
1920		stbl_lock(st, TLE_V4);
1921	if (stu.t[TLE_V6] != 0)
1922		stbl_lock(st, TLE_V6);
1923
1924	k = 0;
1925	for (i = 0; i != num; i += j) {
1926
1927		t = pi[i].tf.type;
1928
1929		/* basic checks for incoming packet */
1930		if (t >= TLE_VNUM || pi[i].csf != 0 || dev->dp[t] == NULL) {
1931			rc[k] = EINVAL;
1932			rp[k] = pkt[i];
1933			j = 1;
1934			k++;
1935		/* process input SYN packets */
1936		} else if (pi[i].tf.flags == TCP_FLAG_SYN) {
1937			j = pkt_info_bulk_syneq(pi + i, num - i);
1938			n = rx_syn(dev, t, ts, pi + i, si + i, pkt + i,
1939				rp + k, rc + k, j);
1940			k += j - n;
1941		} else {
1942			j = pkt_info_bulk_eq(pi + i, num - i);
1943			n = rx_postsyn(dev, st, t, ts, pi + i, si + i, pkt + i,
1944				rp + k, rc + k, j);
1945			k += j - n;
1946		}
1947	}
1948
1949	if (stu.t[TLE_V4] != 0)
1950		stbl_unlock(st, TLE_V4);
1951	if (stu.t[TLE_V6] != 0)
1952		stbl_unlock(st, TLE_V6);
1953
1954	return num - k;
1955}
1956
1957uint16_t
1958tle_tcp_stream_accept(struct tle_stream *ts, struct tle_stream *rs[],
1959	uint32_t num)
1960{
1961	uint32_t n;
1962	struct tle_tcp_stream *s;
1963
1964	s = TCP_STREAM(ts);
1965	n = _rte_ring_dequeue_burst(s->rx.q, (void **)rs, num);
1966	if (n == 0)
1967		return 0;
1968
1969	/*
1970	 * if we still have packets to read,
1971	 * then rearm stream RX event.
1972	 */
1973	if (n == num && rte_ring_count(s->rx.q) != 0) {
1974		if (tcp_stream_try_acquire(s) > 0 && s->rx.ev != NULL)
1975			tle_event_raise(s->rx.ev);
1976		tcp_stream_release(s);
1977	}
1978
1979	return n;
1980}
1981
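/*
 * Device TX entry point: dequeue up to <num> ready-to-send packets from the
 * device drain ring and hand exhausted drbs back to their owning streams.
 */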
1982uint16_t
1983tle_tcp_tx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], uint16_t num)
1984{
1985	uint32_t i, j, k, n;
1986	struct tle_drb *drb[num];
1987	struct tle_tcp_stream *s;
1988
1989	/* extract packets from device TX queue. */
1990
1991	k = num;
1992	n = tle_dring_sc_dequeue(&dev->tx.dr, (const void **)(uintptr_t)pkt,
1993		num, drb, &k);
1994
1995	if (n == 0)
1996		return 0;
1997
1998	/* free empty drbs and notify related streams. */
1999
2000	for (i = 0; i != k; i = j) {
2001		s = drb[i]->udata;
2002		for (j = i + 1; j != k && s == drb[j]->udata; j++)
2003			;
2004		stream_drb_free(s, drb + i, j - i);
2005	}
2006
2007	return n;
2008}
2009
2010static inline void
2011stream_fill_pkt_info(const struct tle_tcp_stream *s, union pkt_info *pi)
2012{
2013	if (s->s.type == TLE_V4)
2014		pi->addr4 = s->s.ipv4.addr;
2015	else
2016		pi->addr6 = &s->s.ipv6.addr;
2017
2018	pi->port = s->s.port;
2019	pi->tf.type = s->s.type;
2020}
2021
2022static int
2023stream_fill_addr(struct tle_tcp_stream *s, const struct sockaddr *addr)
2024{
2025	const struct sockaddr_in *in4;
2026	const struct sockaddr_in6 *in6;
2027	const struct tle_dev_param *prm;
2028	int32_t rc;
2029
2030	rc = 0;
2031	s->s.pmsk.raw = UINT32_MAX;
2032
2033	/* setup L4 src ports and src address fields. */
2034	if (s->s.type == TLE_V4) {
2035		in4 = (const struct sockaddr_in *)addr;
2036		if (in4->sin_addr.s_addr == INADDR_ANY || in4->sin_port == 0)
2037			return -EINVAL;
2038
2039		s->s.port.src = in4->sin_port;
2040		s->s.ipv4.addr.src = in4->sin_addr.s_addr;
2041		s->s.ipv4.mask.src = INADDR_NONE;
2042		s->s.ipv4.mask.dst = INADDR_NONE;
2043
2044	} else if (s->s.type == TLE_V6) {
2045		in6 = (const struct sockaddr_in6 *)addr;
2046		if (memcmp(&in6->sin6_addr, &tle_ipv6_any,
2047				sizeof(tle_ipv6_any)) == 0 ||
2048				in6->sin6_port == 0)
2049			return -EINVAL;
2050
2051		s->s.port.src = in6->sin6_port;
2052		rte_memcpy(&s->s.ipv6.addr.src, &in6->sin6_addr,
2053			sizeof(s->s.ipv6.addr.src));
2054		rte_memcpy(&s->s.ipv6.mask.src, &tle_ipv6_none,
2055			sizeof(s->s.ipv6.mask.src));
2056		rte_memcpy(&s->s.ipv6.mask.dst, &tle_ipv6_none,
2057			sizeof(s->s.ipv6.mask.dst));
2058	}
2059
2060	/* setup the destination device. */
2061	rc = stream_fill_dest(s);
2062	if (rc != 0)
2063		return rc;
2064
2065	/* setup L4 dst address from device param */
2066	prm = &s->tx.dst.dev->prm;
2067	if (s->s.type == TLE_V4) {
2068		if (s->s.ipv4.addr.dst == INADDR_ANY)
2069			s->s.ipv4.addr.dst = prm->local_addr4.s_addr;
2070	} else if (memcmp(&s->s.ipv6.addr.dst, &tle_ipv6_any,
2071			sizeof(tle_ipv6_any)) == 0)
2072		memcpy(&s->s.ipv6.addr.dst, &prm->local_addr6,
2073			sizeof(s->s.ipv6.addr.dst));
2074
2075	return rc;
2076}
2077
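/*
 * Prepare an active open (connect):
 * fill in local/remote addresses, generate the ISN with the syncookie hash,
 * initialise the TCB send/receive state, insert the stream into the stream
 * table and schedule the SYN for transmission via the to-send queue.
 */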
2078static inline int
2079tx_syn(struct tle_tcp_stream *s, const struct sockaddr *addr)
2080{
2081	int32_t rc;
2082	uint32_t tms, seq;
2083	union pkt_info pi;
2084	struct stbl *st;
2085	struct stbl_entry *se;
2086
2087	/* fill stream address */
2088	rc = stream_fill_addr(s, addr);
2089	if (rc != 0)
2090		return rc;
2091
2092	/* fill pkt info to generate seq.*/
2093	stream_fill_pkt_info(s, &pi);
2094
2095	tms = tcp_get_tms(s->s.ctx->cycles_ms_shift);
2096	s->tcb.so.ts.val = tms;
2097	s->tcb.so.ts.ecr = 0;
2098	s->tcb.so.wscale = TCP_WSCALE_DEFAULT;
2099	s->tcb.so.mss = calc_smss(s->tx.dst.mtu, &s->tx.dst);
2100
2101	/* note that rcv.nxt is 0 here for sync_gen_seq.*/
2102	seq = sync_gen_seq(&pi, s->tcb.rcv.nxt, tms, s->tcb.so.mss,
2103				s->s.ctx->prm.hash_alg,
2104				&s->s.ctx->prm.secret_key);
2105	s->tcb.snd.iss = seq;
2106	s->tcb.snd.rcvr = seq;
2107	s->tcb.snd.una = seq;
2108	s->tcb.snd.nxt = seq + 1;
2109	s->tcb.snd.rto = TCP_RTO_DEFAULT;
2110	s->tcb.snd.ts = tms;
2111
2112	s->tcb.rcv.mss = s->tcb.so.mss;
2113	s->tcb.rcv.wscale = TCP_WSCALE_DEFAULT;
2114	s->tcb.rcv.wnd = calc_rx_wnd(s, s->tcb.rcv.wscale);
2115	s->tcb.rcv.ts = 0;
2116
2117	/* add the stream in stream table */
2118	st = CTX_TCP_STLB(s->s.ctx);
2119	se = stbl_add_stream_lock(st, s);
2120	if (se == NULL)
2121		return -ENOBUFS;
2122	s->ste = se;
2123
2124	/* put stream into the to-send queue */
2125	txs_enqueue(s->s.ctx, s);
2126
2127	return 0;
2128}
2129
2130int
2131tle_tcp_stream_connect(struct tle_stream *ts, const struct sockaddr *addr)
2132{
2133	struct tle_tcp_stream *s;
2134	uint32_t type;
2135	int32_t rc;
2136
2137	if (ts == NULL || addr == NULL)
2138		return -EINVAL;
2139
2140	s = TCP_STREAM(ts);
2141	type = s->s.type;
2142	if (type >= TLE_VNUM)
2143		return -EINVAL;
2144
2145	if (tcp_stream_try_acquire(s) > 0) {
2146		rc = rte_atomic16_cmpset(&s->tcb.state, TCP_ST_CLOSED,
2147			TCP_ST_SYN_SENT);
2148		rc = (rc == 0) ? -EDEADLK : 0;
2149	} else
2150		rc = -EINVAL;
2151
2152	if (rc != 0) {
2153		tcp_stream_release(s);
2154		return rc;
2155	}
2156
2157	/* fill stream, prepare and transmit syn pkt */
2158	s->tcb.uop |= TCP_OP_CONNECT;
2159	rc = tx_syn(s, addr);
2160	tcp_stream_release(s);
2161
2162	/* error happened, do a cleanup */
2163	if (rc != 0)
2164		tle_tcp_stream_close(ts);
2165
2166	return rc;
2167}
2168
2169uint16_t
2170tle_tcp_stream_recv(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
2171{
2172	uint32_t n;
2173	struct tle_tcp_stream *s;
2174
2175	s = TCP_STREAM(ts);
2176	n = _rte_ring_mcs_dequeue_burst(s->rx.q, (void **)pkt, num);
2177	if (n == 0)
2178		return 0;
2179
2180	/*
2181	 * if we still have packets to read,
2182	 * then rearm stream RX event.
2183	 */
2184	if (n == num && rte_ring_count(s->rx.q) != 0) {
2185		if (tcp_stream_try_acquire(s) > 0 && s->rx.ev != NULL)
2186			tle_event_raise(s->rx.ev);
2187		tcp_stream_release(s);
2188	}
2189
2190	return n;
2191}
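/*
 * Usage sketch for tle_tcp_stream_recv() (illustrative only): the call is
 * non-blocking and hands out mbufs that carry TCP payload only; the caller
 * owns and must free them.
 *
 *	struct rte_mbuf *pkt[32];
 *	uint16_t i, n;
 *
 *	n = tle_tcp_stream_recv(ts, pkt, RTE_DIM(pkt));
 *	for (i = 0; i != n; i++) {
 *		// payload starts at rte_pktmbuf_mtod(pkt[i], void *),
 *		// pkt[i]->pkt_len bytes in total (possibly multi-segment)
 *		rte_pktmbuf_free(pkt[i]);
 *	}
 */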
2192
2193ssize_t
2194tle_tcp_stream_readv(struct tle_stream *ts, const struct iovec *iov,
2195	int iovcnt)
2196{
2197	int32_t i;
2198	uint32_t mn, n, tn;
2199	size_t sz;
2200	struct tle_tcp_stream *s;
2201	struct iovec iv;
2202	struct rxq_objs mo[2];
2203
2204	s = TCP_STREAM(ts);
2205
2206	/* get group of packets */
2207	mn = tcp_rxq_get_objs(s, mo);
2208	if (mn == 0)
2209		return 0;
2210
2211	sz = 0;
2212	n = 0;
2213	for (i = 0; i != iovcnt; i++) {
2214		iv = iov[i];
2215		sz += iv.iov_len;
2216		n += _mbus_to_iovec(&iv, mo[0].mb + n, mo[0].num - n);
2217		if (iv.iov_len != 0) {
2218			sz -= iv.iov_len;
2219			break;
2220		}
2221	}
2222
2223	tn = n;
2224
2225	if (i != iovcnt && mn != 1) {
2226		n = 0;
2227		do {
2228			sz += iv.iov_len;
2229			n += _mbus_to_iovec(&iv, mo[1].mb + n, mo[1].num - n);
2230			if (iv.iov_len != 0) {
2231				sz -= iv.iov_len;
2232				break;
2233			}
2234			if (i + 1 != iovcnt)
2235				iv = iov[i + 1];
2236		} while (++i != iovcnt);
2237		tn += n;
2238	}
2239
2240	tcp_rxq_consume(s, tn);
2241
2242	/*
2243	 * if we still have packets to read,
2244	 * then rearm stream RX event.
2245	 */
2246	if (i == iovcnt && rte_ring_count(s->rx.q) != 0) {
2247		if (tcp_stream_try_acquire(s) > 0 && s->rx.ev != NULL)
2248			tle_event_raise(s->rx.ev);
2249		tcp_stream_release(s);
2250	}
2251
2252	return sz;
2253}
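/*
 * Usage sketch for tle_tcp_stream_readv() (illustrative only): same data
 * path as tle_tcp_stream_recv(), but the payload is copied into the
 * caller-supplied iovec(s) and the mbufs are consumed internally.
 *
 *	char buf[4096];
 *	struct iovec iv = { .iov_base = buf, .iov_len = sizeof(buf) };
 *	ssize_t sz;
 *
 *	sz = tle_tcp_stream_readv(ts, &iv, 1);
 *	// sz == 0 simply means the receive queue is empty right now
 */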
2254
2255static inline int32_t
2256tx_segments(struct tle_tcp_stream *s, uint64_t ol_flags,
2257	struct rte_mbuf *segs[], uint32_t num)
2258{
2259	uint32_t i;
2260	int32_t rc;
2261
2262	for (i = 0; i != num; i++) {
2263		/* Build L2/L3/L4 header */
2264		rc = tcp_fill_mbuf(segs[i], s, &s->tx.dst, ol_flags, s->s.port,
2265			0, TCP_FLAG_ACK, 0, 0);
2266		if (rc != 0) {
2267			free_mbufs(segs, num);
2268			break;
2269		}
2270	}
2271
2272	if (i == num) {
2273		/* queue packets for further transmission. */
2274		rc = _rte_ring_enqueue_bulk(s->tx.q, (void **)segs, num);
2275		if (rc != 0)
2276			free_mbufs(segs, num);
2277	}
2278
2279	return rc;
2280}
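/*
 * Note on the bulk enqueue above (illustrative): the segments produced from
 * one oversized packet are queued all-or-nothing.  E.g. if segmentation
 * produced 4 segments but the TX ring has room for only 3, nothing is
 * enqueued, the segments are freed here and the caller keeps (and may retry
 * with) the original large mbuf.
 */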
2281
2282uint16_t
2283tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
2284{
2285	uint32_t i, j, k, mss, n, state;
2286	int32_t rc;
2287	uint64_t ol_flags;
2288	struct tle_tcp_stream *s;
2289	struct rte_mbuf *segs[TCP_MAX_PKT_SEG];
2290
2291	s = TCP_STREAM(ts);
2292
2293	/* mark stream as not closable. */
2294	if (tcp_stream_acquire(s) < 0) {
2295		rte_errno = EAGAIN;
2296		return 0;
2297	}
2298
2299	state = s->tcb.state;
2300	if (state != TCP_ST_ESTABLISHED && state != TCP_ST_CLOSE_WAIT) {
2301		rte_errno = ENOTCONN;
2302		tcp_stream_release(s);
2303		return 0;
2304	}
2305
2306	mss = s->tcb.snd.mss;
2307	ol_flags = s->tx.dst.ol_flags;
2308
2309	k = 0;
2310	rc = 0;
2311	while (k != num) {
2312		/* prepare and check for TX */
2313		for (i = k; i != num; i++) {
2314			if (pkt[i]->pkt_len > mss ||
2315					pkt[i]->nb_segs > TCP_MAX_PKT_SEG)
2316				break;
2317			rc = tcp_fill_mbuf(pkt[i], s, &s->tx.dst, ol_flags,
2318				s->s.port, 0, TCP_FLAG_ACK, 0, 0);
2319			if (rc != 0)
2320				break;
2321		}
2322
2323		if (i != k) {
2324			/* queue packets for further transmission. */
2325			n = _rte_ring_enqueue_burst(s->tx.q,
2326				(void **)pkt + k, (i - k));
2327			k += n;
2328
2329			/*
2330			 * for unsent, but already modified packets:
2331			 * remove prepended l2/l3/l4 headers, restore ol_flags
2332			 */
2333			if (i != k) {
2334				ol_flags = ~s->tx.dst.ol_flags;
2335				for (j = k; j != i; j++) {
2336					rte_pktmbuf_adj(pkt[j], pkt[j]->l2_len +
2337						pkt[j]->l3_len +
2338						pkt[j]->l4_len);
2339					pkt[j]->ol_flags &= ol_flags;
2340				}
2341				break;
2342			}
2343		}
2344
2345		if (rc != 0) {
2346			rte_errno = -rc;
2347			break;
2348
2349		/* segment large packet and enqueue for sending */
2350		} else if (i != num) {
2351			/* segment the packet. */
2352			rc = tcp_segmentation(pkt[i], segs, RTE_DIM(segs),
2353				&s->tx.dst, mss);
2354			if (rc < 0) {
2355				rte_errno = -rc;
2356				break;
2357			}
2358
2359			rc = tx_segments(s, ol_flags, segs, rc);
2360			if (rc == 0) {
2361				/* free the large mbuf */
2362				rte_pktmbuf_free(pkt[i]);
2363				/* mark the mbuf as consumed */
2364				k++;
2365			} else
2366				/* no space left in tx queue */
2367				break;
2368		}
2369	}
2370
2371	/* notify BE about more data to send */
2372	if (k != 0)
2373		txs_enqueue(s->s.ctx, s);
2374	/* if possible, re-arm stream write event. */
2375	if (rte_ring_free_count(s->tx.q) != 0 && s->tx.ev != NULL)
2376		tle_event_raise(s->tx.ev);
2377
2378	tcp_stream_release(s);
2379
2380	return k;
2381}
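/*
 * Usage sketch for tle_tcp_stream_send() (illustrative only; 'mp' is an
 * assumed mbuf mempool, 'data'/'len' are application payload):
 *
 *	struct rte_mbuf *m;
 *	char *p;
 *
 *	m = rte_pktmbuf_alloc(mp);
 *	if (m == NULL)
 *		return;
 *	p = rte_pktmbuf_append(m, len);
 *	if (p == NULL) {
 *		rte_pktmbuf_free(m);
 *		return;
 *	}
 *	memcpy(p, data, len);
 *
 *	if (tle_tcp_stream_send(ts, &m, 1) != 1) {
 *		// not accepted (see rte_errno); the mbuf is still owned by
 *		// the caller and can be retried later or freed
 *		rte_pktmbuf_free(m);
 *	}
 *
 * Mbufs accepted here are owned by the library; the data itself is
 * transmitted later from the TX path.
 */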
2382
2383ssize_t
2384tle_tcp_stream_writev(struct tle_stream *ts, struct rte_mempool *mp,
2385	const struct iovec *iov, int iovcnt)
2386{
2387	int32_t i, rc;
2388	uint32_t j, k, n, num, slen, state;
2389	uint64_t ol_flags;
2390	size_t sz, tsz;
2391	struct tle_tcp_stream *s;
2392	struct iovec iv;
2393	struct rte_mbuf *mb[2 * MAX_PKT_BURST];
2394
2395	s = TCP_STREAM(ts);
2396
2397	/* mark stream as not closable. */
2398	if (tcp_stream_acquire(s) < 0) {
2399		rte_errno = EAGAIN;
2400		return -1;
2401	}
2402
2403	state = s->tcb.state;
2404	if (state != TCP_ST_ESTABLISHED && state != TCP_ST_CLOSE_WAIT) {
2405		rte_errno = ENOTCONN;
2406		tcp_stream_release(s);
2407		return -1;
2408	}
2409
2410	/* figure out how many mbufs we need */
2411	tsz = 0;
2412	for (i = 0; i != iovcnt; i++)
2413		tsz += iov[i].iov_len;
2414
2415	slen = rte_pktmbuf_data_room_size(mp);
2416	slen = RTE_MIN(slen, s->tcb.snd.mss);
2417
2418	num = (tsz + slen - 1) / slen;
2419	n = rte_ring_free_count(s->tx.q);
2420	num = RTE_MIN(num, n);
2421	n = RTE_MIN(num, RTE_DIM(mb));
2422
2423	/* allocate mbufs */
2424	if (rte_pktmbuf_alloc_bulk(mp, mb, n) != 0) {
2425		rte_errno = ENOMEM;
2426		tcp_stream_release(s);
2427		return -1;
2428	}
2429
2430	/* copy data into the mbufs */
2431	k = 0;
2432	sz = 0;
2433	for (i = 0; i != iovcnt; i++) {
2434		iv = iov[i];
2435		sz += iv.iov_len;
2436		k += _iovec_to_mbsegs(&iv, slen, mb + k, n - k);
2437		if (iv.iov_len != 0) {
2438			sz -= iv.iov_len;
2439			break;
2440		}
2441	}
2442
2443	/* account for a partially filled last segment */
2444	k += (k != n && mb[k]->data_len != 0);
2445
2446	/* fill pkt headers */
2447	ol_flags = s->tx.dst.ol_flags;
2448
2449	for (j = 0; j != k; j++) {
2450		rc = tcp_fill_mbuf(mb[j], s, &s->tx.dst, ol_flags,
2451			s->s.port, 0, TCP_FLAG_ACK, 0, 0);
2452		if (rc != 0)
2453			break;
2454	}
2455
2456	/* if no error encountered, then enqueue pkts for transmission */
2457	if (k == j)
2458		k = _rte_ring_enqueue_burst(s->tx.q, (void **)mb, j);
2459	else
2460		k = 0;
2461
2462	if (k != j) {
2463
2464		/* free pkts that were not enqueued */
2465		free_mbufs(mb + k, j - k);
2466
2467		/* our last segment can be partially filled */
2468		sz += slen - sz % slen;
2469		sz -= (j - k) * slen;
2470
2471		/* report an error */
2472		if (rc != 0) {
2473			rte_errno = -rc;
2474			sz = -1;
2475		}
2476	}
2477
2478	if (k != 0) {
2479
2480		/* notify BE about more data to send */
2481		txs_enqueue(s->s.ctx, s);
2482
2483		/* if possible, re-arm stream write event. */
2484		if (rte_ring_free_count(s->tx.q) != 0 && s->tx.ev != NULL)
2485			tle_event_raise(s->tx.ev);
2486	}
2487
2488	tcp_stream_release(s);
2489	return sz;
2490}
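/*
 * Usage sketch for tle_tcp_stream_writev() (illustrative only; 'mp' is an
 * assumed mbuf mempool the payload gets copied into):
 *
 *	struct iovec iv = { .iov_base = (void *)data, .iov_len = len };
 *	ssize_t sz;
 *
 *	sz = tle_tcp_stream_writev(ts, mp, &iv, 1);
 *	if (sz < 0) {
 *		// rte_errno: EAGAIN (stream busy), ENOTCONN, ENOMEM or a
 *		// header-fill error
 *	} else if ((size_t)sz < len) {
 *		// only part of the data was accepted (TX ring or burst
 *		// limit); resubmit the remainder later
 *	}
 */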
2491
2492/* send data and FIN (if needed) */
2493static inline void
2494tx_data_fin(struct tle_tcp_stream *s, uint32_t tms, uint32_t state)
2495{
2496	/* try to send some data */
2497	tx_nxt_data(s, tms);
2498
2499	/* also send a FIN: closing state, all data sent, FIN not sent yet */
2500	if (state != TCP_ST_ESTABLISHED &&
2501			state != TCP_ST_CLOSE_WAIT &&
2502			tcp_txq_nxt_cnt(s) == 0 &&
2503			s->tcb.snd.fss != s->tcb.snd.nxt) {
2504		s->tcb.snd.fss = ++s->tcb.snd.nxt;
2505		send_ack(s, tms, TCP_FLAG_FIN | TCP_FLAG_ACK);
2506	}
2507}
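/*
 * Worked example (illustrative) for the FIN branch above: a stream in
 * FIN_WAIT_1 with an empty "next to send" queue and snd.fss != snd.nxt has
 * all user data transmitted but no FIN on the wire yet, so snd.nxt is
 * advanced by one (the FIN occupies one sequence number), snd.fss is set to
 * the new value and a FIN|ACK segment is sent.
 */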
2508
2509static inline void
2510tx_stream(struct tle_tcp_stream *s, uint32_t tms)
2511{
2512	uint32_t state;
2513
2514	state = s->tcb.state;
2515
2516	if (state == TCP_ST_SYN_SENT) {
2517		/* send the SYN, start the rto timer */
2518		send_ack(s, tms, TCP_FLAG_SYN);
2519		timer_start(s);
2520
2521	} else if (state >= TCP_ST_ESTABLISHED && state <= TCP_ST_LAST_ACK) {
2522
2523		tx_data_fin(s, tms, state);
2524
2525		/* start RTO timer. */
2526		if (s->tcb.snd.nxt != s->tcb.snd.una)
2527			timer_start(s);
2528	}
2529}
2530
2531static inline void
2532rto_stream(struct tle_tcp_stream *s, uint32_t tms)
2533{
2534	uint32_t state;
2535
2536	state = s->tcb.state;
2537
2538	TCP_LOG(DEBUG, "%s(%p, tms=%u): state=%u, "
2539		"retx=%u, retm=%u, "
2540		"rto=%u, snd.ts=%u, tmo=%u, "
2541		"snd.nxt=%lu, snd.una=%lu, flight_size=%lu, "
2542		"snd.rcvr=%lu, snd.fastack=%u, "
2543		"wnd=%u, cwnd=%u, ssthresh=%u, "
2544		"bytes sent=%lu, pkt remain=%u;\n",
2545		__func__, s, tms, s->tcb.state,
2546		s->tcb.snd.nb_retx, s->tcb.snd.nb_retm,
2547		s->tcb.snd.rto, s->tcb.snd.ts, tms - s->tcb.snd.ts,
2548		s->tcb.snd.nxt, s->tcb.snd.una, s->tcb.snd.nxt - s->tcb.snd.una,
2549		s->tcb.snd.rcvr, s->tcb.snd.fastack,
2550		s->tcb.snd.wnd, s->tcb.snd.cwnd, s->tcb.snd.ssthresh,
2551		s->tcb.snd.nxt - s->tcb.snd.iss, tcp_txq_nxt_cnt(s));
2552
2553	if (s->tcb.snd.nb_retx < s->tcb.snd.nb_retm) {
2554
2555		if (state >= TCP_ST_ESTABLISHED && state <= TCP_ST_LAST_ACK) {
2556
2557			/* update SND.CWND and SND.SSTHRESH */
2558			rto_cwnd_update(&s->tcb);
2559
2560			/* RFC 6582 3.2.4 */
2561			s->tcb.snd.rcvr = s->tcb.snd.nxt;
2562			s->tcb.snd.fastack = 0;
2563
2564			/* restart from last acked data */
2565			tcp_txq_rst_nxt_head(s);
2566			s->tcb.snd.nxt = s->tcb.snd.una;
2567
2568			tx_data_fin(s, tms, state);
2569
2570		} else if (state == TCP_ST_SYN_SENT) {
2571			/* resending SYN */
2572			s->tcb.so.ts.val = tms;
2573
2574			/* According to RFC 6928 2:
2575			 * To reduce the chance for spurious SYN or SYN/ACK
2576			 * retransmission, it is RECOMMENDED that
2577			 * implementations refrain from resetting the initial
2578			 * window to 1 segment, unless there have been more
2579			 * than one SYN or SYN/ACK retransmissions or true loss
2580			 * detection has been made.
2581			 */
2582			if (s->tcb.snd.nb_retx != 0)
2583				s->tcb.snd.cwnd = s->tcb.snd.mss;
2584
2585			send_ack(s, tms, TCP_FLAG_SYN);
2586
2587		} else if (state == TCP_ST_TIME_WAIT) {
2588			stream_term(s);
2589		}
2590
2591		/* RFC6298:5.5 back off the timer */
2592		s->tcb.snd.rto = rto_roundup(2 * s->tcb.snd.rto);
2593		s->tcb.snd.nb_retx++;
2594		timer_restart(s);
2595
2596	} else {
2597		send_rst(s, s->tcb.snd.una);
2598		stream_term(s);
2599	}
2600}
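/*
 * Illustrative backoff example for the RFC 6298 5.5 rule above (the actual
 * RTO value depends on TCP_RTO_DEFAULT and the measured RTT): with
 * snd.rto == 200 ms at the first timeout, subsequent retransmissions use
 * rto_roundup(400), rto_roundup(800) and so on, until snd.nb_retx reaches
 * snd.nb_retm, at which point a RST is sent and the stream is terminated.
 */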
2601
2602int
2603tle_tcp_process(struct tle_ctx *ctx, uint32_t num)
2604{
2605	uint32_t i, k, tms;
2606	struct sdr *dr;
2607	struct tle_timer_wheel *tw;
2608	struct tle_stream *p;
2609	struct tle_tcp_stream *s, *rs[num];
2610
2611	/* process streams with expired RTO */
2612
2613	tw = CTX_TCP_TMWHL(ctx);
2614	tms = tcp_get_tms(ctx->cycles_ms_shift);
2615	tle_timer_expire(tw, tms);
2616
2617	k = tle_timer_get_expired_bulk(tw, (void **)rs, RTE_DIM(rs));
2618
2619	for (i = 0; i != k; i++) {
2620
2621		s = rs[i];
2622		s->timer.handle = NULL;
2623		if (tcp_stream_try_acquire(s) > 0)
2624			rto_stream(s, tms);
2625		tcp_stream_release(s);
2626	}
2627
2628	/* process streams from the to-send queue */
2629
2630	k = txs_dequeue_bulk(ctx, rs, RTE_DIM(rs));
2631
2632	for (i = 0; i != k; i++) {
2633
2634		s = rs[i];
2635		rte_atomic32_set(&s->tx.arm, 0);
2636
2637		if (tcp_stream_try_acquire(s) > 0)
2638			tx_stream(s, tms);
2639		else
2640			txs_enqueue(s->s.ctx, s);
2641		tcp_stream_release(s);
2642	}
2643
2644	/* collect streams to close from the death row */
2645
2646	dr = CTX_TCP_SDR(ctx);
2647	for (k = 0, p = STAILQ_FIRST(&dr->be);
2648			k != num && p != NULL;
2649			k++, p = STAILQ_NEXT(p, link))
2650		rs[k] = TCP_STREAM(p);
2651
2652	if (p == NULL)
2653		STAILQ_INIT(&dr->be);
2654	else
2655		STAILQ_FIRST(&dr->be) = p;
2656
2657	/* cleanup closed streams */
2658	for (i = 0; i != k; i++) {
2659		s = rs[i];
2660		tcp_stream_down(s);
2661		tcp_stream_reset(ctx, s);
2662	}
2663
2664	return 0;
2665}
2666
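/*
 * Usage sketch for tle_tcp_process() (illustrative only): it is expected to
 * be called regularly from the back-end loop that also moves packets between
 * the NIC and the context (the rx/tx bulk calls below are assumptions taken
 * from the public tle_tcp.h API, not from this file; TCP_MAX_PROCESS is just
 * a caller-chosen bound on the number of streams handled per call):
 *
 *	for (;;) {
 *		// feed received packets into the context via tle_tcp_rx_bulk()
 *		// and drain tle_tcp_tx_bulk() output to the NIC ...
 *
 *		// then let the library run RTO, pending TX and stream cleanup:
 *		tle_tcp_process(ctx, TCP_MAX_PROCESS);
 *	}
 */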