udp_rxtx.c revision c5f8f7f0
/*
 * Copyright (c) 2016  Intel Corporation.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <rte_malloc.h>
#include <rte_errno.h>
#include <rte_ethdev.h>
#include <rte_ip.h>
#include <rte_ip_frag.h>
#include <rte_udp.h>

#include "udp_stream.h"
#include "misc.h"

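/*
 * Find the stream bound to the given <IP version, destination port> pair
 * on the device and grab an RX reference on it, so that the stream can't
 * be closed while its packets are being processed.
 * Returns NULL if no stream matches or the stream is shutting down.
 */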
static inline struct tle_udp_stream *
rx_stream_obtain(struct tle_dev *dev, uint32_t type, uint32_t port)
{
	struct tle_udp_stream *s;

	if (type >= TLE_VNUM || dev->dp[type] == NULL)
		return NULL;

	s = (struct tle_udp_stream *)dev->dp[type]->streams[port];
	if (s == NULL)
		return NULL;

	if (rwl_acquire(&s->rx.use) < 0)
		return NULL;

	return s;
}

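/*
 * Classify the mbuf by the L3/L4 packet types reported by the device:
 * returns TLE_V4 for IPv4/UDP, TLE_V6 for IPv6/UDP, TLE_VNUM otherwise.
 */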
static inline uint16_t
get_pkt_type(const struct rte_mbuf *m)
{
	uint32_t v;

	v = m->packet_type &
		(RTE_PTYPE_L3_IPV4 | RTE_PTYPE_L3_IPV6 | RTE_PTYPE_L4_MASK);
	if (v == (RTE_PTYPE_L3_IPV4 | RTE_PTYPE_L4_UDP))
		return TLE_V4;
	else if (v == (RTE_PTYPE_L3_IPV6 | RTE_PTYPE_L4_UDP))
		return TLE_V6;
	else
		return TLE_VNUM;
}

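/*
 * Extract the fields needed for stream lookup from the packet:
 * L4 ports and IPv4/IPv6 addresses.
 * Returns the packet type in .src and the destination port in .dst.
 */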
static inline union l4_ports
pkt_info(struct rte_mbuf *m, union l4_ports *ports, union ipv4_addrs *addr4,
	union ipv6_addrs **addr6)
{
	uint32_t len;
	union l4_ports ret, *up;
	union ipv4_addrs *pa4;

	ret.src = get_pkt_type(m);

	len = m->l2_len;
	if (ret.src == TLE_V4) {
		pa4 = rte_pktmbuf_mtod_offset(m, union ipv4_addrs *,
			len + offsetof(struct ipv4_hdr, src_addr));
		addr4->raw = pa4->raw;
	} else if (ret.src == TLE_V6) {
		*addr6 = rte_pktmbuf_mtod_offset(m, union ipv6_addrs *,
			len + offsetof(struct ipv6_hdr, src_addr));
	}

	len += m->l3_len;
	up = rte_pktmbuf_mtod_offset(m, union l4_ports *,
		len + offsetof(struct udp_hdr, src_port));
	ports->raw = up->raw;
	ret.dst = ports->dst;
	return ret;
}

/*
 * Helper routine, enqueues packets to the stream and calls RX
 * notification callback, if needed.
 */
static inline uint16_t
rx_stream(struct tle_udp_stream *s, void *mb[], struct rte_mbuf *rp[],
	int32_t rc[], uint32_t num)
{
	uint32_t i, k, r;

	r = _rte_ring_enqueue_burst(s->rx.q, mb, num);

	/* if RX queue was empty invoke user RX notification callback. */
	if (s->rx.cb.func != NULL && r != 0 && rte_ring_count(s->rx.q) == r)
		s->rx.cb.func(s->rx.cb.data, &s->s);

	/* return packets that didn't fit into the RX queue with ENOBUFS. */
	for (i = r, k = 0; i != num; i++, k++) {
		rc[k] = ENOBUFS;
		rp[k] = mb[i];
	}

	return r;
}

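/*
 * Match each IPv6 packet against the stream's address/port masks.
 * Packets that don't belong to the stream are returned via rp[] with
 * rc[] set to ENOENT; the rest are enqueued to the stream RX queue.
 */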
static inline uint16_t
rx_stream6(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
	union ipv6_addrs *addr[], union l4_ports port[],
	struct rte_mbuf *rp[], int32_t rc[], uint16_t num)
{
	uint32_t i, k, n;
	void *mb[num];

	k = 0;
	n = 0;

	for (i = 0; i != num; i++) {

		if ((port[i].raw & s->s.pmsk.raw) != s->s.port.raw ||
				ymm_mask_cmp(&addr[i]->raw, &s->s.ipv6.addr.raw,
				&s->s.ipv6.mask.raw) != 0) {
			rc[k] = ENOENT;
			rp[k] = pkt[i];
			k++;
		} else {
			mb[n] = pkt[i];
			n++;
		}
	}

	return rx_stream(s, mb, rp + k, rc + k, n);
}

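/* Same as rx_stream6(), but with IPv4 address matching. */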
static inline uint16_t
rx_stream4(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
	union ipv4_addrs addr[], union l4_ports port[],
	struct rte_mbuf *rp[], int32_t rc[], uint16_t num)
{
	uint32_t i, k, n;
	void *mb[num];

	k = 0;
	n = 0;

	for (i = 0; i != num; i++) {

		if ((addr[i].raw & s->s.ipv4.mask.raw) != s->s.ipv4.addr.raw ||
				(port[i].raw & s->s.pmsk.raw) !=
				s->s.port.raw) {
			rc[k] = ENOENT;
			rp[k] = pkt[i];
			k++;
		} else {
			mb[n] = pkt[i];
			n++;
		}
	}

	return rx_stream(s, mb, rp + k, rc + k, n);
}

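/*
 * Device RX bulk entry point: group consecutive packets with the same
 * <type, destination port>, look up the matching stream for each group
 * and hand the packets to it. Failed packets are returned via rp[]/rc[].
 * Returns the number of packets accepted by the streams.
 */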
uint16_t
tle_udp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[],
	struct rte_mbuf *rp[], int32_t rc[], uint16_t num)
{
	struct tle_udp_stream *s;
	uint32_t i, j, k, n, p, t;
	union l4_ports tp[num], port[num];
	union ipv4_addrs a4[num];
	union ipv6_addrs *pa6[num];

	for (i = 0; i != num; i++)
		tp[i] = pkt_info(pkt[i], &port[i], &a4[i], &pa6[i]);

	k = 0;
	for (i = 0; i != num; i = j) {

		/* find the run of packets with the same type and port. */
		for (j = i + 1; j != num && tp[j].raw == tp[i].raw; j++)
			;

		t = tp[i].src;
		p = tp[i].dst;
		s = rx_stream_obtain(dev, t, p);
		if (s != NULL) {

			if (t == TLE_V4)
				n = rx_stream4(s, pkt + i, a4 + i,
					port + i, rp + k, rc + k, j - i);
			else
				n = rx_stream6(s, pkt + i, pa6 + i, port + i,
					rp + k, rc + k, j - i);

			k += j - i - n;

			if (s->rx.ev != NULL)
				tle_event_raise(s->rx.ev);
			rwl_release(&s->rx.use);

		} else {
			/* no stream is listening on that port. */
			for (; i != j; i++) {
				rc[k] = ENOENT;
				rp[k] = pkt[i];
				k++;
			}
		}
	}

	return num - k;
}

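/*
 * Return no longer used drbs back to the stream's pool and notify
 * the stream that TX space is available again.
 */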
static inline void
stream_drb_release(struct tle_udp_stream *s, struct tle_drb *drb[],
	uint32_t nb_drb)
{
	uint32_t n;

	n = rte_ring_count(s->tx.drb.r);
	_rte_ring_enqueue_burst(s->tx.drb.r, (void **)drb, nb_drb);

	/* If stream is still open, then mark it as available for writing. */
	if (rwl_try_acquire(&s->tx.use) > 0) {

		if (s->tx.ev != NULL)
			tle_event_raise(s->tx.ev);

		/* if stream send buffer was full invoke TX callback */
		else if (s->tx.cb.func != NULL && n == 0)
			s->tx.cb.func(s->tx.cb.data, &s->s);

	}

	rwl_release(&s->tx.use);
}

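/*
 * Device TX bulk entry point: dequeue packets that the streams have
 * queued for that device, release the emptied drbs back to their owning
 * streams and return the number of packets ready for transmission.
 */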
uint16_t
tle_udp_tx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], uint16_t num)
{
	uint32_t i, j, k, n;
	struct tle_drb *drb[num];
	struct tle_udp_stream *s;

	/* extract packets from device TX queue. */

	k = num;
	n = tle_dring_sc_dequeue(&dev->tx.dr, (const void **)(uintptr_t)pkt,
		num, drb, &k);

	if (n == 0)
		return 0;

	/* free empty drbs and notify related streams. */

	for (i = 0; i != k; i = j) {
		s = drb[i]->udata;
		for (j = i + 1; j != k && s == drb[j]->udata; j++)
			;
		stream_drb_release(s, drb + i, j - i);
	}

	return n;
}

/*
 * Helper function, does the necessary pre-processing for the received
 * packets before handing them to the tle_udp_stream_recv() caller.
 */
static inline uint32_t
recv_pkt_process(struct rte_mbuf *m[], uint32_t num, uint32_t type)
{
	uint32_t i, k;
	uint64_t flg[num], ofl[num];

	for (i = 0; i != num; i++) {
		flg[i] = m[i]->ol_flags;
		ofl[i] = m[i]->tx_offload;
	}

	k = 0;
	for (i = 0; i != num; i++) {

		/* drop packets with invalid cksum(s). */
		if (check_pkt_csum(m[i], flg[i], type, IPPROTO_UDP) != 0) {
			rte_pktmbuf_free(m[i]);
			m[i] = NULL;
			k++;
		} else
			rte_pktmbuf_adj(m[i], _tx_offload_l4_offset(ofl[i]));
	}

	return k;
}

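/*
 * Stream recv entry point: dequeue received packets from the stream RX
 * queue, rearm the RX event if more packets are still pending, drop
 * packets with invalid checksums and strip the headers from the rest.
 */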
uint16_t
tle_udp_stream_recv(struct tle_stream *us, struct rte_mbuf *pkt[], uint16_t num)
{
	uint32_t k, n;
	struct tle_udp_stream *s;

	s = UDP_STREAM(us);
	n = _rte_ring_mc_dequeue_burst(s->rx.q, (void **)pkt, num);
	if (n == 0)
		return 0;

	/*
	 * if we still have packets to read,
	 * then rearm stream RX event.
	 */
	if (n == num && rte_ring_count(s->rx.q) != 0) {
		if (rwl_try_acquire(&s->rx.use) > 0 && s->rx.ev != NULL)
			tle_event_raise(s->rx.ev);
		rwl_release(&s->rx.use);
	}

	k = recv_pkt_process(pkt, n, s->s.type);
	return compress_pkt_list(pkt, n, k);
}

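/*
 * Prepend the L2/L3 header template and the UDP header to the mbuf,
 * then fill in the length, packet id and checksum fields. Checksums are
 * computed in software unless the relevant TX offload flags are set.
 */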
static inline int
udp_fill_mbuf(struct rte_mbuf *m,
	uint32_t type, uint64_t ol_flags, uint32_t pid,
	union udph udph, const struct tle_dest *dst)
{
	uint32_t len, plen;
	char *l2h;
	union udph *l4h;

	len = dst->l2_len + dst->l3_len;
	plen = m->pkt_len;

	/* reserve space in the mbuf for the L2/L3/L4 headers. */

	l2h = rte_pktmbuf_prepend(m, len + sizeof(*l4h));
	if (l2h == NULL)
		return -ENOBUFS;

	/* copy L2/L3 header */
	rte_memcpy(l2h, dst->hdr, len);

	/* copy UDP header */
	l4h = (union udph *)(l2h + len);
	l4h->raw = udph.raw;

	/* setup mbuf TX offload related fields. */
	m->tx_offload = _mbuf_tx_offload(dst->l2_len, dst->l3_len,
		sizeof(*l4h), 0, 0, 0);
	m->ol_flags |= ol_flags;

	l4h->len = rte_cpu_to_be_16(plen + sizeof(*l4h));

	/* update proto specific fields. */

	if (type == TLE_V4) {
		struct ipv4_hdr *l3h;
		l3h = (struct ipv4_hdr *)(l2h + dst->l2_len);
		l3h->packet_id = rte_cpu_to_be_16(pid);
		l3h->total_length = rte_cpu_to_be_16(plen + dst->l3_len +
			sizeof(*l4h));

		if ((ol_flags & PKT_TX_UDP_CKSUM) != 0)
			l4h->cksum = _ipv4x_phdr_cksum(l3h, m->l3_len,
				ol_flags);
		else
			l4h->cksum = _ipv4_udptcp_mbuf_cksum(m, len, l3h);

		if ((ol_flags & PKT_TX_IP_CKSUM) == 0)
			l3h->hdr_checksum = _ipv4x_cksum(l3h, m->l3_len);
	} else {
		struct ipv6_hdr *l3h;
		l3h = (struct ipv6_hdr *)(l2h + dst->l2_len);
		l3h->payload_len = rte_cpu_to_be_16(plen + sizeof(*l4h));
		if ((ol_flags & PKT_TX_UDP_CKSUM) != 0)
			l4h->cksum = rte_ipv6_phdr_cksum(l3h, ol_flags);
		else
			l4h->cksum = _ipv6_udptcp_mbuf_cksum(m, len, l3h);
	}

	return 0;
}

/*
 * TODO: this fixup arguably belongs in the fragmentation routines
 * themselves - rte_ipv[46]_fragment_packet() should do it.
 */
static inline void
frag_fixup(const struct rte_mbuf *ms, struct rte_mbuf *mf, uint32_t type)
{
	struct ipv4_hdr *l3h;

	mf->ol_flags = ms->ol_flags;
	mf->tx_offload = ms->tx_offload;

	if (type == TLE_V4 && (ms->ol_flags & PKT_TX_IP_CKSUM) == 0) {
		l3h = rte_pktmbuf_mtod(mf, struct ipv4_hdr *);
		l3h->hdr_checksum = _ipv4x_cksum(l3h, mf->l3_len);
	}
}

/*
 * Returns a negative value on failure to fragment,
 * otherwise the actual number of fragments produced.
 */
static inline int
fragment(struct rte_mbuf *pkt, struct rte_mbuf *frag[], uint32_t num,
	uint32_t type, const struct tle_dest *dst)
{
	int32_t frag_num, i;
	uint16_t mtu;
	void *eth_hdr;

	/* Remove the Ethernet header from the input packet */
	rte_pktmbuf_adj(pkt, dst->l2_len);
	mtu = dst->mtu - dst->l2_len;

	/* fragment packet */
	if (type == TLE_V4)
		frag_num = rte_ipv4_fragment_packet(pkt, frag, num, mtu,
			dst->head_mp, dst->head_mp);
	else
		frag_num = rte_ipv6_fragment_packet(pkt, frag, num, mtu,
			dst->head_mp, dst->head_mp);

	if (frag_num > 0) {
		for (i = 0; i != frag_num; i++) {

			frag_fixup(pkt, frag[i], type);

			/* Move data_off to include l2 header first */
			eth_hdr = rte_pktmbuf_prepend(frag[i], dst->l2_len);

			/* copy l2 header into fragment */
			rte_memcpy(eth_hdr, dst->hdr, dst->l2_len);
		}
	}

	return frag_num;
}

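/* return unused drbs back to the stream's free-drb ring. */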
static inline void
stream_drb_free(struct tle_udp_stream *s, struct tle_drb *drbs[],
	uint32_t nb_drb)
{
	_rte_ring_enqueue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
}

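/* take up to nb_drb free drbs from the stream's free-drb ring. */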
static inline uint32_t
stream_drb_alloc(struct tle_udp_stream *s, struct tle_drb *drbs[],
	uint32_t nb_drb)
{
	return _rte_ring_dequeue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
}

/* enqueue up to nb_pkt packets to the destination device queue. */
static inline uint16_t
queue_pkt_out(struct tle_udp_stream *s, struct tle_dev *dev,
		const void *pkt[], uint16_t nb_pkt,
		struct tle_drb *drbs[], uint32_t *nb_drb)
{
	uint32_t bsz, i, n, nb, nbc, nbm;

	bsz = s->tx.drb.nb_elem;

	/* calculate how many drbs are needed. */
	nbc = *nb_drb;
	nbm = (nb_pkt + bsz - 1) / bsz;
	nb = RTE_MAX(nbm, nbc) - nbc;

	/* allocate required drbs */
	if (nb != 0)
		nb = stream_drb_alloc(s, drbs + nbc, nb);

	nb += nbc;

	/* no free drbs, can't send anything */
	if (nb == 0)
		return 0;

	/* not enough free drbs, reduce number of packets to send. */
	else if (nb != nbm)
		nb_pkt = nb * bsz;

	/* enqueue packets to the destination device. */
	nbc = nb;
	n = tle_dring_mp_enqueue(&dev->tx.dr, pkt, nb_pkt, drbs, &nb);

	/* if not all available drbs were consumed, move them to the start. */
	nbc -= nb;
	for (i = 0; i != nb; i++)
		drbs[i] = drbs[nbc + i];

	*nb_drb = nb;
	return n;
}

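/*
 * Stream send entry point: fill in the protocol headers for each mbuf,
 * fragment datagrams that exceed the MTU and queue everything to the
 * destination device. Returns the number of mbufs consumed; on a
 * partial send rte_errno is set and the headers of unsent (but already
 * modified) packets are restored before returning.
 */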
uint16_t
tle_udp_stream_send(struct tle_stream *us, struct rte_mbuf *pkt[],
	uint16_t num, const struct sockaddr *dst_addr)
{
	int32_t di, frg, rc;
	uint64_t ol_flags;
	uint32_t i, k, n, nb;
	uint32_t mtu, pid, type;
	const struct sockaddr_in *d4;
	const struct sockaddr_in6 *d6;
	struct tle_udp_stream *s;
	const void *da;
	union udph udph;
	struct tle_dest dst;
	struct tle_drb *drb[num];

	s = UDP_STREAM(us);
	type = s->s.type;

	/* start filling UDP header. */
	udph.raw = 0;
	udph.ports.src = s->s.port.dst;

	/* figure out what destination addr/port to use. */
	if (dst_addr != NULL) {
		if (dst_addr->sa_family != s->prm.remote_addr.ss_family) {
			rte_errno = EINVAL;
			return 0;
		}
		if (type == TLE_V4) {
			d4 = (const struct sockaddr_in *)dst_addr;
			da = &d4->sin_addr;
			udph.ports.dst = d4->sin_port;
		} else {
			d6 = (const struct sockaddr_in6 *)dst_addr;
			da = &d6->sin6_addr;
			udph.ports.dst = d6->sin6_port;
		}
	} else {
		udph.ports.dst = s->s.port.src;
		if (type == TLE_V4)
			da = &s->s.ipv4.addr.src;
		else
			da = &s->s.ipv6.addr.src;
	}

	di = stream_get_dest(&s->s, da, &dst);
	if (di < 0) {
		rte_errno = -di;
		return 0;
	}

	/* reserve a contiguous block of packet ids, calculate payload MTU. */
	pid = rte_atomic32_add_return(&dst.dev->tx.packet_id[type], num) - num;
	mtu = dst.mtu - dst.l2_len - dst.l3_len;

	/* mark stream as not closable. */
	if (rwl_acquire(&s->tx.use) < 0) {
		rte_errno = EAGAIN;
		return 0;
	}

	nb = 0;
	for (i = 0, k = 0; k != num; k = i) {

		/* copy L2/L3/L4 headers into mbufs, setup mbufs metadata. */

		frg = 0;
		ol_flags = dst.dev->tx.ol_flags[type];

		while (i != num && frg == 0) {
			frg = pkt[i]->pkt_len > mtu;
			if (frg != 0)
				ol_flags &= ~PKT_TX_UDP_CKSUM;
			rc = udp_fill_mbuf(pkt[i], type, ol_flags, pid + i,
				udph, &dst);
			if (rc != 0) {
				rte_errno = -rc;
				goto out;
			}
			i += (frg == 0);
		}

		/* enqueue non-fragment packets to the destination device. */
		if (k != i) {
			k += queue_pkt_out(s, dst.dev,
				(const void **)(uintptr_t)&pkt[k], i - k,
				drb, &nb);

			/* stream TX queue is full. */
			if (k != i) {
				rte_errno = EAGAIN;
				break;
			}
		}

		/* enqueue the packet that needs to be fragmented */
		if (i != num) {

			struct rte_mbuf *frag[RTE_LIBRTE_IP_FRAG_MAX_FRAG];

			/* fragment the packet. */
			rc = fragment(pkt[i], frag, RTE_DIM(frag), type, &dst);
			if (rc < 0) {
				rte_errno = -rc;
				break;
			}

			n = queue_pkt_out(s, dst.dev,
				(const void **)(uintptr_t)frag, rc, drb, &nb);
			if (n == 0) {
				while (rc-- != 0)
					rte_pktmbuf_free(frag[rc]);
				rte_errno = EAGAIN;
				break;
			}

			/* all fragments enqueued, free the original packet. */
			rte_pktmbuf_free(pkt[i]);
			i++;
		}
	}

	/* if possible, rearm socket write event. */
	if (k == num && s->tx.ev != NULL)
		tle_event_raise(s->tx.ev);

out:
	/* free unused drbs. */
	if (nb != 0)
		stream_drb_free(s, drb, nb);

	/* stream can be closed. */
	rwl_release(&s->tx.use);

	/*
	 * remove pkt l2/l3 headers, restore ol_flags for unsent, but
	 * already modified packets.
	 */
	ol_flags = ~dst.dev->tx.ol_flags[type];
	for (n = k; n != i; n++) {
		rte_pktmbuf_adj(pkt[n], dst.l2_len + dst.l3_len + sizeof(udph));
		pkt[n]->ol_flags &= ol_flags;
	}

	return k;
}