1/*
2 * Copyright (c) 2016-2017  Intel Corporation.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#ifndef _TCP_RXQ_H_
17#define _TCP_RXQ_H_
18
19#include "tcp_ofo.h"
20
21#ifdef __cplusplus
22extern "C" {
23#endif
24
/*
 * Descriptor of one contiguous run of mbuf pointer slots.
 * tcp_rxq_get_objs() fills an array of two such descriptors.
 */
struct rxq_objs {
	/* address of the first mbuf pointer slot in the run */
	struct rte_mbuf **mb;
	/* number of slots in the run */
	uint32_t num;
};
29
30static inline uint32_t
31rx_ofo_enqueue(struct tle_tcp_stream *s, union seqlen *sl,
32	struct rte_mbuf *mb[], uint32_t num)
33{
34	uint32_t i, n;
35
36	n = 0;
37	do {
38		i = _ofo_step(s->rx.ofo, sl, mb + n, num - n);
39		n += i;
40	} while (i != 0 && n != num);
41
42	_ofo_compact(s->rx.ofo);
43	return n;
44}
45
46static inline uint32_t
47rx_ofo_reduce(struct tle_tcp_stream *s)
48{
49	uint32_t i, n, seq;
50	struct ofo *ofo;
51	struct ofodb *db;
52
53	seq = s->tcb.rcv.nxt;
54	ofo = s->rx.ofo;
55
56	if (ofo->nb_elem == 0)
57		return 0;
58
59	n = 0;
60	for (i = 0; i != ofo->nb_elem; i++) {
61
62		db = ofo->db + i;
63
64		/* gap still present */
65		if (tcp_seq_lt(seq, db->sl.seq))
66			break;
67
68		/* this db is fully overlapped */
69		if (tcp_seq_leq(db->sl.seq + db->sl.len, seq))
70			_ofodb_free(db);
71		else
72			n += _ofodb_enqueue(s->rx.q, db, &seq);
73	}
74
75	s->tcb.rcv.nxt = seq;
76	_ofo_remove(ofo, 0, i);
77	return n;
78}
79
80static inline uint32_t
81rx_ino_enqueue(struct tle_tcp_stream *s, union seqlen *sl,
82	struct rte_mbuf *mb[], uint32_t num)
83{
84	uint32_t i, n;
85
86	n = _rte_ring_enqueue_burst(s->rx.q, (void * const *)mb, num);
87
88	/* error: can'queue some packets into receive buffer. */
89	for (i = n; i != num; i++)
90		sl->len -= mb[i]->pkt_len;
91
92	s->tcb.rcv.nxt = sl->seq + sl->len;
93	return n;
94}
95
96static inline uint32_t
97rx_data_enqueue(struct tle_tcp_stream *s, uint32_t seq, uint32_t len,
98	struct rte_mbuf *mb[], uint32_t num)
99{
100	uint32_t n, r, t;
101	union seqlen sl;
102
103	sl.seq = seq;
104	sl.len = len;
105
106	r = rte_ring_count(s->rx.q);
107
108	/* in order packets, ready to be delivered */
109	if (seq == s->tcb.rcv.nxt) {
110
111		t = rx_ino_enqueue(s, &sl, mb, num);
112
113		/* failed to queue all input in-order packets */
114		if (t != num)
115			TCP_LOG(DEBUG,
116			"%s(s=%p, seq=%u, len=%u, num=%u) failed to queue "
117			"%u packets;\n",
118			__func__, s, seq, len, num, num - t);
119
120		/* try to consume some out-of-order packets*/
121		else {
122			n = rx_ofo_reduce(s);
123			if (n != 0)
124				TCP_LOG(DEBUG,
125				"%s(s=%p, rcv.nxt=%u) failed to queue %u "
126				"OFO packets;\n",
127				__func__, s, s->tcb.rcv.nxt, n);
128		}
129
130	/* queue out of order packets */
131	} else {
132		t = rx_ofo_enqueue(s, &sl, mb, num);
133	}
134
135	n = rte_ring_count(s->rx.q);
136	if (r != n) {
137		/* raise RX event */
138		if (s->rx.ev != NULL)
139			tle_event_raise(s->rx.ev);
140		/* if RX queue was empty invoke RX notification callback. */
141		else if (s->rx.cb.func != NULL && r == 0)
142			s->rx.cb.func(s->rx.cb.data, &s->s);
143	}
144
145	return t;
146}
147
148static inline uint32_t
149tcp_rxq_get_objs(struct tle_tcp_stream *s, struct rxq_objs obj[2])
150{
151	struct rte_ring *r;
152	uint32_t n, head, sz;
153
154	r = s->rx.q;
155
156	n = _rte_ring_mcs_dequeue_start(r, UINT32_MAX);
157	if (n == 0)
158		return 0;
159
160	sz = _rte_ring_get_size(r);
161	head = (r->cons.head - n) & _rte_ring_get_mask(r);
162
163	obj[0].mb = (struct rte_mbuf **)(_rte_ring_get_data(r) + head);
164	obj[1].mb = (struct rte_mbuf **)_rte_ring_get_data(r);
165
166	if (head + n <= sz) {
167		obj[0].num = n;
168		obj[1].num = 0;
169		return 1;
170	} else {
171		obj[0].num = sz - head;
172		obj[1].num = n + head - sz;
173		return 2;
174	}
175}
176
177static inline void
178tcp_rxq_consume(struct tle_tcp_stream *s, uint32_t num)
179{
180	_rte_ring_mcs_dequeue_finish(s->rx.q, num);
181}
182
183#ifdef __cplusplus
184}
185#endif
186
187#endif /* _TCP_RXQ_H_ */
188