tcp_rxq.h revision 7e18fa1b
1/*
2 * Copyright (c) 2016-2017  Intel Corporation.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#ifndef _TCP_RXQ_H_
17#define _TCP_RXQ_H_
18
19#include "tcp_ofo.h"
20
21#ifdef __cplusplus
22extern "C" {
23#endif
24
25struct rxq_objs {
26	struct rte_mbuf **mb;
27	uint32_t num;
28};
29
30static inline uint32_t
31rx_ofo_enqueue(struct tle_tcp_stream *s, union seqlen *sl,
32	struct rte_mbuf *mb[], uint32_t num)
33{
34	uint32_t i, n;
35
36	n = 0;
37	do {
38		i = _ofo_step(s->rx.ofo, sl, mb + n, num - n);
39		n += i;
40	} while (i != 0 && n != num);
41
42	_ofo_compact(s->rx.ofo);
43	return n;
44}
45
46static inline uint32_t
47rx_ofo_reduce(struct tle_tcp_stream *s)
48{
49	uint32_t i, n, end, seq;
50	struct ofo *ofo;
51	struct ofodb *db;
52	union seqlen sl;
53
54	seq = s->tcb.rcv.nxt;
55	ofo = s->rx.ofo;
56
57	n = 0;
58	for (i = 0; i != ofo->nb_elem; i++) {
59
60		db = ofo->db + i;
61
62		/* gap still present */
63		if (tcp_seq_lt(seq, db->sl.seq))
64			break;
65
66		end = db->sl.seq + db->sl.len;
67
68		/* this db is fully overlapped */
69		if (tcp_seq_leq(end, seq))
70			_ofodb_free(db);
71		else
72			n += _ofodb_enqueue(s->rx.q, db, &sl);
73
74		seq = sl.seq + sl.len;
75	}
76
77	s->tcb.rcv.nxt = seq;
78	_ofo_remove(ofo, 0, i);
79	return n;
80}
81
82static inline uint32_t
83rx_ino_enqueue(struct tle_tcp_stream *s, union seqlen *sl,
84	struct rte_mbuf *mb[], uint32_t num)
85{
86	uint32_t i, n;
87
88	n = _rte_ring_enqueue_burst(s->rx.q, (void * const *)mb, num);
89
90	/* error: can'queue some packets into receive buffer. */
91	for (i = n; i != num; i++)
92		sl->len -= mb[i]->pkt_len;
93
94	s->tcb.rcv.nxt = sl->seq + sl->len;
95	return n;
96}
97
98static inline uint32_t
99rx_data_enqueue(struct tle_tcp_stream *s, uint32_t seq, uint32_t len,
100	struct rte_mbuf *mb[], uint32_t num)
101{
102	uint32_t n, r, t;
103	union seqlen sl;
104
105	sl.seq = seq;
106	sl.len = len;
107
108	r = rte_ring_count(s->rx.q);
109
110	/* in order packets, ready to be delivered */
111	if (seq == s->tcb.rcv.nxt) {
112
113		t = rx_ino_enqueue(s, &sl, mb, num);
114
115		/* failed to queue all input in-order packets */
116		if (t != num)
117			TCP_LOG(DEBUG,
118			"%s(s=%p, seq=%u, len=%u, num=%u) failed to queue "
119			"%u packets;\n",
120			__func__, s, seq, len, num, num - t);
121
122		/* try to consume some out-of-order packets*/
123		else {
124			n = rx_ofo_reduce(s);
125			if (n != 0)
126				TCP_LOG(DEBUG,
127				"%s(s=%p, rcv.nxt=%u) failed to queue %u "
128				"OFO packets;\n",
129				__func__, s, s->tcb.rcv.nxt, n);
130		}
131
132	/* queue out of order packets */
133	} else {
134		t = rx_ofo_enqueue(s, &sl, mb, num);
135	}
136
137	n = rte_ring_count(s->rx.q);
138	if (r != n) {
139		/* raise RX event */
140		if (s->rx.ev != NULL)
141			tle_event_raise(s->rx.ev);
142		/* if RX queue was empty invoke RX notification callback. */
143		else if (s->rx.cb.func != NULL && r == 0)
144			s->rx.cb.func(s->rx.cb.data, &s->s);
145	}
146
147	return t;
148}
149
150static inline uint32_t
151tcp_rxq_get_objs(struct tle_tcp_stream *s, struct rxq_objs obj[2])
152{
153	struct rte_ring *r;
154	uint32_t n, head, sz;
155
156	r = s->rx.q;
157
158	n = _rte_ring_mcs_dequeue_start(r, UINT32_MAX);
159	if (n == 0)
160		return 0;
161
162	sz = _rte_ring_get_size(r);
163	head = (r->cons.head - n) & _rte_ring_get_mask(r);
164
165	obj[0].mb = (struct rte_mbuf **)(_rte_ring_get_data(r) + head);
166	obj[1].mb = (struct rte_mbuf **)_rte_ring_get_data(r);
167
168	if (head + n <= sz) {
169		obj[0].num = n;
170		obj[1].num = 0;
171		return 1;
172	} else {
173		obj[0].num = sz - head;
174		obj[1].num = n + head - sz;
175		return 2;
176	}
177}
178
179static inline void
180tcp_rxq_consume(struct tle_tcp_stream *s, uint32_t num)
181{
182	_rte_ring_mcs_dequeue_finish(s->rx.q, num);
183}
184
185#ifdef __cplusplus
186}
187#endif
188
189#endif /* _TCP_RXQ_H_ */
190