/*
 * Copyright (c) 2016  Intel Corporation.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <rte_malloc.h>
#include <rte_errno.h>
#include <rte_ethdev.h>
#include <rte_ip.h>
#include <rte_ip_frag.h>
#include <rte_udp.h>

#include "udp_stream.h"
#include "misc.h"

static inline struct tle_udp_stream *
rx_stream_obtain(struct tle_dev *dev, uint32_t type, uint32_t port)
{
	struct tle_udp_stream *s;

	if (type >= TLE_VNUM || dev->dp[type] == NULL)
		return NULL;

	s = (struct tle_udp_stream *)dev->dp[type]->streams[port];
	if (s == NULL)
		return NULL;

	if (rwl_acquire(&s->rx.use) < 0)
		return NULL;

	return s;
}

static inline uint16_t
get_pkt_type(const struct rte_mbuf *m)
{
	uint32_t v;

	v = m->packet_type &
		(RTE_PTYPE_L3_IPV4 | RTE_PTYPE_L3_IPV6 | RTE_PTYPE_L4_MASK);
	if (v == (RTE_PTYPE_L3_IPV4 | RTE_PTYPE_L4_UDP))
		return TLE_V4;
	else if (v == (RTE_PTYPE_L3_IPV6 | RTE_PTYPE_L4_UDP))
		return TLE_V6;
	else
		return TLE_VNUM;
}
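
/*
 * Extract L4 ports and L3 source/destination addresses from the packet.
 * The return value packs the packet type (TLE_V4/TLE_V6/TLE_VNUM) into
 * .src and the destination port into .dst, so that tle_udp_rx_bulk()
 * can group packets for the same stream with a single 32-bit compare.
 */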
static inline union l4_ports
pkt_info(struct rte_mbuf *m, union l4_ports *ports, union ipv4_addrs *addr4,
	union ipv6_addrs **addr6)
{
	uint32_t len;
	union l4_ports ret, *up;
	union ipv4_addrs *pa4;

	ret.src = get_pkt_type(m);

	len = m->l2_len;
	if (ret.src == TLE_V4) {
		pa4 = rte_pktmbuf_mtod_offset(m, union ipv4_addrs *,
			len + offsetof(struct ipv4_hdr, src_addr));
		addr4->raw = pa4->raw;
	} else if (ret.src == TLE_V6) {
		*addr6 = rte_pktmbuf_mtod_offset(m, union ipv6_addrs *,
			len + offsetof(struct ipv6_hdr, src_addr));
	}

	len += m->l3_len;
	up = rte_pktmbuf_mtod_offset(m, union l4_ports *,
		len + offsetof(struct udp_hdr, src_port));
	ports->raw = up->raw;
	ret.dst = ports->dst;
	return ret;
}

/*
 * Helper routine, enqueues packets to the stream and calls RX
 * notification callback, if needed.
 */
static inline uint16_t
rx_stream(struct tle_udp_stream *s, void *mb[], struct rte_mbuf *rp[],
	int32_t rc[], uint32_t num)
{
	uint32_t i, k, r;

	r = _rte_ring_enqueue_burst(s->rx.q, mb, num);

	/* if RX queue was empty invoke user RX notification callback. */
	if (s->rx.cb.func != NULL && r != 0 && rte_ring_count(s->rx.q) == r)
		s->rx.cb.func(s->rx.cb.data, &s->s);

	for (i = r, k = 0; i != num; i++, k++) {
		rc[k] = ENOBUFS;
		rp[k] = mb[i];
	}

	return r;
}
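
/*
 * Match packets against the stream's (masked) IPv6 addresses and ports;
 * mismatches are handed back to the caller with ENOENT, the rest are
 * enqueued to the stream.
 */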
static inline uint16_t
rx_stream6(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
	union ipv6_addrs *addr[], union l4_ports port[],
	struct rte_mbuf *rp[], int32_t rc[], uint16_t num)
{
	uint32_t i, k, n;
	void *mb[num];

	k = 0;
	n = 0;

	for (i = 0; i != num; i++) {

		if ((port[i].raw & s->s.pmsk.raw) != s->s.port.raw ||
				ymm_mask_cmp(&addr[i]->raw, &s->s.ipv6.addr.raw,
				&s->s.ipv6.mask.raw) != 0) {
			rc[k] = ENOENT;
			rp[k] = pkt[i];
			k++;
		} else {
			mb[n] = pkt[i];
			n++;
		}
	}

	return rx_stream(s, mb, rp + k, rc + k, n);
}
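
/* Same as rx_stream6(), but matches (masked) IPv4 addresses. */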
static inline uint16_t
rx_stream4(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
	union ipv4_addrs addr[], union l4_ports port[],
	struct rte_mbuf *rp[], int32_t rc[], uint16_t num)
{
	uint32_t i, k, n;
	void *mb[num];

	k = 0;
	n = 0;

	for (i = 0; i != num; i++) {

		if ((addr[i].raw & s->s.ipv4.mask.raw) != s->s.ipv4.addr.raw ||
				(port[i].raw & s->s.pmsk.raw) !=
				s->s.port.raw) {
			rc[k] = ENOENT;
			rp[k] = pkt[i];
			k++;
		} else {
			mb[n] = pkt[i];
			n++;
		}
	}

	return rx_stream(s, mb, rp + k, rc + k, n);
}
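
/*
 * A minimal RX-path sketch (illustrative only: "port_id", "queue_id",
 * MAX_PKT_BURST and the device "dev" created via tle_add_dev() are
 * assumed; the NIC is expected to fill in packet_type and l2_len/l3_len):
 *
 *	struct rte_mbuf *pkt[MAX_PKT_BURST], *rp[MAX_PKT_BURST];
 *	int32_t rc[MAX_PKT_BURST];
 *	uint16_t i, n, k;
 *
 *	n = rte_eth_rx_burst(port_id, queue_id, pkt, RTE_DIM(pkt));
 *	k = tle_udp_rx_bulk(dev, pkt, rp, rc, n);
 *
 *	free the (n - k) rejected packets:
 *	for (i = 0; i != n - k; i++)
 *		rte_pktmbuf_free(rp[i]);
 */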
uint16_t
tle_udp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[],
	struct rte_mbuf *rp[], int32_t rc[], uint16_t num)
{
	struct tle_udp_stream *s;
	uint32_t i, j, k, n, p, t;
	union l4_ports tp[num], port[num];
	union ipv4_addrs a4[num];
	union ipv6_addrs *pa6[num];

	for (i = 0; i != num; i++)
		tp[i] = pkt_info(pkt[i], &port[i], &a4[i], &pa6[i]);

	k = 0;
	for (i = 0; i != num; i = j) {

		for (j = i + 1; j != num && tp[j].raw == tp[i].raw; j++)
			;

		t = tp[i].src;
		p = tp[i].dst;
		s = rx_stream_obtain(dev, t, p);
		if (s != NULL) {

			if (t == TLE_V4)
				n = rx_stream4(s, pkt + i, a4 + i,
					port + i, rp + k, rc + k, j - i);
			else
				n = rx_stream6(s, pkt + i, pa6 + i, port + i,
					rp + k, rc + k, j - i);

			k += j - i - n;

			if (s->rx.ev != NULL)
				tle_event_raise(s->rx.ev);
			rwl_release(&s->rx.use);

		} else {
			for (; i != j; i++) {
				rc[k] = ENOENT;
				rp[k] = pkt[i];
				k++;
			}
		}
	}

	return num - k;
}
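
/*
 * Return drained drbs to the stream's free ring and, if the stream is
 * still open, signal that it is writable again (TX event or callback).
 */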
static inline void
stream_drb_release(struct tle_udp_stream *s, struct tle_drb *drb[],
	uint32_t nb_drb)
{
	uint32_t n;

	n = rte_ring_count(s->tx.drb.r);
	_rte_ring_enqueue_burst(s->tx.drb.r, (void **)drb, nb_drb);

	/* If stream is still open, then mark it as available for writing. */
	if (rwl_try_acquire(&s->tx.use) > 0) {

		if (s->tx.ev != NULL)
			tle_event_raise(s->tx.ev);

		/* if stream send buffer was full invoke TX callback */
		else if (s->tx.cb.func != NULL && n == 0)
			s->tx.cb.func(s->tx.cb.data, &s->s);

	}

	rwl_release(&s->tx.use);
}
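
/*
 * A minimal TX-drain sketch (illustrative only; "port_id" and "queue_id"
 * identify the HW queue the device's packets should go out on):
 *
 *	struct rte_mbuf *pkt[MAX_PKT_BURST];
 *	uint16_t n, k;
 *
 *	n = tle_udp_tx_bulk(dev, pkt, RTE_DIM(pkt));
 *	k = rte_eth_tx_burst(port_id, queue_id, pkt, n);
 *
 *	free whatever the HW queue did not accept:
 *	while (k != n)
 *		rte_pktmbuf_free(pkt[k++]);
 */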
uint16_t
tle_udp_tx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], uint16_t num)
{
	uint32_t i, j, k, n;
	struct tle_drb *drb[num];
	struct tle_udp_stream *s;

	/* extract packets from device TX queue. */

	k = num;
	n = tle_dring_sc_dequeue(&dev->tx.dr, (const void **)(uintptr_t)pkt,
		num, drb, &k);

	if (n == 0)
		return 0;

	/* free empty drbs and notify related streams. */

	for (i = 0; i != k; i = j) {
		s = drb[i]->udata;
		for (j = i + 1; j != k && s == drb[j]->udata; j++)
			;
		stream_drb_release(s, drb + i, j - i);
	}

	return n;
}

/*
 * Helper function: performs the necessary pre-processing of received
 * packets before handing them to the stream_recv caller.
 */
static inline uint32_t
recv_pkt_process(struct rte_mbuf *m[], uint32_t num, uint32_t type)
{
	uint32_t i, k;
	uint64_t flg[num], ofl[num];

	for (i = 0; i != num; i++) {
		flg[i] = m[i]->ol_flags;
		ofl[i] = m[i]->tx_offload;
	}

	k = 0;
	for (i = 0; i != num; i++) {

		/* drop packets with invalid cksum(s). */
		if (check_pkt_csum(m[i], flg[i], type, IPPROTO_UDP) != 0) {
			rte_pktmbuf_free(m[i]);
			m[i] = NULL;
			k++;
		} else
			rte_pktmbuf_adj(m[i], _tx_offload_l4_offset(ofl[i]));
	}

	return k;
}
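
/*
 * A minimal receive sketch (illustrative only; "us" is an open UDP
 * stream):
 *
 *	struct rte_mbuf *pkt[MAX_PKT_BURST];
 *	uint16_t i, n;
 *
 *	n = tle_udp_stream_recv(us, pkt, RTE_DIM(pkt));
 *	for (i = 0; i != n; i++) {
 *		... consume pkt[i] ...
 *		rte_pktmbuf_free(pkt[i]);
 *	}
 */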
uint16_t
tle_udp_stream_recv(struct tle_stream *us, struct rte_mbuf *pkt[], uint16_t num)
{
	uint32_t k, n;
	struct tle_udp_stream *s;

	s = UDP_STREAM(us);
	n = _rte_ring_mc_dequeue_burst(s->rx.q, (void **)pkt, num);
	if (n == 0)
		return 0;

	/*
	 * if we still have packets to read,
	 * then rearm stream RX event.
	 */
	if (n == num && rte_ring_count(s->rx.q) != 0) {
		if (rwl_try_acquire(&s->rx.use) > 0 && s->rx.ev != NULL)
			tle_event_raise(s->rx.ev);
		rwl_release(&s->rx.use);
	}

	k = recv_pkt_process(pkt, n, s->s.type);
	return compress_pkt_list(pkt, n, k);
}

static inline int
udp_fill_mbuf(struct rte_mbuf *m,
	uint32_t type, uint64_t ol_flags, uint32_t pid,
	union udph udph, const struct tle_dest *dst)
{
	uint32_t len, plen;
	char *l2h;
	union udph *l4h;

	len = dst->l2_len + dst->l3_len;
	plen = m->pkt_len;

	/* copy the L2/L3 header template into the mbuf. */

	l2h = rte_pktmbuf_prepend(m, len + sizeof(*l4h));
	if (l2h == NULL)
		return -ENOBUFS;

	/* copy L2/L3 header */
	rte_memcpy(l2h, dst->hdr, len);

	/* copy UDP header */
	l4h = (union udph *)(l2h + len);
	l4h->raw = udph.raw;

	/* setup mbuf TX offload related fields. */
	m->tx_offload = _mbuf_tx_offload(dst->l2_len, dst->l3_len,
		sizeof(*l4h), 0, 0, 0);
	m->ol_flags |= ol_flags;

	l4h->len = rte_cpu_to_be_16(plen + sizeof(*l4h));

	/* update proto specific fields. */

	if (type == TLE_V4) {
		struct ipv4_hdr *l3h;
		l3h = (struct ipv4_hdr *)(l2h + dst->l2_len);
		l3h->packet_id = rte_cpu_to_be_16(pid);
		l3h->total_length = rte_cpu_to_be_16(plen + dst->l3_len +
			sizeof(*l4h));
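
		/*
		 * With HW checksum offload only the pseudo-header sum is
		 * stored here and the NIC computes the rest; otherwise the
		 * full UDP checksum is calculated in software.
		 */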
		if ((ol_flags & PKT_TX_UDP_CKSUM) != 0)
			l4h->cksum = _ipv4x_phdr_cksum(l3h, m->l3_len,
				ol_flags);
		else
			l4h->cksum = _ipv4_udptcp_mbuf_cksum(m, len, l3h);

		if ((ol_flags & PKT_TX_IP_CKSUM) == 0)
			l3h->hdr_checksum = _ipv4x_cksum(l3h, m->l3_len);
	} else {
		struct ipv6_hdr *l3h;
		l3h = (struct ipv6_hdr *)(l2h + dst->l2_len);
		l3h->payload_len = rte_cpu_to_be_16(plen + sizeof(*l4h));
		if ((ol_flags & PKT_TX_UDP_CKSUM) != 0)
			l4h->cksum = rte_ipv6_phdr_cksum(l3h, ol_flags);
		else
			l4h->cksum = _ipv6_udptcp_mbuf_cksum(m, len, l3h);
	}

	return 0;
}

/* ???
 * Arguably this fixup should not be needed at all -
 * rte_ipv[4,6]_fragment_packet() should do it.
 */
static inline void
frag_fixup(const struct rte_mbuf *ms, struct rte_mbuf *mf, uint32_t type)
{
	struct ipv4_hdr *l3h;

	mf->ol_flags = ms->ol_flags;
	mf->tx_offload = ms->tx_offload;

	if (type == TLE_V4 && (ms->ol_flags & PKT_TX_IP_CKSUM) == 0) {
		l3h = rte_pktmbuf_mtod(mf, struct ipv4_hdr *);
		l3h->hdr_checksum = _ipv4x_cksum(l3h, mf->l3_len);
	}
}

/*
 * Returns negative for failure to fragment or actual number of fragments.
 */
static inline int
fragment(struct rte_mbuf *pkt, struct rte_mbuf *frag[], uint32_t num,
	uint32_t type, const struct tle_dest *dst)
{
	int32_t frag_num, i;
	uint16_t mtu;
	void *eth_hdr;

	/* Remove the Ethernet header from the input packet */
	rte_pktmbuf_adj(pkt, dst->l2_len);
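	/*
	 * rte_ipv[4,6]_fragment_packet() expects a fragment size that
	 * includes the L3 header, so only the L2 length is subtracted.
	 */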
	mtu = dst->mtu - dst->l2_len;

	/* fragment packet */
	if (type == TLE_V4)
		frag_num = rte_ipv4_fragment_packet(pkt, frag, num, mtu,
			dst->head_mp, dst->head_mp);
	else
		frag_num = rte_ipv6_fragment_packet(pkt, frag, num, mtu,
			dst->head_mp, dst->head_mp);

	if (frag_num > 0) {
		for (i = 0; i != frag_num; i++) {

			frag_fixup(pkt, frag[i], type);

			/* Move data_off to include l2 header first */
			eth_hdr = rte_pktmbuf_prepend(frag[i], dst->l2_len);

			/* copy l2 header into fragment */
			rte_memcpy(eth_hdr, dst->hdr, dst->l2_len);
		}
	}

	return frag_num;
}

static inline void
stream_drb_free(struct tle_udp_stream *s, struct tle_drb *drbs[],
	uint32_t nb_drb)
{
	_rte_ring_enqueue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
}

static inline uint32_t
stream_drb_alloc(struct tle_udp_stream *s, struct tle_drb *drbs[],
	uint32_t nb_drb)
{
	return _rte_ring_dequeue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
}

/* enqueue up to num packets to the destination device queue. */
static inline uint16_t
queue_pkt_out(struct tle_udp_stream *s, struct tle_dev *dev,
		const void *pkt[], uint16_t nb_pkt,
		struct tle_drb *drbs[], uint32_t *nb_drb, uint8_t all_or_nothing)
{
	uint32_t bsz, i, n, nb, nbc, nbm;

	bsz = s->tx.drb.nb_elem;

	/* calculate how many drbs are needed. */
	nbc = *nb_drb;
	nbm = (nb_pkt + bsz - 1) / bsz;
	nb = RTE_MAX(nbm, nbc) - nbc;
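	/*
	 * e.g. with bsz == 32 and nb_pkt == 70, nbm == 3 drbs are needed
	 * in total; if nbc == 1 is already held, allocate 2 more.
	 */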

	/* allocate required drbs */
	if (nb != 0)
		nb = stream_drb_alloc(s, drbs + nbc, nb);

	nb += nbc;

	/* no free drbs, can't send anything */
	if (nb == 0)
		return 0;

	/* not enough free drbs, reduce number of packets to send. */
	else if (nb != nbm) {
		if (all_or_nothing)
			return 0;
		nb_pkt = nb * bsz;
	}

	/* enqueue packets to the destination device. */
	nbc = nb;
	n = tle_dring_mp_enqueue(&dev->tx.dr, pkt, nb_pkt, drbs, &nb);

	/* if not all available drbs were consumed, move them to the start. */
	nbc -= nb;
	for (i = 0; i != nb; i++)
		drbs[i] = drbs[nbc + i];

	*nb_drb = nb;
	return n;
}
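
/*
 * A minimal send sketch (illustrative only; "us" is an open stream and
 * each of the n mbufs in "pkt" carries just the UDP payload):
 *
 *	struct sockaddr_in dst;
 *	uint16_t k;
 *
 *	memset(&dst, 0, sizeof(dst));
 *	dst.sin_family = AF_INET;
 *	dst.sin_addr.s_addr = rte_cpu_to_be_32(IPv4(192, 168, 1, 1));
 *	dst.sin_port = rte_cpu_to_be_16(9);
 *
 *	k = tle_udp_stream_send(us, pkt, n, (const struct sockaddr *)&dst);
 *
 *	pkt[k] ... pkt[n - 1] were not sent: retry later or free them.
 */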
uint16_t
tle_udp_stream_send(struct tle_stream *us, struct rte_mbuf *pkt[],
	uint16_t num, const struct sockaddr *dst_addr)
{
	int32_t di, frg, rc;
	uint64_t ol_flags;
	uint32_t i, k, n, nb;
	uint32_t mtu, pid, type;
	const struct sockaddr_in *d4;
	const struct sockaddr_in6 *d6;
	struct tle_udp_stream *s;
	const void *da;
	union udph udph;
	struct tle_dest dst;
	struct tle_drb *drb[num];

	s = UDP_STREAM(us);
	type = s->s.type;

	/* start filling UDP header. */
	udph.raw = 0;
	udph.ports.src = s->s.port.dst;

	/* figure out what destination addr/port to use. */
	if (dst_addr != NULL) {
		if (dst_addr->sa_family != s->prm.remote_addr.ss_family) {
			rte_errno = EINVAL;
			return 0;
		}
		if (type == TLE_V4) {
			d4 = (const struct sockaddr_in *)dst_addr;
			da = &d4->sin_addr;
			udph.ports.dst = d4->sin_port;
		} else {
			d6 = (const struct sockaddr_in6 *)dst_addr;
			da = &d6->sin6_addr;
			udph.ports.dst = d6->sin6_port;
		}
	} else {
		udph.ports.dst = s->s.port.src;
		if (type == TLE_V4)
			da = &s->s.ipv4.addr.src;
		else
			da = &s->s.ipv6.addr.src;
	}

	di = stream_get_dest(&s->s, da, &dst);
	if (di < 0) {
		rte_errno = -di;
		return 0;
	}

	pid = rte_atomic32_add_return(&dst.dev->tx.packet_id[type], num) - num;
	mtu = dst.mtu - dst.l2_len - dst.l3_len;

	/* mark stream as not closable. */
	if (rwl_acquire(&s->tx.use) < 0) {
		rte_errno = EAGAIN;
		return 0;
	}

	nb = 0;
	for (i = 0, k = 0; k != num; k = i) {

		/* copy L2/L3/L4 headers into mbufs, setup mbufs metadata. */

		frg = 0;
		ol_flags = dst.dev->tx.ol_flags[type];

		while (i != num && frg == 0) {
			frg = pkt[i]->pkt_len > mtu;
			if (frg != 0)
				ol_flags &= ~PKT_TX_UDP_CKSUM;
			rc = udp_fill_mbuf(pkt[i], type, ol_flags, pid + i,
				udph, &dst);
			if (rc != 0) {
				rte_errno = -rc;
				goto out;
			}
			i += (frg == 0);
		}

		/* enqueue non-fragment packets to the destination device. */
		if (k != i) {
			k += queue_pkt_out(s, dst.dev,
				(const void **)(uintptr_t)&pkt[k], i - k,
				drb, &nb, 0);

			/* stream TX queue is full. */
			if (k != i) {
				rte_errno = EAGAIN;
				break;
			}
		}

		/* enqueue the packet that needs to be fragmented */
		if (i != num) {

			struct rte_mbuf *frag[RTE_LIBRTE_IP_FRAG_MAX_FRAG];

			/* fragment the packet. */
			rc = fragment(pkt[i], frag, RTE_DIM(frag), type, &dst);
			if (rc < 0) {
				rte_errno = -rc;
				break;
			}

			n = queue_pkt_out(s, dst.dev,
				(const void **)(uintptr_t)frag, rc, drb, &nb, 1);
			if (n == 0) {
				while (rc-- != 0)
					rte_pktmbuf_free(frag[rc]);
				rte_errno = EAGAIN;
				break;
			}

			/* all fragments enqueued, free the original packet. */
			rte_pktmbuf_free(pkt[i]);
			i++;
		}
	}

	/* if possible, rearm socket write event. */
	if (k == num && s->tx.ev != NULL)
		tle_event_raise(s->tx.ev);

out:
	/* free unused drbs. */
	if (nb != 0)
		stream_drb_free(s, drb, nb);

	/* stream can be closed. */
	rwl_release(&s->tx.use);

	/*
	 * remove pkt l2/l3 headers, restore ol_flags for unsent, but
	 * already modified packets.
	 */
	ol_flags = ~dst.dev->tx.ol_flags[type];
	for (n = k; n != i; n++) {
		rte_pktmbuf_adj(pkt[n], dst.l2_len + dst.l3_len + sizeof(udph));
		pkt[n]->ol_flags &= ol_flags;
	}

	return k;
}