tcp_rxq.h revision aa97dd1c
1/*
2 * Copyright (c) 2016  Intel Corporation.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#ifndef _TCP_RXQ_H_
17#define _TCP_RXQ_H_
18
19#include "tcp_ofo.h"
20
21#ifdef __cplusplus
22extern "C" {
23#endif
24
25static inline uint32_t
26rx_ofo_enqueue(struct tle_tcp_stream *s, union seqlen *sl,
27	struct rte_mbuf *mb[], uint32_t num)
28{
29	uint32_t i, n;
30
31	n = 0;
32	do {
33		i = _ofo_step(s->rx.ofo, sl, mb + n, num - n);
34		n += i;
35	} while (i != 0 && n != num);
36
37	_ofo_compact(s->rx.ofo);
38	return n;
39}
40
41static inline uint32_t
42rx_ofo_reduce(struct tle_tcp_stream *s)
43{
44	uint32_t i, n, end, seq;
45	struct ofo *ofo;
46	struct ofodb *db;
47	union seqlen sl;
48
49	seq = s->tcb.rcv.nxt;
50	ofo = s->rx.ofo;
51
52	n = 0;
53	for (i = 0; i != ofo->nb_elem; i++) {
54
55		db = ofo->db + i;
56
57		/* gap still present */
58		if (tcp_seq_lt(seq, db->sl.seq))
59			break;
60
61		end = db->sl.seq + db->sl.len;
62
63		/* this db is fully overlapped */
64		if (tcp_seq_leq(end, seq))
65			_ofodb_free(db);
66		else
67			n += _ofodb_enqueue(s->rx.q, db, &sl);
68
69		seq = sl.seq + sl.len;
70	}
71
72	s->tcb.rcv.nxt = seq;
73	_ofo_remove(ofo, 0, i);
74	return n;
75}
76
77static inline uint32_t
78rx_ino_enqueue(struct tle_tcp_stream *s, union seqlen *sl,
79	struct rte_mbuf *mb[], uint32_t num)
80{
81	uint32_t i, n;
82
83	n = rte_ring_enqueue_burst(s->rx.q, (void * const *)mb, num);
84
85	/* error: can'queue some packets into receive buffer. */
86	for (i = n; i != num; i++)
87		sl->len -= mb[i]->pkt_len;
88
89	s->tcb.rcv.nxt = sl->seq + sl->len;
90	return n;
91}
92
93static inline uint32_t
94rx_data_enqueue(struct tle_tcp_stream *s, uint32_t seq, uint32_t len,
95	struct rte_mbuf *mb[], uint32_t num)
96{
97	uint32_t n, r, t;
98	union seqlen sl;
99
100	sl.seq = seq;
101	sl.len = len;
102
103	r = rte_ring_count(s->rx.q);
104
105	/* in order packets, ready to be delivered */
106	if (seq == s->tcb.rcv.nxt) {
107
108		t = rx_ino_enqueue(s, &sl, mb, num);
109
110		/* failed to queue all input in-order packets */
111		if (t != num)
112			TCP_LOG(DEBUG,
113			"%s(s=%p, seq=%u, len=%u, num=%u) failed to queue "
114			"%u packets;\n",
115			__func__, s, seq, len, num, num - t);
116
117		/* try to consume some out-of-order packets*/
118		else {
119			n = rx_ofo_reduce(s);
120			if (n != 0)
121				TCP_LOG(DEBUG,
122				"%s(s=%p, rcv.nxt=%u) failed to queue %u "
123				"OFO packets;\n",
124				__func__, s, s->tcb.rcv.nxt, n);
125		}
126
127	/* queue out of order packets */
128	} else {
129		t = rx_ofo_enqueue(s, &sl, mb, num);
130	}
131
132	n = rte_ring_count(s->rx.q);
133	if (r != n) {
134		/* raise RX event */
135		if (s->rx.ev != NULL)
136			tle_event_raise(s->rx.ev);
137		/* if RX queue was empty invoke RX notification callback. */
138		else if (s->rx.cb.func != NULL && r == 0)
139			s->rx.cb.func(s->rx.cb.data, &s->s);
140	}
141
142	return t;
143}
144
145#ifdef __cplusplus
146}
147#endif
148
149#endif /* _TCP_RXQ_H_ */
150