udp_rxtx.c revision 0104c556
/*
 * Copyright (c) 2016  Intel Corporation.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <rte_malloc.h>
#include <rte_errno.h>
#include <rte_ethdev.h>
#include <rte_ip.h>
#include <rte_ip_frag.h>
#include <rte_udp.h>

#include "udp_stream.h"
#include "misc.h"

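/*
 * Find the stream that listens on the given (L3 type, destination port)
 * pair and grab its RX usage lock, so that the stream can't be closed
 * while packets are being enqueued to it.
 */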
static inline struct tle_udp_stream *
rx_stream_obtain(struct tle_dev *dev, uint32_t type, uint32_t port)
{
	struct tle_udp_stream *s;

	if (type >= TLE_VNUM || dev->dp[type] == NULL)
		return NULL;

	s = (struct tle_udp_stream *)dev->dp[type]->streams[port];
	if (s == NULL)
		return NULL;

	if (rwl_acquire(&s->rx.use) < 0)
		return NULL;

	return s;
}

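/*
 * Classify the packet by the L3/L4 ptype flags (filled in by the PMD or
 * by an earlier SW classification step) and map it to TLE_V4/TLE_V6;
 * anything that is not IPv4/UDP or IPv6/UDP is reported as TLE_VNUM.
 */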
static inline uint16_t
get_pkt_type(const struct rte_mbuf *m)
{
	uint32_t v;

	v = m->packet_type &
		(RTE_PTYPE_L3_IPV4 | RTE_PTYPE_L3_IPV6 | RTE_PTYPE_L4_MASK);
	if (v == (RTE_PTYPE_L3_IPV4 | RTE_PTYPE_L4_UDP))
		return TLE_V4;
	else if (v == (RTE_PTYPE_L3_IPV6 | RTE_PTYPE_L4_UDP))
		return TLE_V6;
	else
		return TLE_VNUM;
}

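/*
 * Extract the L4 ports and the source/destination addresses from the
 * packet. The return value packs the packet type (TLE_V4/TLE_V6/TLE_VNUM)
 * into .src and the destination port into .dst, so that packets destined
 * for the same stream compare equal as a single 32-bit value.
 */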
static inline union l4_ports
pkt_info(const struct tle_dev *dev, struct rte_mbuf *m,
	union l4_ports *ports, union ipv4_addrs *addr4,
	union ipv6_addrs **addr6)
{
	uint32_t len;
	union l4_ports ret, *up;
	union ipv4_addrs *pa4;

	ret.src = get_pkt_type(m);

	len = m->l2_len;
	if (ret.src == TLE_V4) {
		pa4 = rte_pktmbuf_mtod_offset(m, union ipv4_addrs *,
			len + offsetof(struct ipv4_hdr, src_addr));
		addr4->raw = pa4->raw;
		m->ol_flags |= dev->rx.ol_flags[TLE_V4];
	} else if (ret.src == TLE_V6) {
		*addr6 = rte_pktmbuf_mtod_offset(m, union ipv6_addrs *,
			len + offsetof(struct ipv6_hdr, src_addr));
		m->ol_flags |= dev->rx.ol_flags[TLE_V6];
	}

	len += m->l3_len;
	up = rte_pktmbuf_mtod_offset(m, union l4_ports *,
		len + offsetof(struct udp_hdr, src_port));
	ports->raw = up->raw;
	ret.dst = ports->dst;
	return ret;
}

/*
 * Helper routine, enqueues packets to the stream and calls RX
 * notification callback, if needed.
 */
static inline uint16_t
rx_stream(struct tle_udp_stream *s, void *mb[], struct rte_mbuf *rp[],
	int32_t rc[], uint32_t num)
{
	uint32_t i, k, r;

	r = _rte_ring_enqueue_burst(s->rx.q, mb, num);

	/* if RX queue was empty invoke user RX notification callback. */
	if (s->rx.cb.func != NULL && r != 0 && rte_ring_count(s->rx.q) == r)
		s->rx.cb.func(s->rx.cb.data, &s->s);

	for (i = r, k = 0; i != num; i++, k++) {
		rc[k] = ENOBUFS;
		rp[k] = mb[i];
	}

	return r;
}

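/*
 * IPv6 version: match each packet against the stream's (masked) address
 * and ports; mismatches are returned to the caller with ENOENT, the rest
 * are enqueued to the stream via rx_stream().
 */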
static inline uint16_t
rx_stream6(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
	union ipv6_addrs *addr[], union l4_ports port[],
	struct rte_mbuf *rp[], int32_t rc[], uint16_t num)
{
	uint32_t i, k, n;
	void *mb[num];

	k = 0;
	n = 0;

	for (i = 0; i != num; i++) {

		if ((port[i].raw & s->s.pmsk.raw) != s->s.port.raw ||
				ymm_mask_cmp(&addr[i]->raw, &s->s.ipv6.addr.raw,
				&s->s.ipv6.mask.raw) != 0) {
			rc[k] = ENOENT;
			rp[k] = pkt[i];
			k++;
		} else {
			mb[n] = pkt[i];
			n++;
		}
	}

	return rx_stream(s, mb, rp + k, rc + k, n);
}

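/* IPv4 version of the above. */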
static inline uint16_t
rx_stream4(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
	union ipv4_addrs addr[], union l4_ports port[],
	struct rte_mbuf *rp[], int32_t rc[], uint16_t num)
{
	uint32_t i, k, n;
	void *mb[num];

	k = 0;
	n = 0;

	for (i = 0; i != num; i++) {

		if ((addr[i].raw & s->s.ipv4.mask.raw) != s->s.ipv4.addr.raw ||
				(port[i].raw & s->s.pmsk.raw) !=
				s->s.port.raw) {
			rc[k] = ENOENT;
			rp[k] = pkt[i];
			k++;
		} else {
			mb[n] = pkt[i];
			n++;
		}
	}

	return rx_stream(s, mb, rp + k, rc + k, n);
}

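/*
 * Demultiplex a burst of packets received on the device between the UDP
 * streams. Packets that can't be delivered (no matching stream,
 * address/port mismatch, stream RX queue full) are returned through rp[]
 * with the reason in rc[] (ENOENT/ENOBUFS).
 * Returns the number of packets that were successfully enqueued.
 */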
uint16_t
tle_udp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[],
	struct rte_mbuf *rp[], int32_t rc[], uint16_t num)
{
	struct tle_udp_stream *s;
	uint32_t i, j, k, n, p, t;
	union l4_ports tp[num], port[num];
	union ipv4_addrs a4[num];
	union ipv6_addrs *pa6[num];

	for (i = 0; i != num; i++)
		tp[i] = pkt_info(dev, pkt[i], &port[i], &a4[i], &pa6[i]);

	k = 0;
	for (i = 0; i != num; i = j) {

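		/* collect the run of consecutive packets with the same
		 * type/port. */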
		for (j = i + 1; j != num && tp[j].raw == tp[i].raw; j++)
			;

		t = tp[i].src;
		p = tp[i].dst;
		s = rx_stream_obtain(dev, t, p);
		if (s != NULL) {

			if (t == TLE_V4)
				n = rx_stream4(s, pkt + i, a4 + i,
					port + i, rp + k, rc + k, j - i);
			else
				n = rx_stream6(s, pkt + i, pa6 + i, port + i,
					rp + k, rc + k, j - i);

			k += j - i - n;

			if (s->rx.ev != NULL)
				tle_event_raise(s->rx.ev);
			rwl_release(&s->rx.use);

		} else {
			for (; i != j; i++) {
				rc[k] = ENOENT;
				rp[k] = pkt[i];
				k++;
			}
		}
	}

	return num - k;
}

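/*
 * Return drained drbs back to the stream's pool and, if the stream is
 * still open, signal it as writable again (via TX event or callback).
 */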
static inline void
stream_drb_release(struct tle_udp_stream *s, struct tle_drb *drb[],
	uint32_t nb_drb)
{
	uint32_t n;

	n = rte_ring_count(s->tx.drb.r);
	_rte_ring_enqueue_burst(s->tx.drb.r, (void **)drb, nb_drb);

	/* If stream is still open, then mark it as available for writing. */
	if (rwl_try_acquire(&s->tx.use) > 0) {

		if (s->tx.ev != NULL)
			tle_event_raise(s->tx.ev);

		/* if stream send buffer was full invoke TX callback */
		else if (s->tx.cb.func != NULL && n == 0)
			s->tx.cb.func(s->tx.cb.data, &s->s);

	}

	rwl_release(&s->tx.use);
}

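/*
 * Dequeue up to num packets that streams have queued for TX on the given
 * device (typically to be passed to rte_eth_tx_burst() afterwards).
 * drbs fully drained by the dequeue are returned to their owning streams,
 * which are then notified that they are writable again.
 */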
uint16_t
tle_udp_tx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], uint16_t num)
{
	uint32_t i, j, k, n;
	struct tle_drb *drb[num];
	struct tle_udp_stream *s;

	/* extract packets from device TX queue. */

	k = num;
	n = tle_dring_sc_dequeue(&dev->tx.dr, (const void **)(uintptr_t)pkt,
		num, drb, &k);

	if (n == 0)
		return 0;

	/* free empty drbs and notify related streams. */

	for (i = 0; i != k; i = j) {
		s = drb[i]->udata;
		for (j = i + 1; j != k && s == drb[j]->udata; j++)
			;
		stream_drb_release(s, drb + i, j - i);
	}

	return n;
}

/*
 * helper function, does the necessary pre-processing of received packets
 * before handing them to the stream_recv caller: drops packets with
 * invalid checksum(s) and strips the headers (up to and including the
 * UDP header) from the rest. Returns the number of dropped packets.
 */
static inline uint32_t
recv_pkt_process(struct rte_mbuf *m[], uint32_t num, uint32_t type)
{
	uint32_t i, k;
	uint64_t f, flg[num], ofl[num];

	for (i = 0; i != num; i++) {
		flg[i] = m[i]->ol_flags;
		ofl[i] = m[i]->tx_offload;
	}

	k = 0;
	for (i = 0; i != num; i++) {

		f = flg[i] & (PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD);

		/* drop packets with invalid cksum(s). */
		if (f != 0 && check_pkt_csum(m[i], m[i]->ol_flags, type,
				IPPROTO_UDP) != 0) {
			rte_pktmbuf_free(m[i]);
			m[i] = NULL;
			k++;
		} else {
			m[i]->ol_flags ^= f;
			rte_pktmbuf_adj(m[i], _tx_offload_l4_offset(ofl[i]));
		}
	}

	return k;
}

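/*
 * Dequeue up to num packets from the stream's RX queue, drop those with
 * invalid checksums and strip the headers from the rest, so the caller
 * receives just the UDP payload. Returns the number of valid packets.
 */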
uint16_t
tle_udp_stream_recv(struct tle_stream *us, struct rte_mbuf *pkt[], uint16_t num)
{
	uint32_t k, n;
	struct tle_udp_stream *s;

	s = UDP_STREAM(us);
	n = _rte_ring_mc_dequeue_burst(s->rx.q, (void **)pkt, num);
	if (n == 0)
		return 0;

	/*
	 * if we still have packets to read,
	 * then rearm stream RX event.
	 */
	if (n == num && rte_ring_count(s->rx.q) != 0) {
		if (rwl_try_acquire(&s->rx.use) > 0 && s->rx.ev != NULL)
			tle_event_raise(s->rx.ev);
		rwl_release(&s->rx.use);
	}

	k = recv_pkt_process(pkt, n, s->s.type);
	return compress_pkt_list(pkt, n, k);
}

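/*
 * Prepend the L2/L3 header template from the destination entry plus the
 * UDP header to the packet and fill in the length fields. Depending on
 * ol_flags, the UDP (and IPv4 header) checksum is either computed here
 * in SW or just seeded with the pseudo-header checksum for HW offload.
 */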
static inline int
udp_fill_mbuf(struct rte_mbuf *m,
	uint32_t type, uint64_t ol_flags, uint32_t pid,
	union udph udph, const struct tle_dest *dst)
{
	uint32_t len, plen;
	char *l2h;
	union udph *l4h;

	len = dst->l2_len + dst->l3_len;
	plen = m->pkt_len;

	/* make room for L2/L3/L4 headers in front of the packet data. */

	l2h = rte_pktmbuf_prepend(m, len + sizeof(*l4h));
	if (l2h == NULL)
		return -ENOBUFS;

	/* copy L2/L3 header */
	rte_memcpy(l2h, dst->hdr, len);

	/* copy UDP header */
	l4h = (union udph *)(l2h + len);
	l4h->raw = udph.raw;

	/* setup mbuf TX offload related fields. */
	m->tx_offload = _mbuf_tx_offload(dst->l2_len, dst->l3_len,
		sizeof(*l4h), 0, 0, 0);
	m->ol_flags |= ol_flags;

	l4h->len = rte_cpu_to_be_16(plen + sizeof(*l4h));

	/* update proto specific fields. */

	if (type == TLE_V4) {
		struct ipv4_hdr *l3h;
		l3h = (struct ipv4_hdr *)(l2h + dst->l2_len);
		l3h->packet_id = rte_cpu_to_be_16(pid);
		l3h->total_length = rte_cpu_to_be_16(plen + dst->l3_len +
			sizeof(*l4h));

		if ((ol_flags & PKT_TX_UDP_CKSUM) != 0)
			l4h->cksum = _ipv4x_phdr_cksum(l3h, m->l3_len,
				ol_flags);
		else
			l4h->cksum = _ipv4_udptcp_mbuf_cksum(m, len, l3h);

		if ((ol_flags & PKT_TX_IP_CKSUM) == 0)
			l3h->hdr_checksum = _ipv4x_cksum(l3h, m->l3_len);
	} else {
		struct ipv6_hdr *l3h;
		l3h = (struct ipv6_hdr *)(l2h + dst->l2_len);
		l3h->payload_len = rte_cpu_to_be_16(plen + sizeof(*l4h));
		if ((ol_flags & PKT_TX_UDP_CKSUM) != 0)
			l4h->cksum = rte_ipv6_phdr_cksum(l3h, ol_flags);
		else
			l4h->cksum = _ipv6_udptcp_mbuf_cksum(m, len, l3h);
	}

	return 0;
}

/*
 * Propagate offload metadata from the original packet to its fragments
 * and, when the IPv4 header checksum is not offloaded, recompute it in SW.
 * ??? arguably rte_ipv[4,6]_fragment_packet should do that itself.
 */
static inline void
frag_fixup(const struct rte_mbuf *ms, struct rte_mbuf *mf, uint32_t type)
{
	struct ipv4_hdr *l3h;

	mf->ol_flags = ms->ol_flags;
	mf->tx_offload = ms->tx_offload;

	if (type == TLE_V4 && (ms->ol_flags & PKT_TX_IP_CKSUM) == 0) {
		l3h = rte_pktmbuf_mtod(mf, struct ipv4_hdr *);
		l3h->hdr_checksum = _ipv4x_cksum(l3h, mf->l3_len);
	}
}

/*
 * Returns the number of fragments on success, a negative value
 * if the packet couldn't be fragmented.
 */
static inline int
fragment(struct rte_mbuf *pkt, struct rte_mbuf *frag[], uint32_t num,
	uint32_t type, const struct tle_dest *dst)
{
	int32_t frag_num, i;
	uint16_t mtu;
	void *eth_hdr;

	/* Remove the Ethernet header from the input packet */
	rte_pktmbuf_adj(pkt, dst->l2_len);
	mtu = dst->mtu - dst->l2_len;

	/* fragment packet */
	if (type == TLE_V4)
		frag_num = rte_ipv4_fragment_packet(pkt, frag, num, mtu,
			dst->head_mp, dst->head_mp);
	else
		frag_num = rte_ipv6_fragment_packet(pkt, frag, num, mtu,
			dst->head_mp, dst->head_mp);

	if (frag_num > 0) {
		for (i = 0; i != frag_num; i++) {

			frag_fixup(pkt, frag[i], type);

			/* Move data_off to include l2 header first */
			eth_hdr = rte_pktmbuf_prepend(frag[i], dst->l2_len);

			/* copy l2 header into fragment */
			rte_memcpy(eth_hdr, dst->hdr, dst->l2_len);
		}
	}

	return frag_num;
}

static inline void
stream_drb_free(struct tle_udp_stream *s, struct tle_drb *drbs[],
	uint32_t nb_drb)
{
	_rte_ring_enqueue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
}

static inline uint32_t
stream_drb_alloc(struct tle_udp_stream *s, struct tle_drb *drbs[],
	uint32_t nb_drb)
{
	return _rte_ring_dequeue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
}

/*
 * enqueue up to nb_pkt packets to the destination device queue.
 * drbs[] and *nb_drb hold the stream's spare drbs carried over between
 * calls; more are allocated from the stream's pool when needed.
 */
static inline uint16_t
queue_pkt_out(struct tle_udp_stream *s, struct tle_dev *dev,
		const void *pkt[], uint16_t nb_pkt,
		struct tle_drb *drbs[], uint32_t *nb_drb)
{
	uint32_t bsz, i, n, nb, nbc, nbm;

	bsz = s->tx.drb.nb_elem;

	/* calculate how many drbs are needed. */
	nbc = *nb_drb;
	nbm = (nb_pkt + bsz - 1) / bsz;
	nb = RTE_MAX(nbm, nbc) - nbc;

	/* allocate required drbs */
	if (nb != 0)
		nb = stream_drb_alloc(s, drbs + nbc, nb);

	nb += nbc;

	/* no free drbs, can't send anything */
	if (nb == 0)
		return 0;

	/* not enough free drbs, reduce number of packets to send. */
	else if (nb != nbm)
		nb_pkt = nb * bsz;

	/* enqueue packets to the destination device. */
	nbc = nb;
	n = tle_dring_mp_enqueue(&dev->tx.dr, pkt, nb_pkt, drbs, &nb);

	/* if not all available drbs were consumed, move them to the start. */
	nbc -= nb;
	for (i = 0; i != nb; i++)
		drbs[i] = drbs[nbc + i];

	*nb_drb = nb;
	return n;
}

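/*
 * Prepare and send a burst of user packets (payload-only mbufs):
 * prepend L2/L3/UDP headers, fragment packets that exceed the path MTU,
 * and enqueue everything to the destination device TX ring.
 * Returns the number of packets accepted; on failure rte_errno is set
 * and the already-modified but unsent packets get their headers removed
 * and ol_flags restored before being handed back to the caller.
 */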
uint16_t
tle_udp_stream_send(struct tle_stream *us, struct rte_mbuf *pkt[],
	uint16_t num, const struct sockaddr *dst_addr)
{
	int32_t di, frg, rc;
	uint64_t ol_flags;
	uint32_t i, k, n, nb;
	uint32_t mtu, pid, type;
	const struct sockaddr_in *d4;
	const struct sockaddr_in6 *d6;
	struct tle_udp_stream *s;
	const void *da;
	union udph udph;
	struct tle_dest dst;
	struct tle_drb *drb[num];

	s = UDP_STREAM(us);
	type = s->s.type;

	/* start filling UDP header. */
	udph.raw = 0;
	/* stream ports are kept in RX order: .src is the remote port,
	 * .dst the local one, hence the swap on TX. */
	udph.ports.src = s->s.port.dst;

	/* figure out what destination addr/port to use. */
	if (dst_addr != NULL) {
		if (dst_addr->sa_family != s->prm.remote_addr.ss_family) {
			rte_errno = EINVAL;
			return 0;
		}
		if (type == TLE_V4) {
			d4 = (const struct sockaddr_in *)dst_addr;
			da = &d4->sin_addr;
			udph.ports.dst = d4->sin_port;
		} else {
			d6 = (const struct sockaddr_in6 *)dst_addr;
			da = &d6->sin6_addr;
			udph.ports.dst = d6->sin6_port;
		}
	} else {
		udph.ports.dst = s->s.port.src;
		if (type == TLE_V4)
			da = &s->s.ipv4.addr.src;
		else
			da = &s->s.ipv6.addr.src;
	}

	di = stream_get_dest(&s->s, da, &dst);
	if (di < 0) {
		rte_errno = -di;
		return 0;
	}

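	/* reserve a block of num IP ids for this burst of packets. */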
	pid = rte_atomic32_add_return(&dst.dev->tx.packet_id[type], num) - num;
	mtu = dst.mtu - dst.l2_len - dst.l3_len;

	/* mark stream as not closable. */
	if (rwl_acquire(&s->tx.use) < 0) {
		rte_errno = EAGAIN;
		return 0;
	}

	nb = 0;
	for (i = 0, k = 0; k != num; k = i) {

		/* copy L2/L3/L4 headers into mbufs, setup mbufs metadata. */

		frg = 0;
		ol_flags = dst.dev->tx.ol_flags[type];

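		/*
		 * fill packet headers until we meet the first packet that has
		 * to be fragmented; HW UDP checksum can't be used for
		 * fragments, so PKT_TX_UDP_CKSUM is cleared for such a packet
		 * and the checksum is computed in SW before it is split.
		 */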
		while (i != num && frg == 0) {
			frg = pkt[i]->pkt_len > mtu;
			if (frg != 0)
				ol_flags &= ~PKT_TX_UDP_CKSUM;
			rc = udp_fill_mbuf(pkt[i], type, ol_flags, pid + i,
				udph, &dst);
			if (rc != 0) {
				rte_errno = -rc;
				goto out;
			}
			i += (frg == 0);
		}

		/* enqueue non-fragment packets to the destination device. */
		if (k != i) {
			k += queue_pkt_out(s, dst.dev,
				(const void **)(uintptr_t)&pkt[k], i - k,
				drb, &nb);

			/* stream TX queue is full. */
			if (k != i) {
				rte_errno = EAGAIN;
				break;
			}
		}

		/* enqueue the packet that needs to be fragmented. */
		if (i != num) {

			struct rte_mbuf *frag[RTE_LIBRTE_IP_FRAG_MAX_FRAG];

			/* fragment the packet. */
			rc = fragment(pkt[i], frag, RTE_DIM(frag), type, &dst);
			if (rc < 0) {
				rte_errno = -rc;
				break;
			}

			n = queue_pkt_out(s, dst.dev,
				(const void **)(uintptr_t)frag, rc, drb, &nb);
			if (n == 0) {
				while (rc-- != 0)
					rte_pktmbuf_free(frag[rc]);
				rte_errno = EAGAIN;
				break;
			}

			/* all fragments enqueued, free the original packet. */
			rte_pktmbuf_free(pkt[i]);
			i++;
		}
	}

	/* if possible, rearm socket write event. */
	if (k == num && s->tx.ev != NULL)
		tle_event_raise(s->tx.ev);

out:
	/* free unused drbs. */
	if (nb != 0)
		stream_drb_free(s, drb, nb);

	/* stream can be closed. */
	rwl_release(&s->tx.use);

	/*
	 * remove l2/l3/l4 headers and restore ol_flags for packets that
	 * were already modified but not sent.
	 */
	ol_flags = ~dst.dev->tx.ol_flags[type];
	for (n = k; n != i; n++) {
		rte_pktmbuf_adj(pkt[n], dst.l2_len + dst.l3_len + sizeof(udph));
		pkt[n]->ol_flags &= ol_flags;
	}

	return k;
}