1/*
2 * Copyright (c) 2016  Intel Corporation.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#ifndef UDP_H_
17#define UDP_H_
18
19/*
20 * helper function: opens IPv4 and IPv6 streams for selected port.
21 */
22static struct netfe_stream *
23netfe_stream_open_udp(struct netfe_lcore *fe, struct netfe_sprm *sprm,
24	uint32_t lcore, uint16_t op, uint32_t bidx)
25{
26	int32_t rc;
27	struct netfe_stream *fes;
28	struct sockaddr_in *l4;
29	struct sockaddr_in6 *l6;
30	uint16_t errport;
31	struct tle_udp_stream_param uprm;
32
33	fes = netfe_get_stream(&fe->free);
34	if (fes == NULL) {
35		rte_errno = ENOBUFS;
36		return NULL;
37	}
38
39	fes->rxev = tle_event_alloc(fe->rxeq, fes);
40	fes->txev = tle_event_alloc(fe->txeq, fes);
41
42	if (fes->rxev == NULL || fes->txev == NULL) {
43		netfe_stream_close(fe, fes);
44		rte_errno = ENOMEM;
45		return NULL;
46	}
47
48	if (op == TXONLY || op == FWD) {
49		tle_event_active(fes->txev, TLE_SEV_DOWN);
50		fes->stat.txev[TLE_SEV_DOWN]++;
51	}
52
53	if (op != TXONLY) {
54		tle_event_active(fes->rxev, TLE_SEV_DOWN);
55		fes->stat.rxev[TLE_SEV_DOWN]++;
56	}
57
58	memset(&uprm, 0, sizeof(uprm));
59	uprm.local_addr = sprm->local_addr;
60	uprm.remote_addr = sprm->remote_addr;
61	uprm.recv_ev = fes->rxev;
62	if (op != FWD)
63		uprm.send_ev = fes->txev;
64	fes->s = tle_udp_stream_open(becfg.cpu[bidx].ctx, &uprm);
65
66	if (fes->s == NULL) {
67		rc = rte_errno;
68		netfe_stream_close(fe, fes);
69		rte_errno = rc;
70
71		if (sprm->local_addr.ss_family == AF_INET) {
72			l4 = (struct sockaddr_in *) &sprm->local_addr;
73			errport = ntohs(l4->sin_port);
74		} else {
75			l6 = (struct sockaddr_in6 *) &sprm->local_addr;
76			errport = ntohs(l6->sin6_port);
77		}
78
79		RTE_LOG(ERR, USER1, "stream open failed for port %u with error "
80			"code=%u, bidx=%u, lc=%u\n",
81			errport, rc, bidx, becfg.cpu[bidx].id);
82		return NULL;
83	}
84
85	RTE_LOG(NOTICE, USER1,
86		"%s(%u)={s=%p, op=%hu, proto=%s, rxev=%p, txev=%p}, belc=%u\n",
87		__func__, lcore, fes->s, op, proto_name[becfg.proto],
88		fes->rxev, fes->txev, becfg.cpu[bidx].id);
89
90	fes->op = op;
91	fes->proto = becfg.proto;
92	fes->family = sprm->local_addr.ss_family;
93
94	return fes;
95}
96
97static int
98netfe_lcore_init_udp(const struct netfe_lcore_prm *prm)
99{
100	size_t sz;
101	int32_t rc;
102	uint32_t i, lcore, snum;
103	struct netfe_lcore *fe;
104	struct tle_evq_param eprm;
105	struct netfe_stream *fes;
106	struct netfe_sprm *sprm;
107
108	lcore = rte_lcore_id();
109
110	snum = prm->max_streams;
111	RTE_LOG(NOTICE, USER1, "%s(lcore=%u, nb_streams=%u, max_streams=%u)\n",
112		__func__, lcore, prm->nb_streams, snum);
113
114	memset(&eprm, 0, sizeof(eprm));
115	eprm.socket_id = rte_lcore_to_socket_id(lcore);
116	eprm.max_events = snum;
117
118	sz = sizeof(*fe) + snum * sizeof(struct netfe_stream);
119	fe = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
120		rte_lcore_to_socket_id(lcore));
121
122	if (fe == NULL) {
123		RTE_LOG(ERR, USER1, "%s:%d failed to allocate %zu bytes\n",
124			__func__, __LINE__, sz);
125		return -ENOMEM;
126	}
127
128	RTE_PER_LCORE(_fe) = fe;
129
130	fe->snum = snum;
131	/* initialize the stream pool */
132	LIST_INIT(&fe->free.head);
133	LIST_INIT(&fe->use.head);
134	fes = (struct netfe_stream *)(fe + 1);
135	for (i = 0; i != snum; i++, fes++)
136		netfe_put_stream(fe, &fe->free, fes);
137
138	/* allocate the event queues */
139	fe->rxeq = tle_evq_create(&eprm);
140	fe->txeq = tle_evq_create(&eprm);
141
142	RTE_LOG(INFO, USER1, "%s(%u) rx evq=%p, tx evq=%p\n",
143		__func__, lcore, fe->rxeq, fe->txeq);
144	if (fe->rxeq == NULL || fe->txeq == NULL)
145		return -ENOMEM;
146
147	rc = fwd_tbl_init(fe, AF_INET, lcore);
148	RTE_LOG(ERR, USER1, "%s(%u) fwd_tbl_init(%u) returns %d\n",
149		__func__, lcore, AF_INET, rc);
150	if (rc != 0)
151		return rc;
152
153	rc = fwd_tbl_init(fe, AF_INET6, lcore);
154	RTE_LOG(ERR, USER1, "%s(%u) fwd_tbl_init(%u) returns %d\n",
155		__func__, lcore, AF_INET6, rc);
156	if (rc != 0)
157		return rc;
158
159	/* open all requested streams. */
160	for (i = 0; i != prm->nb_streams; i++) {
161		sprm = &prm->stream[i].sprm;
162		fes = netfe_stream_open_udp(fe, sprm, lcore, prm->stream[i].op,
163			sprm->bidx);
164		if (fes == NULL) {
165			rc = -rte_errno;
166			break;
167		}
168
169		netfe_stream_dump(fes, &sprm->local_addr, &sprm->remote_addr);
170
171		if (prm->stream[i].op == FWD) {
172			fes->fwdprm = prm->stream[i].fprm;
173			rc = fwd_tbl_add(fe,
174				prm->stream[i].fprm.remote_addr.ss_family,
175				(const struct sockaddr *)
176				&prm->stream[i].fprm.remote_addr,
177				fes);
178			if (rc != 0) {
179				netfe_stream_close(fe, fes);
180				break;
181			}
182		} else if (prm->stream[i].op == TXONLY) {
183			fes->txlen = prm->stream[i].txlen;
184			fes->raddr = prm->stream[i].sprm.remote_addr;
185		}
186	}
187
188	return rc;
189}
190
191static struct netfe_stream *
192find_fwd_dst_udp(uint32_t lcore, struct netfe_stream *fes,
193	const struct sockaddr *sa)
194{
195	uint32_t rc;
196	struct netfe_stream *fed;
197	struct netfe_lcore *fe;
198	struct tle_udp_stream_param uprm;
199
200	fe = RTE_PER_LCORE(_fe);
201
202	fed = fwd_tbl_lkp(fe, fes->family, sa);
203	if (fed != NULL)
204		return fed;
205
206	/* create a new stream and put it into the fwd table. */
207	memset(&uprm, 0, sizeof(uprm));
208	uprm.local_addr = fes->fwdprm.local_addr;
209	uprm.remote_addr = fes->fwdprm.remote_addr;
210
211	/* open forward stream with wildcard remote addr. */
212	memset(&uprm.remote_addr.ss_family + 1, 0,
213		sizeof(uprm.remote_addr) - sizeof(uprm.remote_addr.ss_family));
214
215	fed = netfe_stream_open_udp(fe, &fes->fwdprm, lcore, FWD,
216		fes->fwdprm.bidx);
217	if (fed == NULL)
218		return NULL;
219
220	rc = fwd_tbl_add(fe, fes->family, sa, fed);
221	if (rc != 0) {
222		netfe_stream_close(fe, fed);
223		fed = NULL;
224	}
225
226	fed->fwdprm.remote_addr = *(const struct sockaddr_storage *)sa;
227	return fed;
228}
229
/*
 * Compare the port and IP address of two socket addresses.
 *
 * family selects the layout: AF_INET compares them as struct sockaddr_in;
 * any other value compares them as struct sockaddr_in6. Only the port and
 * address fields are compared; ss_family and the other sockaddr fields
 * (e.g. IPv6 flowinfo / scope id) are ignored.
 *
 * Returns 1 when equal, 0 otherwise.
 */
static inline int
netfe_addr_eq(const struct sockaddr_storage *l,
	const struct sockaddr_storage *r, uint16_t family)
{
	const struct sockaddr_in *l4, *r4;
	const struct sockaddr_in6 *l6, *r6;

	if (family == AF_INET) {
		l4 = (const struct sockaddr_in *)l;
		r4 = (const struct sockaddr_in *)r;
		return (l4->sin_port == r4->sin_port &&
			l4->sin_addr.s_addr == r4->sin_addr.s_addr);
	}

	l6 = (const struct sockaddr_in6 *)l;
	r6 = (const struct sockaddr_in6 *)r;
	/* memcmp() returns 0 when the two addresses match. */
	return (l6->sin6_port == r6->sin6_port &&
		memcmp(&l6->sin6_addr, &r6->sin6_addr,
			sizeof(l6->sin6_addr)) == 0);
}
250
/*
 * Store the UDP source port and IP source address of packet m into *ps:
 * as struct sockaddr_in for AF_INET, as struct sockaddr_in6 for any
 * other family. Both values are copied as-is from the headers, i.e. in
 * network byte order. ss_family and the remaining sockaddr fields are not
 * written; the callers in this file set ss_family beforehand.
 *
 * The headers are located at negative offsets from the mbuf data pointer:
 * the UDP header l4_len bytes before it, the IP header
 * (l4_len + l3_len) bytes before it.
 * NOTE(review): this assumes m's data pointer has already been advanced
 * past the UDP header and that l3_len/l4_len are filled in (presumably by
 * the UDP receive path); confirm.
 */
static inline void
netfe_pkt_addr(const struct rte_mbuf *m, struct sockaddr_storage *ps,
	uint16_t family)
{
	const struct ipv4_hdr *ip4h;
	const struct ipv6_hdr *ip6h;
	const struct udp_hdr *udph;
	struct sockaddr_in *in4;
	struct sockaddr_in6 *in6;

	NETFE_PKT_DUMP(m);

	/* UDP header: l4_len bytes before the current data pointer. */
	udph = rte_pktmbuf_mtod_offset(m, struct udp_hdr *, -m->l4_len);

	if (family == AF_INET) {
		in4 = (struct sockaddr_in *)ps;
		/* IPv4 header: immediately before the UDP header. */
		ip4h = rte_pktmbuf_mtod_offset(m, struct ipv4_hdr *,
			-(m->l4_len + m->l3_len));
		in4->sin_port = udph->src_port;
		in4->sin_addr.s_addr = ip4h->src_addr;
	} else {
		/* any family other than AF_INET is handled as IPv6. */
		in6 = (struct sockaddr_in6 *)ps;
		ip6h = rte_pktmbuf_mtod_offset(m, struct ipv6_hdr *,
			-(m->l4_len + m->l3_len));
		in6->sin6_port = udph->src_port;
		rte_memcpy(&in6->sin6_addr, ip6h->src_addr,
			sizeof(in6->sin6_addr));
	}
}
280
/*
 * Count how many leading packets of pkt[0..num) have the same source
 * address (as compared by netfe_addr_eq()) as *cur.
 * The address of every examined packet is written into *nxt, so on
 * return *nxt holds the address of the first non-matching packet, if
 * there is one.
 */
static inline uint32_t
pkt_eq_addr(struct rte_mbuf *pkt[], uint32_t num, uint16_t family,
	struct sockaddr_storage *cur, struct sockaddr_storage *nxt)
{
	uint32_t cnt = 0;

	while (cnt != num) {
		netfe_pkt_addr(pkt[cnt], nxt, family);
		if (!netfe_addr_eq(cur, nxt, family))
			break;
		cnt++;
	}

	return cnt;
}
295
296static inline void
297netfe_fwd_udp(uint32_t lcore, struct netfe_stream *fes)
298{
299	uint32_t i, j, k, n, x;
300	uint16_t family;
301	void *pi0, *pi1, *pt;
302	struct rte_mbuf **pkt;
303	struct netfe_stream *fed;
304	struct sockaddr_storage in[2];
305
306	family = fes->family;
307	n = fes->pbuf.num;
308	pkt = fes->pbuf.pkt;
309
310	if (n == 0)
311		return;
312
313	in[0].ss_family = family;
314	in[1].ss_family = family;
315	pi0 = &in[0];
316	pi1 = &in[1];
317
318	netfe_pkt_addr(pkt[0], pi0, family);
319
320	x = 0;
321	for (i = 0; i != n; i = j) {
322
323		j = i + pkt_eq_addr(&pkt[i + 1],
324			n - i - 1, family, pi0, pi1) + 1;
325
326		fed = find_fwd_dst_udp(lcore, fes,
327			(const struct sockaddr *)pi0);
328		if (fed != NULL) {
329
330			/**
331			 * TODO: cannot use function pointers for unequal
332			 * number of params.
333			 */
334			k = tle_udp_stream_send(fed->s, pkt + i, j - i,
335				(const struct sockaddr *)
336				&fes->fwdprm.remote_addr);
337
338			NETFE_TRACE("%s(%u): tle_%s_stream_send(%p, %u) "
339				"returns %u\n",
340				__func__, lcore, proto_name[fes->proto],
341				fed->s, j - i, k);
342
343			fed->stat.txp += k;
344			fed->stat.drops += j - i - k;
345			fes->stat.fwp += k;
346
347		} else {
348			NETFE_TRACE("%s(%u, %p): no fwd stream for %u pkts;\n",
349				__func__, lcore, fes->s, j - i);
350			for (k = i; k != j; k++) {
351				NETFE_TRACE("%s(%u, %p): free(%p);\n",
352				__func__, lcore, fes->s, pkt[k]);
353				rte_pktmbuf_free(pkt[j]);
354			}
355			fes->stat.drops += j - i;
356		}
357
358		/* copy unforwarded mbufs. */
359		for (i += k; i != j; i++, x++)
360			pkt[x] = pkt[i];
361
362		/* swap the pointers */
363		pt = pi0;
364		pi0 = pi1;
365		pi1 = pt;
366	}
367
368	fes->pbuf.num = x;
369
370	if (x != 0) {
371		tle_event_raise(fes->txev);
372		fes->stat.txev[TLE_SEV_UP]++;
373	}
374
375	if (n == RTE_DIM(fes->pbuf.pkt)) {
376		tle_event_active(fes->rxev, TLE_SEV_UP);
377		fes->stat.rxev[TLE_SEV_UP]++;
378	}
379}
380
381static inline void
382netfe_rxtx_process_udp(__rte_unused uint32_t lcore, struct netfe_stream *fes)
383{
384	uint32_t i, j, k, n;
385	uint16_t family;
386	void *pi0, *pi1, *pt;
387	struct rte_mbuf **pkt;
388	struct sockaddr_storage in[2];
389
390	family = fes->family;
391	n = fes->pbuf.num;
392	pkt = fes->pbuf.pkt;
393
394	/* there is nothing to send. */
395	if (n == 0) {
396		tle_event_idle(fes->txev);
397		fes->stat.txev[TLE_SEV_IDLE]++;
398		return;
399	}
400
401	in[0].ss_family = family;
402	in[1].ss_family = family;
403	pi0 = &in[0];
404	pi1 = &in[1];
405
406	netfe_pkt_addr(pkt[0], pi0, family);
407
408	for (i = 0; i != n; i = j) {
409
410		j = i + pkt_eq_addr(&pkt[i + 1],
411			n - i - 1, family, pi0, pi1) + 1;
412
413		/**
414		 * TODO: cannot use function pointers for unequal param num.
415		 */
416		k = tle_udp_stream_send(fes->s, pkt + i, j - i,
417			(const struct sockaddr *)pi0);
418
419		NETFE_TRACE("%s(%u): tle_%s_stream_send(%p, %u) returns %u\n",
420			__func__, lcore, proto_name[fes->proto],
421			fes->s, j - i, k);
422		fes->stat.txp += k;
423		fes->stat.drops += j - i - k;
424
425		i += k;
426
427		/* stream send buffer is full */
428		if (i != j)
429			break;
430
431		/* swap the pointers */
432		pt = pi0;
433		pi0 = pi1;
434		pi1 = pt;
435	}
436
437	/* not able to send anything. */
438	if (i == 0)
439		return;
440
441	if (n == RTE_DIM(fes->pbuf.pkt)) {
442		/* mark stream as readable */
443		tle_event_active(fes->rxev, TLE_SEV_UP);
444		fes->stat.rxev[TLE_SEV_UP]++;
445	}
446
447	/* adjust pbuf array. */
448	fes->pbuf.num = n - i;
449	for (j = i; j != n; j++)
450		pkt[j - i] = pkt[j];
451}
452
453static inline void
454netfe_tx_process_udp(uint32_t lcore, struct netfe_stream *fes)
455{
456	uint32_t i, k, n;
457
458	/* refill with new mbufs. */
459	pkt_buf_fill(lcore, &fes->pbuf, fes->txlen);
460
461	n = fes->pbuf.num;
462	if (n == 0)
463		return;
464
465	/**
466	 * TODO: cannot use function pointers for unequal param num.
467	 */
468	k = tle_udp_stream_send(fes->s, fes->pbuf.pkt, n, NULL);
469	NETFE_TRACE("%s(%u): tle_%s_stream_send(%p, %u) returns %u\n",
470		__func__, lcore, proto_name[fes->proto], fes->s, n, k);
471	fes->stat.txp += k;
472	fes->stat.drops += n - k;
473
474	if (k == 0)
475		return;
476
477	/* adjust pbuf array. */
478	fes->pbuf.num = n - k;
479	for (i = k; i != n; i++)
480		fes->pbuf.pkt[i - k] = fes->pbuf.pkt[i];
481}
482
483static inline void
484netfe_lcore_udp(void)
485{
486	struct netfe_lcore *fe;
487	uint32_t j, n, lcore;
488	struct netfe_stream *fs[MAX_PKT_BURST];
489
490	fe = RTE_PER_LCORE(_fe);
491	if (fe == NULL)
492		return;
493
494	lcore = rte_lcore_id();
495
496	/* look for rx events */
497	n = tle_evq_get(fe->rxeq, (const void **)(uintptr_t)fs, RTE_DIM(fs));
498
499	if (n != 0) {
500		NETFE_TRACE("%s(%u): tle_evq_get(rxevq=%p) returns %u\n",
501			__func__, lcore, fe->rxeq, n);
502		for (j = 0; j != n; j++)
503			netfe_rx_process(lcore, fs[j]);
504	}
505
506	/* look for tx events */
507	n = tle_evq_get(fe->txeq, (const void **)(uintptr_t)fs, RTE_DIM(fs));
508
509	if (n != 0) {
510		NETFE_TRACE("%s(%u): tle_evq_get(txevq=%p) returns %u\n",
511			__func__, lcore, fe->txeq, n);
512		for (j = 0; j != n; j++) {
513			if (fs[j]->op == ECHO)
514				netfe_rxtx_process_udp(lcore, fs[j]);
515			else if (fs[j]->op == FWD)
516				netfe_fwd_udp(lcore, fs[j]);
517			else if (fs[j]->op == TXONLY)
518				netfe_tx_process_udp(lcore, fs[j]);
519		}
520	}
521}
522
523static void
524netfe_lcore_fini_udp(void)
525{
526	struct netfe_lcore *fe;
527	uint32_t i;
528	struct tle_udp_stream_param uprm;
529	struct netfe_stream *fes;
530
531	fe = RTE_PER_LCORE(_fe);
532	if (fe == NULL)
533		return;
534
535	for (i = 0; i != fe->use.num; i++) {
536		fes = netfe_get_stream(&fe->use);
537		tle_udp_stream_get_param(fes->s, &uprm);
538		netfe_stream_dump(fes, &uprm.local_addr, &uprm.remote_addr);
539		netfe_stream_close(fe, fes);
540	}
541
542	tle_evq_destroy(fe->txeq);
543	tle_evq_destroy(fe->rxeq);
544	RTE_PER_LCORE(_fe) = NULL;
545	rte_free(fe);
546}
547
548static int
549lcore_main_udp(void *arg)
550{
551	int32_t rc;
552	uint32_t lcore;
553	struct lcore_prm *prm;
554
555	prm = arg;
556	lcore = rte_lcore_id();
557
558	RTE_LOG(NOTICE, USER1, "%s(lcore=%u) start\n",
559		__func__, lcore);
560
561	rc = 0;
562
563	/* lcore FE init. */
564	if (prm->fe.max_streams != 0)
565		rc = netfe_lcore_init_udp(&prm->fe);
566
567	/* lcore FE init. */
568	if (rc == 0 && prm->be.lc != NULL)
569		rc = netbe_lcore_setup(prm->be.lc);
570
571	if (rc != 0)
572		sig_handle(SIGQUIT);
573
574	while (force_quit == 0) {
575		netfe_lcore_udp();
576		netbe_lcore();
577	}
578
579	RTE_LOG(NOTICE, USER1, "%s(lcore=%u) finish\n",
580		__func__, lcore);
581
582	netfe_lcore_fini_udp();
583	netbe_lcore_clear();
584
585	return rc;
586}
587
588#endif /* UDP_H_ */
589