1aa97dd1cSKonstantin Ananyev/*
2aa97dd1cSKonstantin Ananyev * Copyright (c) 2016  Intel Corporation.
3aa97dd1cSKonstantin Ananyev * Licensed under the Apache License, Version 2.0 (the "License");
4aa97dd1cSKonstantin Ananyev * you may not use this file except in compliance with the License.
5aa97dd1cSKonstantin Ananyev * You may obtain a copy of the License at:
6aa97dd1cSKonstantin Ananyev *
7aa97dd1cSKonstantin Ananyev *     http://www.apache.org/licenses/LICENSE-2.0
8aa97dd1cSKonstantin Ananyev *
9aa97dd1cSKonstantin Ananyev * Unless required by applicable law or agreed to in writing, software
10aa97dd1cSKonstantin Ananyev * distributed under the License is distributed on an "AS IS" BASIS,
11aa97dd1cSKonstantin Ananyev * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12aa97dd1cSKonstantin Ananyev * See the License for the specific language governing permissions and
13aa97dd1cSKonstantin Ananyev * limitations under the License.
14aa97dd1cSKonstantin Ananyev */
15aa97dd1cSKonstantin Ananyev
16aa97dd1cSKonstantin Ananyev#ifndef UDP_H_
17aa97dd1cSKonstantin Ananyev#define UDP_H_
18aa97dd1cSKonstantin Ananyev
19aa97dd1cSKonstantin Ananyev/*
20aa97dd1cSKonstantin Ananyev * helper function: opens IPv4 and IPv6 streams for selected port.
21aa97dd1cSKonstantin Ananyev */
22aa97dd1cSKonstantin Ananyevstatic struct netfe_stream *
23aa97dd1cSKonstantin Ananyevnetfe_stream_open_udp(struct netfe_lcore *fe, struct netfe_sprm *sprm,
24aa97dd1cSKonstantin Ananyev	uint32_t lcore, uint16_t op, uint32_t bidx)
25aa97dd1cSKonstantin Ananyev{
26aa97dd1cSKonstantin Ananyev	int32_t rc;
27aa97dd1cSKonstantin Ananyev	struct netfe_stream *fes;
28aa97dd1cSKonstantin Ananyev	struct sockaddr_in *l4;
29aa97dd1cSKonstantin Ananyev	struct sockaddr_in6 *l6;
30aa97dd1cSKonstantin Ananyev	uint16_t errport;
31aa97dd1cSKonstantin Ananyev	struct tle_udp_stream_param uprm;
32aa97dd1cSKonstantin Ananyev
33aa97dd1cSKonstantin Ananyev	fes = netfe_get_stream(&fe->free);
34aa97dd1cSKonstantin Ananyev	if (fes == NULL) {
35aa97dd1cSKonstantin Ananyev		rte_errno = ENOBUFS;
36aa97dd1cSKonstantin Ananyev		return NULL;
37aa97dd1cSKonstantin Ananyev	}
38aa97dd1cSKonstantin Ananyev
39aa97dd1cSKonstantin Ananyev	fes->rxev = tle_event_alloc(fe->rxeq, fes);
40aa97dd1cSKonstantin Ananyev	fes->txev = tle_event_alloc(fe->txeq, fes);
41aa97dd1cSKonstantin Ananyev
42aa97dd1cSKonstantin Ananyev	if (fes->rxev == NULL || fes->txev == NULL) {
43aa97dd1cSKonstantin Ananyev		netfe_stream_close(fe, fes);
44aa97dd1cSKonstantin Ananyev		rte_errno = ENOMEM;
45aa97dd1cSKonstantin Ananyev		return NULL;
46aa97dd1cSKonstantin Ananyev	}
47aa97dd1cSKonstantin Ananyev
48aa97dd1cSKonstantin Ananyev	if (op == TXONLY || op == FWD) {
49aa97dd1cSKonstantin Ananyev		tle_event_active(fes->txev, TLE_SEV_DOWN);
50aa97dd1cSKonstantin Ananyev		fes->stat.txev[TLE_SEV_DOWN]++;
51aa97dd1cSKonstantin Ananyev	}
52aa97dd1cSKonstantin Ananyev
53aa97dd1cSKonstantin Ananyev	if (op != TXONLY) {
54aa97dd1cSKonstantin Ananyev		tle_event_active(fes->rxev, TLE_SEV_DOWN);
55aa97dd1cSKonstantin Ananyev		fes->stat.rxev[TLE_SEV_DOWN]++;
56aa97dd1cSKonstantin Ananyev	}
57aa97dd1cSKonstantin Ananyev
58aa97dd1cSKonstantin Ananyev	memset(&uprm, 0, sizeof(uprm));
59aa97dd1cSKonstantin Ananyev	uprm.local_addr = sprm->local_addr;
60aa97dd1cSKonstantin Ananyev	uprm.remote_addr = sprm->remote_addr;
61aa97dd1cSKonstantin Ananyev	uprm.recv_ev = fes->rxev;
62aa97dd1cSKonstantin Ananyev	if (op != FWD)
63aa97dd1cSKonstantin Ananyev		uprm.send_ev = fes->txev;
64aa97dd1cSKonstantin Ananyev	fes->s = tle_udp_stream_open(becfg.cpu[bidx].ctx, &uprm);
65aa97dd1cSKonstantin Ananyev
66aa97dd1cSKonstantin Ananyev	if (fes->s == NULL) {
67aa97dd1cSKonstantin Ananyev		rc = rte_errno;
68aa97dd1cSKonstantin Ananyev		netfe_stream_close(fe, fes);
69aa97dd1cSKonstantin Ananyev		rte_errno = rc;
70aa97dd1cSKonstantin Ananyev
71aa97dd1cSKonstantin Ananyev		if (sprm->local_addr.ss_family == AF_INET) {
72aa97dd1cSKonstantin Ananyev			l4 = (struct sockaddr_in *) &sprm->local_addr;
73aa97dd1cSKonstantin Ananyev			errport = ntohs(l4->sin_port);
74aa97dd1cSKonstantin Ananyev		} else {
75aa97dd1cSKonstantin Ananyev			l6 = (struct sockaddr_in6 *) &sprm->local_addr;
76aa97dd1cSKonstantin Ananyev			errport = ntohs(l6->sin6_port);
77aa97dd1cSKonstantin Ananyev		}
78aa97dd1cSKonstantin Ananyev
79aa97dd1cSKonstantin Ananyev		RTE_LOG(ERR, USER1, "stream open failed for port %u with error "
80aa97dd1cSKonstantin Ananyev			"code=%u, bidx=%u, lc=%u\n",
81aa97dd1cSKonstantin Ananyev			errport, rc, bidx, becfg.cpu[bidx].id);
82aa97dd1cSKonstantin Ananyev		return NULL;
83aa97dd1cSKonstantin Ananyev	}
84aa97dd1cSKonstantin Ananyev
85aa97dd1cSKonstantin Ananyev	RTE_LOG(NOTICE, USER1,
86aa97dd1cSKonstantin Ananyev		"%s(%u)={s=%p, op=%hu, proto=%s, rxev=%p, txev=%p}, belc=%u\n",
87aa97dd1cSKonstantin Ananyev		__func__, lcore, fes->s, op, proto_name[becfg.proto],
88aa97dd1cSKonstantin Ananyev		fes->rxev, fes->txev, becfg.cpu[bidx].id);
89aa97dd1cSKonstantin Ananyev
90aa97dd1cSKonstantin Ananyev	fes->op = op;
91aa97dd1cSKonstantin Ananyev	fes->proto = becfg.proto;
92aa97dd1cSKonstantin Ananyev	fes->family = sprm->local_addr.ss_family;
93aa97dd1cSKonstantin Ananyev
94aa97dd1cSKonstantin Ananyev	return fes;
95aa97dd1cSKonstantin Ananyev}
96aa97dd1cSKonstantin Ananyev
97aa97dd1cSKonstantin Ananyevstatic int
98aa97dd1cSKonstantin Ananyevnetfe_lcore_init_udp(const struct netfe_lcore_prm *prm)
99aa97dd1cSKonstantin Ananyev{
100aa97dd1cSKonstantin Ananyev	size_t sz;
101aa97dd1cSKonstantin Ananyev	int32_t rc;
102aa97dd1cSKonstantin Ananyev	uint32_t i, lcore, snum;
103aa97dd1cSKonstantin Ananyev	struct netfe_lcore *fe;
104aa97dd1cSKonstantin Ananyev	struct tle_evq_param eprm;
105aa97dd1cSKonstantin Ananyev	struct netfe_stream *fes;
106aa97dd1cSKonstantin Ananyev	struct netfe_sprm *sprm;
107aa97dd1cSKonstantin Ananyev
108aa97dd1cSKonstantin Ananyev	lcore = rte_lcore_id();
109aa97dd1cSKonstantin Ananyev
110aa97dd1cSKonstantin Ananyev	snum = prm->max_streams;
111aa97dd1cSKonstantin Ananyev	RTE_LOG(NOTICE, USER1, "%s(lcore=%u, nb_streams=%u, max_streams=%u)\n",
112aa97dd1cSKonstantin Ananyev		__func__, lcore, prm->nb_streams, snum);
113aa97dd1cSKonstantin Ananyev
114aa97dd1cSKonstantin Ananyev	memset(&eprm, 0, sizeof(eprm));
115aa97dd1cSKonstantin Ananyev	eprm.socket_id = rte_lcore_to_socket_id(lcore);
116aa97dd1cSKonstantin Ananyev	eprm.max_events = snum;
117aa97dd1cSKonstantin Ananyev
118aa97dd1cSKonstantin Ananyev	sz = sizeof(*fe) + snum * sizeof(struct netfe_stream);
119aa97dd1cSKonstantin Ananyev	fe = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
120aa97dd1cSKonstantin Ananyev		rte_lcore_to_socket_id(lcore));
121aa97dd1cSKonstantin Ananyev
122aa97dd1cSKonstantin Ananyev	if (fe == NULL) {
123aa97dd1cSKonstantin Ananyev		RTE_LOG(ERR, USER1, "%s:%d failed to allocate %zu bytes\n",
124aa97dd1cSKonstantin Ananyev			__func__, __LINE__, sz);
125aa97dd1cSKonstantin Ananyev		return -ENOMEM;
126aa97dd1cSKonstantin Ananyev	}
127aa97dd1cSKonstantin Ananyev
128aa97dd1cSKonstantin Ananyev	RTE_PER_LCORE(_fe) = fe;
129aa97dd1cSKonstantin Ananyev
130aa97dd1cSKonstantin Ananyev	fe->snum = snum;
131aa97dd1cSKonstantin Ananyev	/* initialize the stream pool */
132aa97dd1cSKonstantin Ananyev	LIST_INIT(&fe->free.head);
133aa97dd1cSKonstantin Ananyev	LIST_INIT(&fe->use.head);
134aa97dd1cSKonstantin Ananyev	fes = (struct netfe_stream *)(fe + 1);
135aa97dd1cSKonstantin Ananyev	for (i = 0; i != snum; i++, fes++)
136aa97dd1cSKonstantin Ananyev		netfe_put_stream(fe, &fe->free, fes);
137aa97dd1cSKonstantin Ananyev
138aa97dd1cSKonstantin Ananyev	/* allocate the event queues */
139aa97dd1cSKonstantin Ananyev	fe->rxeq = tle_evq_create(&eprm);
140aa97dd1cSKonstantin Ananyev	fe->txeq = tle_evq_create(&eprm);
141aa97dd1cSKonstantin Ananyev
142aa97dd1cSKonstantin Ananyev	RTE_LOG(INFO, USER1, "%s(%u) rx evq=%p, tx evq=%p\n",
143aa97dd1cSKonstantin Ananyev		__func__, lcore, fe->rxeq, fe->txeq);
144aa97dd1cSKonstantin Ananyev	if (fe->rxeq == NULL || fe->txeq == NULL)
145aa97dd1cSKonstantin Ananyev		return -ENOMEM;
146aa97dd1cSKonstantin Ananyev
147aa97dd1cSKonstantin Ananyev	rc = fwd_tbl_init(fe, AF_INET, lcore);
148aa97dd1cSKonstantin Ananyev	RTE_LOG(ERR, USER1, "%s(%u) fwd_tbl_init(%u) returns %d\n",
149aa97dd1cSKonstantin Ananyev		__func__, lcore, AF_INET, rc);
150aa97dd1cSKonstantin Ananyev	if (rc != 0)
151aa97dd1cSKonstantin Ananyev		return rc;
152aa97dd1cSKonstantin Ananyev
153aa97dd1cSKonstantin Ananyev	rc = fwd_tbl_init(fe, AF_INET6, lcore);
154aa97dd1cSKonstantin Ananyev	RTE_LOG(ERR, USER1, "%s(%u) fwd_tbl_init(%u) returns %d\n",
155aa97dd1cSKonstantin Ananyev		__func__, lcore, AF_INET6, rc);
156aa97dd1cSKonstantin Ananyev	if (rc != 0)
157aa97dd1cSKonstantin Ananyev		return rc;
158aa97dd1cSKonstantin Ananyev
159aa97dd1cSKonstantin Ananyev	/* open all requested streams. */
160aa97dd1cSKonstantin Ananyev	for (i = 0; i != prm->nb_streams; i++) {
161aa97dd1cSKonstantin Ananyev		sprm = &prm->stream[i].sprm;
162aa97dd1cSKonstantin Ananyev		fes = netfe_stream_open_udp(fe, sprm, lcore, prm->stream[i].op,
163aa97dd1cSKonstantin Ananyev			sprm->bidx);
164aa97dd1cSKonstantin Ananyev		if (fes == NULL) {
165aa97dd1cSKonstantin Ananyev			rc = -rte_errno;
166aa97dd1cSKonstantin Ananyev			break;
167aa97dd1cSKonstantin Ananyev		}
168aa97dd1cSKonstantin Ananyev
169aa97dd1cSKonstantin Ananyev		netfe_stream_dump(fes, &sprm->local_addr, &sprm->remote_addr);
170aa97dd1cSKonstantin Ananyev
171aa97dd1cSKonstantin Ananyev		if (prm->stream[i].op == FWD) {
172aa97dd1cSKonstantin Ananyev			fes->fwdprm = prm->stream[i].fprm;
173aa97dd1cSKonstantin Ananyev			rc = fwd_tbl_add(fe,
174aa97dd1cSKonstantin Ananyev				prm->stream[i].fprm.remote_addr.ss_family,
175aa97dd1cSKonstantin Ananyev				(const struct sockaddr *)
176aa97dd1cSKonstantin Ananyev				&prm->stream[i].fprm.remote_addr,
177aa97dd1cSKonstantin Ananyev				fes);
178aa97dd1cSKonstantin Ananyev			if (rc != 0) {
179aa97dd1cSKonstantin Ananyev				netfe_stream_close(fe, fes);
180aa97dd1cSKonstantin Ananyev				break;
181aa97dd1cSKonstantin Ananyev			}
182aa97dd1cSKonstantin Ananyev		} else if (prm->stream[i].op == TXONLY) {
183aa97dd1cSKonstantin Ananyev			fes->txlen = prm->stream[i].txlen;
184aa97dd1cSKonstantin Ananyev			fes->raddr = prm->stream[i].sprm.remote_addr;
185aa97dd1cSKonstantin Ananyev		}
186aa97dd1cSKonstantin Ananyev	}
187aa97dd1cSKonstantin Ananyev
188aa97dd1cSKonstantin Ananyev	return rc;
189aa97dd1cSKonstantin Ananyev}
190aa97dd1cSKonstantin Ananyev
191aa97dd1cSKonstantin Ananyevstatic struct netfe_stream *
192aa97dd1cSKonstantin Ananyevfind_fwd_dst_udp(uint32_t lcore, struct netfe_stream *fes,
193aa97dd1cSKonstantin Ananyev	const struct sockaddr *sa)
194aa97dd1cSKonstantin Ananyev{
195aa97dd1cSKonstantin Ananyev	uint32_t rc;
196aa97dd1cSKonstantin Ananyev	struct netfe_stream *fed;
197aa97dd1cSKonstantin Ananyev	struct netfe_lcore *fe;
198aa97dd1cSKonstantin Ananyev	struct tle_udp_stream_param uprm;
199aa97dd1cSKonstantin Ananyev
200aa97dd1cSKonstantin Ananyev	fe = RTE_PER_LCORE(_fe);
201aa97dd1cSKonstantin Ananyev
202aa97dd1cSKonstantin Ananyev	fed = fwd_tbl_lkp(fe, fes->family, sa);
203aa97dd1cSKonstantin Ananyev	if (fed != NULL)
204aa97dd1cSKonstantin Ananyev		return fed;
205aa97dd1cSKonstantin Ananyev
206aa97dd1cSKonstantin Ananyev	/* create a new stream and put it into the fwd table. */
207aa97dd1cSKonstantin Ananyev	memset(&uprm, 0, sizeof(uprm));
208aa97dd1cSKonstantin Ananyev	uprm.local_addr = fes->fwdprm.local_addr;
209aa97dd1cSKonstantin Ananyev	uprm.remote_addr = fes->fwdprm.remote_addr;
210aa97dd1cSKonstantin Ananyev
211aa97dd1cSKonstantin Ananyev	/* open forward stream with wildcard remote addr. */
212aa97dd1cSKonstantin Ananyev	memset(&uprm.remote_addr.ss_family + 1, 0,
213aa97dd1cSKonstantin Ananyev		sizeof(uprm.remote_addr) - sizeof(uprm.remote_addr.ss_family));
214aa97dd1cSKonstantin Ananyev
215aa97dd1cSKonstantin Ananyev	fed = netfe_stream_open_udp(fe, &fes->fwdprm, lcore, FWD,
216aa97dd1cSKonstantin Ananyev		fes->fwdprm.bidx);
217aa97dd1cSKonstantin Ananyev	if (fed == NULL)
218aa97dd1cSKonstantin Ananyev		return NULL;
219aa97dd1cSKonstantin Ananyev
220aa97dd1cSKonstantin Ananyev	rc = fwd_tbl_add(fe, fes->family, sa, fed);
221aa97dd1cSKonstantin Ananyev	if (rc != 0) {
222aa97dd1cSKonstantin Ananyev		netfe_stream_close(fe, fed);
223aa97dd1cSKonstantin Ananyev		fed = NULL;
224aa97dd1cSKonstantin Ananyev	}
225aa97dd1cSKonstantin Ananyev
226aa97dd1cSKonstantin Ananyev	fed->fwdprm.remote_addr = *(const struct sockaddr_storage *)sa;
227aa97dd1cSKonstantin Ananyev	return fed;
228aa97dd1cSKonstantin Ananyev}
229aa97dd1cSKonstantin Ananyev
230aa97dd1cSKonstantin Ananyevstatic inline int
231aa97dd1cSKonstantin Ananyevnetfe_addr_eq(struct sockaddr_storage *l, struct sockaddr_storage *r,
232aa97dd1cSKonstantin Ananyev	uint16_t family)
233aa97dd1cSKonstantin Ananyev{
234aa97dd1cSKonstantin Ananyev	struct sockaddr_in *l4, *r4;
235aa97dd1cSKonstantin Ananyev	struct sockaddr_in6 *l6, *r6;
236aa97dd1cSKonstantin Ananyev
237aa97dd1cSKonstantin Ananyev	if (family == AF_INET) {
238aa97dd1cSKonstantin Ananyev		l4 = (struct sockaddr_in *)l;
239aa97dd1cSKonstantin Ananyev		r4 = (struct sockaddr_in *)r;
240aa97dd1cSKonstantin Ananyev		return (l4->sin_port == r4->sin_port &&
241aa97dd1cSKonstantin Ananyev				l4->sin_addr.s_addr == r4->sin_addr.s_addr);
242aa97dd1cSKonstantin Ananyev	} else {
243aa97dd1cSKonstantin Ananyev		l6 = (struct sockaddr_in6 *)l;
244aa97dd1cSKonstantin Ananyev		r6 = (struct sockaddr_in6 *)r;
245aa97dd1cSKonstantin Ananyev		return (l6->sin6_port == r6->sin6_port &&
246aa97dd1cSKonstantin Ananyev				memcmp(&l6->sin6_addr, &r6->sin6_addr,
247aa97dd1cSKonstantin Ananyev				sizeof(l6->sin6_addr)));
248aa97dd1cSKonstantin Ananyev	}
249aa97dd1cSKonstantin Ananyev}
250aa97dd1cSKonstantin Ananyev
251aa97dd1cSKonstantin Ananyevstatic inline void
252aa97dd1cSKonstantin Ananyevnetfe_pkt_addr(const struct rte_mbuf *m, struct sockaddr_storage *ps,
253aa97dd1cSKonstantin Ananyev	uint16_t family)
254aa97dd1cSKonstantin Ananyev{
255aa97dd1cSKonstantin Ananyev	const struct ipv4_hdr *ip4h;
256aa97dd1cSKonstantin Ananyev	const struct ipv6_hdr *ip6h;
257aa97dd1cSKonstantin Ananyev	const struct udp_hdr *udph;
258aa97dd1cSKonstantin Ananyev	struct sockaddr_in *in4;
259aa97dd1cSKonstantin Ananyev	struct sockaddr_in6 *in6;
260aa97dd1cSKonstantin Ananyev
261aa97dd1cSKonstantin Ananyev	NETFE_PKT_DUMP(m);
262aa97dd1cSKonstantin Ananyev
263aa97dd1cSKonstantin Ananyev	udph = rte_pktmbuf_mtod_offset(m, struct udp_hdr *, -m->l4_len);
264aa97dd1cSKonstantin Ananyev
265aa97dd1cSKonstantin Ananyev	if (family == AF_INET) {
266aa97dd1cSKonstantin Ananyev		in4 = (struct sockaddr_in *)ps;
267aa97dd1cSKonstantin Ananyev		ip4h = rte_pktmbuf_mtod_offset(m, struct ipv4_hdr *,
268aa97dd1cSKonstantin Ananyev			-(m->l4_len + m->l3_len));
269aa97dd1cSKonstantin Ananyev		in4->sin_port = udph->src_port;
270aa97dd1cSKonstantin Ananyev		in4->sin_addr.s_addr = ip4h->src_addr;
271aa97dd1cSKonstantin Ananyev	} else {
272aa97dd1cSKonstantin Ananyev		in6 = (struct sockaddr_in6 *)ps;
273aa97dd1cSKonstantin Ananyev		ip6h = rte_pktmbuf_mtod_offset(m, struct ipv6_hdr *,
274aa97dd1cSKonstantin Ananyev			-(m->l4_len + m->l3_len));
275aa97dd1cSKonstantin Ananyev		in6->sin6_port = udph->src_port;
276aa97dd1cSKonstantin Ananyev		rte_memcpy(&in6->sin6_addr, ip6h->src_addr,
277aa97dd1cSKonstantin Ananyev			sizeof(in6->sin6_addr));
278aa97dd1cSKonstantin Ananyev	}
279aa97dd1cSKonstantin Ananyev}
280aa97dd1cSKonstantin Ananyev
281aa97dd1cSKonstantin Ananyevstatic inline uint32_t
282aa97dd1cSKonstantin Ananyevpkt_eq_addr(struct rte_mbuf *pkt[], uint32_t num, uint16_t family,
283aa97dd1cSKonstantin Ananyev	struct sockaddr_storage *cur, struct sockaddr_storage *nxt)
284aa97dd1cSKonstantin Ananyev{
285aa97dd1cSKonstantin Ananyev	uint32_t i;
286aa97dd1cSKonstantin Ananyev
287aa97dd1cSKonstantin Ananyev	for (i = 0; i != num; i++) {
288aa97dd1cSKonstantin Ananyev		netfe_pkt_addr(pkt[i], nxt, family);
289aa97dd1cSKonstantin Ananyev		if (netfe_addr_eq(cur, nxt, family) == 0)
290aa97dd1cSKonstantin Ananyev			break;
291aa97dd1cSKonstantin Ananyev	}
292aa97dd1cSKonstantin Ananyev
293aa97dd1cSKonstantin Ananyev	return i;
294aa97dd1cSKonstantin Ananyev}
295aa97dd1cSKonstantin Ananyev
296aa97dd1cSKonstantin Ananyevstatic inline void
297aa97dd1cSKonstantin Ananyevnetfe_fwd_udp(uint32_t lcore, struct netfe_stream *fes)
298aa97dd1cSKonstantin Ananyev{
299aa97dd1cSKonstantin Ananyev	uint32_t i, j, k, n, x;
300aa97dd1cSKonstantin Ananyev	uint16_t family;
301aa97dd1cSKonstantin Ananyev	void *pi0, *pi1, *pt;
302aa97dd1cSKonstantin Ananyev	struct rte_mbuf **pkt;
303aa97dd1cSKonstantin Ananyev	struct netfe_stream *fed;
304aa97dd1cSKonstantin Ananyev	struct sockaddr_storage in[2];
305aa97dd1cSKonstantin Ananyev
306aa97dd1cSKonstantin Ananyev	family = fes->family;
307aa97dd1cSKonstantin Ananyev	n = fes->pbuf.num;
308aa97dd1cSKonstantin Ananyev	pkt = fes->pbuf.pkt;
309aa97dd1cSKonstantin Ananyev
310aa97dd1cSKonstantin Ananyev	if (n == 0)
311aa97dd1cSKonstantin Ananyev		return;
312aa97dd1cSKonstantin Ananyev
313aa97dd1cSKonstantin Ananyev	in[0].ss_family = family;
314aa97dd1cSKonstantin Ananyev	in[1].ss_family = family;
315aa97dd1cSKonstantin Ananyev	pi0 = &in[0];
316aa97dd1cSKonstantin Ananyev	pi1 = &in[1];
317aa97dd1cSKonstantin Ananyev
318aa97dd1cSKonstantin Ananyev	netfe_pkt_addr(pkt[0], pi0, family);
319aa97dd1cSKonstantin Ananyev
320aa97dd1cSKonstantin Ananyev	x = 0;
321aa97dd1cSKonstantin Ananyev	for (i = 0; i != n; i = j) {
322aa97dd1cSKonstantin Ananyev
323aa97dd1cSKonstantin Ananyev		j = i + pkt_eq_addr(&pkt[i + 1],
324aa97dd1cSKonstantin Ananyev			n - i - 1, family, pi0, pi1) + 1;
325aa97dd1cSKonstantin Ananyev
326aa97dd1cSKonstantin Ananyev		fed = find_fwd_dst_udp(lcore, fes,
327aa97dd1cSKonstantin Ananyev			(const struct sockaddr *)pi0);
328aa97dd1cSKonstantin Ananyev		if (fed != NULL) {
329aa97dd1cSKonstantin Ananyev
330aa97dd1cSKonstantin Ananyev			/**
331aa97dd1cSKonstantin Ananyev			 * TODO: cannot use function pointers for unequal
332aa97dd1cSKonstantin Ananyev			 * number of params.
333aa97dd1cSKonstantin Ananyev			 */
334aa97dd1cSKonstantin Ananyev			k = tle_udp_stream_send(fed->s, pkt + i, j - i,
335aa97dd1cSKonstantin Ananyev				(const struct sockaddr *)
336aa97dd1cSKonstantin Ananyev				&fes->fwdprm.remote_addr);
337aa97dd1cSKonstantin Ananyev
338aa97dd1cSKonstantin Ananyev			NETFE_TRACE("%s(%u): tle_%s_stream_send(%p, %u) "
339aa97dd1cSKonstantin Ananyev				"returns %u\n",
340aa97dd1cSKonstantin Ananyev				__func__, lcore, proto_name[fes->proto],
341aa97dd1cSKonstantin Ananyev				fed->s, j - i, k);
342aa97dd1cSKonstantin Ananyev
343aa97dd1cSKonstantin Ananyev			fed->stat.txp += k;
344aa97dd1cSKonstantin Ananyev			fed->stat.drops += j - i - k;
345aa97dd1cSKonstantin Ananyev			fes->stat.fwp += k;
346aa97dd1cSKonstantin Ananyev
347aa97dd1cSKonstantin Ananyev		} else {
348aa97dd1cSKonstantin Ananyev			NETFE_TRACE("%s(%u, %p): no fwd stream for %u pkts;\n",
349aa97dd1cSKonstantin Ananyev				__func__, lcore, fes->s, j - i);
350aa97dd1cSKonstantin Ananyev			for (k = i; k != j; k++) {
351aa97dd1cSKonstantin Ananyev				NETFE_TRACE("%s(%u, %p): free(%p);\n",
352aa97dd1cSKonstantin Ananyev				__func__, lcore, fes->s, pkt[k]);
353aa97dd1cSKonstantin Ananyev				rte_pktmbuf_free(pkt[j]);
354aa97dd1cSKonstantin Ananyev			}
355aa97dd1cSKonstantin Ananyev			fes->stat.drops += j - i;
356aa97dd1cSKonstantin Ananyev		}
357aa97dd1cSKonstantin Ananyev
358aa97dd1cSKonstantin Ananyev		/* copy unforwarded mbufs. */
359aa97dd1cSKonstantin Ananyev		for (i += k; i != j; i++, x++)
360aa97dd1cSKonstantin Ananyev			pkt[x] = pkt[i];
361