/*
 * Copyright (c) 2016-2017  Intel Corporation.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef _TLE_DRING_H_
#define _TLE_DRING_H_

#include <stdio.h>
#include <string.h>

#include <rte_common.h>
#include <rte_atomic.h>
#include <rte_memory.h>
#include <rte_ring.h>
#include <rte_debug.h>

#ifdef __cplusplus
extern "C" {
#endif

/**
 * @file
 * TLE dring
 *
 * The Dynamic Ring (dring) is an implementation of an unbounded FIFO queue
 * that supports lockless bulk enqueue/dequeue for multiple producers/consumers.
 * Internally it is represented by a linked list of Dynamic Ring Blocks (drb).
 * Each drb contains some metadata plus an array of pointers to queued objects.
 * It is the caller's responsibility to provide a sufficient number of drbs
 * for the enqueue operation, and to manage the unused drbs returned by the
 * dequeue operation. A minimal usage sketch is given below.
 * dring features:
 *
 * - FIFO (First In First Out)
 * - Lockless implementation.
 * - Multi- or single-consumer dequeue.
 * - Multi- or single-producer enqueue.
 * - Bulk dequeue.
 * - Bulk enqueue.
 */
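
/*
 * Minimal usage sketch (illustrative only; some_object, the object count and
 * the use of rte_malloc() are assumptions of this example, not requirements
 * of the dring API; error checking is omitted):
 *
 *	struct tle_dring dr;
 *	struct tle_drb *drb;
 *	uint32_t nb_drb;
 *	const void *obj[1] = {some_object};
 *
 *	tle_dring_reset(&dr, 0);
 *
 *	drb = rte_malloc(NULL, tle_drb_calc_size(0x10), RTE_CACHE_LINE_SIZE);
 *	drb->size = 0x10;
 *
 *	nb_drb = 1;
 *	tle_dring_enqueue(&dr, obj, 1, &drb, &nb_drb);
 *	// on return nb_drb holds the number of supplied drbs left unused
 *
 *	nb_drb = 1;
 *	tle_dring_dequeue(&dr, obj, 1, &drb, &nb_drb);
 *	// on return nb_drb holds the number of drbs released by the dring
 */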

/*
 * RTE_ASSERT was introduced in DPDK 16.07.
 * For older versions, use RTE_VERIFY.
 */
#ifdef RTE_ASSERT
#define TLE_DRING_ASSERT(exp)	RTE_ASSERT(exp)
#else
#define TLE_DRING_ASSERT(exp)	RTE_VERIFY(exp)
#endif

struct tle_drb {
	struct tle_drb *next; /**< next drb in the chain. */
	void *udata;          /**< user data. */
	uint32_t size;        /**< number of objects in that buffer. */
	uint32_t start;       /**< start index for that block. */
	const void *objs[0];  /**< objects queued in that block. */
} __rte_cache_aligned;

struct tle_dring {
	uint32_t flags;                         /**< behaviour flags (RING_F_*). */
	struct  {
		uint32_t single;                /**< true if single producer */
		volatile uint32_t head;         /**< producer head */
		volatile uint32_t tail;         /**< producer tail */
		struct tle_drb * volatile crb;  /**< block to enqueue to */
	} prod __rte_cache_aligned;
	struct  {
		uint32_t single;                /**< true if single consumer */
		volatile uint32_t head;         /**< consumer head */
		volatile uint32_t tail;         /**< consumer tail */
		struct tle_drb * volatile crb;  /**< block to dequeue from */
	} cons __rte_cache_aligned;

	struct tle_drb dummy;  /**< dummy block */
};

/**
 * Return the number of objects currently queued in the dring.
 *
 * @param dr
 *   A pointer to the dring structure.
 * @return
 *   - number of queued objects.
 */
static inline uint32_t
tle_dring_count(const struct tle_dring *dr)
{
	return dr->prod.tail - dr->cons.tail;
}

/*
 * Helper routine to copy objects to/from the ring.
 */
static inline void __attribute__((always_inline))
__tle_dring_copy_objs(const void *dst[], const void * const src[], uint32_t num)
{
	uint32_t i;

	for (i = 0; i != RTE_ALIGN_FLOOR(num, 4); i += 4) {
		dst[i] = src[i];
		dst[i + 1] = src[i + 1];
		dst[i + 2] = src[i + 2];
		dst[i + 3] = src[i + 3];
	}
	switch (num % 4) {
	case 3:
		dst[i + 2] = src[i + 2];
		/* fallthrough */
	case 2:
		dst[i + 1] = src[i + 1];
		/* fallthrough */
	case 1:
		dst[i] = src[i];
	}
}

/*
 * Helper routine to enqueue objects into the ring.
 */
static inline uint32_t __attribute__((always_inline))
__tle_dring_enqueue(struct tle_dring *dr, uint32_t head,
	const void * const objs[], uint32_t nb_obj,
	struct tle_drb *drbs[], uint32_t nb_drb)
{
	uint32_t i, j, k, n;
	struct tle_drb *pb;

	pb = dr->prod.crb;
	i = 0;

	/* fill the current producer block */
	if (pb->size != 0) {
		n = head - pb->start;
		k = RTE_MIN(pb->size - n, nb_obj);
		__tle_dring_copy_objs(pb->objs + n, objs, k);
		i += k;
	}

	/* fill new blocks, if any */
	j = 0;
	if (i != nb_obj && nb_drb != 0) {

		do {
			pb->next = drbs[j];
			pb = drbs[j];
			pb->start = head + i;
			k = RTE_MIN(pb->size, nb_obj - i);
			__tle_dring_copy_objs(pb->objs, objs + i, k);
			i += k;
		} while (++j != nb_drb && i != nb_obj);

		pb->next = NULL;

		/* new producer current block. */
		dr->prod.crb = pb;
	}

	/* we have to enqueue all requested objects. */
	TLE_DRING_ASSERT(nb_obj == i);

	/* return number of unused blocks. */
	return nb_drb - j;
}

/**
 * Enqueue several objects on the dring (multi-producer safe).
 * Note that it is the caller's responsibility to provide enough drbs
 * to enqueue all requested objects.
 *
 * @param dr
 *   A pointer to the ring structure.
 * @param objs
 *   An array of pointers to objects to enqueue.
 * @param nb_obj
 *   The number of objects to add to the dring from objs[].
 * @param drbs
 *   An array of pointers to the drbs that can be used by the dring
 *   to perform the enqueue operation.
 * @param nb_drb
 *   at input: number of elements in the drbs[] array.
 *   at output: number of elements in the drbs[] array left unused by the dring.
 * @return
 *   - number of enqueued objects.
 */
static inline uint32_t
tle_dring_mp_enqueue(struct tle_dring *dr, const void * const objs[],
	uint32_t nb_obj, struct tle_drb *drbs[], uint32_t *nb_drb)
{
	uint32_t head, next;

	if (nb_obj == 0)
		return 0;

	/* reserve position inside the ring. */
	do {
		head = dr->prod.head;
		next = head + nb_obj;
	} while (rte_atomic32_cmpset(&dr->prod.head, head, next) == 0);

	/*
	 * If there are other enqueues in progress that precede this one,
	 * then wait for them to complete.
	 */
	while (dr->prod.tail != head)
		rte_pause();

	/* make sure that changes from previous updates are visible. */
	rte_smp_rmb();

	/* now it is safe to enqueue into the ring. */
	*nb_drb = __tle_dring_enqueue(dr, head, objs, nb_obj, drbs, *nb_drb);

	/* make new objects visible to the consumer. */
	rte_smp_wmb();
	dr->prod.tail = next;

	return nb_obj;
}

/**
 * Enqueue several objects on the dring (NOT multi-producer safe).
 * Note that it is the caller's responsibility to provide enough drbs
 * to enqueue all requested objects.
 *
 * @param dr
 *   A pointer to the ring structure.
 * @param objs
 *   An array of pointers to objects to enqueue.
 * @param nb_obj
 *   The number of objects to add to the dring from objs[].
 * @param drbs
 *   An array of pointers to the drbs that can be used by the dring
 *   to perform the enqueue operation.
 * @param nb_drb
 *   at input: number of elements in the drbs[] array.
 *   at output: number of elements in the drbs[] array left unused by the dring.
 * @return
 *   - number of enqueued objects.
 */
static inline uint32_t
tle_dring_sp_enqueue(struct tle_dring *dr, const void * const objs[],
	uint32_t nb_obj, struct tle_drb *drbs[], uint32_t *nb_drb)
{
	uint32_t head, next;

	if (nb_obj == 0)
		return 0;

	head = dr->prod.head;
	next = head + nb_obj;

	/* update producer head value. */
	dr->prod.head = next;

	/* enqueue into the ring. */
	*nb_drb = __tle_dring_enqueue(dr, head, objs, nb_obj, drbs, *nb_drb);

	/* make new objects visible to the consumer. */
	rte_smp_wmb();
	dr->prod.tail = next;

	return nb_obj;
}

/**
 * Enqueue several objects on the dring.
 * Note that it is the caller's responsibility to provide enough drbs
 * to enqueue all requested objects. See the provisioning sketch below.
 *
 * @param dr
 *   A pointer to the ring structure.
 * @param objs
 *   An array of pointers to objects to enqueue.
 * @param nb_obj
 *   The number of objects to add to the dring from objs[].
 * @param drbs
 *   An array of pointers to the drbs that can be used by the dring
 *   to perform the enqueue operation.
 * @param nb_drb
 *   at input: number of elements in the drbs[] array.
 *   at output: number of elements in the drbs[] array left unused by the dring.
 * @return
 *   - number of enqueued objects.
 */
static inline uint32_t
tle_dring_enqueue(struct tle_dring *dr, const void * const objs[],
	uint32_t nb_obj, struct tle_drb *drbs[], uint32_t *nb_drb)
{
	if (dr->prod.single == 0)
		return tle_dring_mp_enqueue(dr, objs, nb_obj, drbs, nb_drb);
	else
		return tle_dring_sp_enqueue(dr, objs, nb_obj, drbs, nb_drb);
}
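
/*
 * Sketch of how a caller might provision drbs for an enqueue (illustrative
 * only; the helper below is hypothetical, not part of this API). With drbs
 * that hold obj_per_drb objects each, supplying nb_obj / obj_per_drb + 1
 * blocks is always sufficient, since in the worst case the current producer
 * block has no free slots left and ceil(nb_obj / obj_per_drb) new blocks
 * are needed.
 *
 *	static inline uint32_t
 *	example_nb_drb_needed(uint32_t nb_obj, uint32_t obj_per_drb)
 *	{
 *		return nb_obj / obj_per_drb + 1;
 *	}
 */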

/*
 * Helper routine to dequeue objects from the ring.
 */
static inline uint32_t __attribute__((always_inline))
__tle_dring_dequeue(struct tle_dring *dr, uint32_t head,
	const void *objs[], uint32_t nb_obj,
	struct tle_drb *drbs[], uint32_t nb_drb)
{
	uint32_t i, j, k, n;
	struct tle_drb *pb;

	pb = dr->cons.crb;
	i = 0;

	/* copy from the current consumer block */
	if (pb->size != 0) {
		n = head - pb->start;
		k = RTE_MIN(pb->size - n, nb_obj);
		__tle_dring_copy_objs(objs, pb->objs + n, k);
		i += k;
	}

	/* copy from other blocks */
	j = 0;
	if (i != nb_obj && nb_drb != 0) {

		do {
			/* current block is empty, put it into the free list. */
			if (pb != &dr->dummy)
				drbs[j++] = pb;

			/* proceed to the next block. */
			pb = pb->next;
			k = RTE_MIN(pb->size, nb_obj - i);
			__tle_dring_copy_objs(objs + i, pb->objs, k);
			i += k;
		} while (j != nb_drb && i != nb_obj);

		/* new consumer current block. */
		dr->cons.crb = pb;
	}

	/* we have to dequeue all requested objects. */
	TLE_DRING_ASSERT(nb_obj == i);

	/* return number of blocks to free. */
	return j;
}

/**
 * Dequeue several objects from the dring (multi-consumer safe).
 * Note that it is the caller's responsibility to provide a drbs[] array
 * large enough to store pointers to all drbs that might become unused
 * by that dequeue operation. It is also the caller's responsibility to
 * manage the unused drbs after the dequeue operation is completed
 * (i.e. mark them as free/reusable again, etc.).
 *
 * @param dr
 *   A pointer to the ring structure.
 * @param objs
 *   An array of pointers to objects that will be dequeued.
 * @param nb_obj
 *   The number of objects to dequeue from the dring.
 * @param drbs
 *   An array of pointers to the drbs that will become unused after that
 *   dequeue operation.
 * @param nb_drb
 *   at input: number of elements in the drbs[] array.
 *   at output: number of filled entries in the drbs[] array.
 * @return
 *   - number of dequeued objects.
 */
static inline uint32_t
tle_dring_mc_dequeue(struct tle_dring *dr, const void *objs[], uint32_t nb_obj,
	struct tle_drb *drbs[], uint32_t *nb_drb)
{
	uint32_t head, next, num, tail;

	/* move cons.head atomically */
	do {
		head = dr->cons.head;
		tail = dr->prod.tail;

		num = RTE_MIN(tail - head, nb_obj);

		/* no objects to dequeue */
		if (num == 0) {
			*nb_drb = 0;
			return 0;
		}

		next = head + num;
	} while (rte_atomic32_cmpset(&dr->cons.head, head, next) == 0);

	/*
	 * If there are other dequeues in progress that precede this one,
	 * then wait for them to complete.
	 */
	while (dr->cons.tail != head)
		rte_pause();

	/* make sure that changes from previous updates are visible. */
	rte_smp_rmb();

	/* now it is safe to dequeue from the ring. */
	*nb_drb = __tle_dring_dequeue(dr, head, objs, num, drbs, *nb_drb);

	/* update consumer tail value. */
	rte_smp_wmb();
	dr->cons.tail = next;

	return num;
}

/**
 * Dequeue several objects from the dring (NOT multi-consumer safe).
 * Note that it is the caller's responsibility to provide a drbs[] array
 * large enough to store pointers to all drbs that might become unused
 * by that dequeue operation. It is also the caller's responsibility to
 * manage the unused drbs after the dequeue operation is completed
 * (i.e. mark them as free/reusable again, etc.).
 *
 * @param dr
 *   A pointer to the ring structure.
 * @param objs
 *   An array of pointers to objects that will be dequeued.
 * @param nb_obj
 *   The number of objects to dequeue from the dring.
 * @param drbs
 *   An array of pointers to the drbs that will become unused after that
 *   dequeue operation.
 * @param nb_drb
 *   at input: number of elements in the drbs[] array.
 *   at output: number of filled entries in the drbs[] array.
 * @return
 *   - number of dequeued objects.
 */
static inline uint32_t
tle_dring_sc_dequeue(struct tle_dring *dr, const void *objs[], uint32_t nb_obj,
	struct tle_drb *drbs[], uint32_t *nb_drb)
{
	uint32_t head, next, num, tail;

	head = dr->cons.head;
	tail = dr->prod.tail;

	num = RTE_MIN(tail - head, nb_obj);

	/* no objects to dequeue */
	if (num == 0) {
		*nb_drb = 0;
		return 0;
	}

	next = head + num;

	/* update consumer head value. */
	dr->cons.head = next;

	/* dequeue from the ring. */
	*nb_drb = __tle_dring_dequeue(dr, head, objs, num, drbs, *nb_drb);

	/* update consumer tail value. */
	rte_smp_wmb();
	dr->cons.tail = next;

	return num;
}

/**
 * Dequeue several objects from the dring.
 * Note that it is the caller's responsibility to provide a drbs[] array
 * large enough to store pointers to all drbs that might become unused
 * by that dequeue operation. It is also the caller's responsibility to
 * manage the unused drbs after the dequeue operation is completed
 * (i.e. mark them as free/reusable again, etc.). See the recycling
 * sketch below.
 *
 * @param dr
 *   A pointer to the ring structure.
 * @param objs
 *   An array of pointers to objects that will be dequeued.
 * @param nb_obj
 *   The number of objects to dequeue from the dring.
 * @param drbs
 *   An array of pointers to the drbs that will become unused after that
 *   dequeue operation.
 * @param nb_drb
 *   at input: number of elements in the drbs[] array.
 *   at output: number of filled entries in the drbs[] array.
 * @return
 *   - number of dequeued objects.
 */
static inline uint32_t
tle_dring_dequeue(struct tle_dring *dr, const void *objs[], uint32_t nb_obj,
	struct tle_drb *drbs[], uint32_t *nb_drb)
{
	if (dr->cons.single == 0)
		return tle_dring_mc_dequeue(dr, objs, nb_obj, drbs, nb_drb);
	else
		return tle_dring_sc_dequeue(dr, objs, nb_obj, drbs, nb_drb);
}
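
/*
 * Sketch of a dequeue that recycles released drbs (illustrative only;
 * BURST and drb_free() are hypothetical placeholders for the caller's
 * own burst size and drb pool):
 *
 *	const void *obj[BURST];
 *	struct tle_drb *drb[BURST];
 *	uint32_t i, k, n;
 *
 *	k = RTE_DIM(drb);
 *	n = tle_dring_dequeue(&dr, obj, RTE_DIM(obj), drb, &k);
 *
 *	// k now holds the number of drbs the dring no longer needs;
 *	// return them to the caller's pool for reuse.
 *	for (i = 0; i != k; i++)
 *		drb_free(drb[i]);
 *
 *	// process the n dequeued objects in obj[] ...
 */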

/**
 * Reset the given dring to its initial state.
 * Note that information about all queued objects will be lost.
 *
 * @param dr
 *   A pointer to the dring structure.
 * @param flags
 *   Ring behaviour flags: RING_F_SP_ENQ and/or RING_F_SC_DEQ to select
 *   single-producer enqueue and/or single-consumer dequeue mode.
 */
static inline void
tle_dring_reset(struct tle_dring *dr, uint32_t flags)
{
	memset(dr, 0, sizeof(*dr));
	dr->prod.crb = &dr->dummy;
	dr->cons.crb = &dr->dummy;
	dr->prod.single = ((flags & RING_F_SP_ENQ) != 0);
	dr->cons.single = ((flags & RING_F_SC_DEQ) != 0);
	dr->flags = flags;
}
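
/*
 * For example, to configure a dring for single-producer enqueue and
 * single-consumer dequeue, using the standard rte_ring flag values:
 *
 *	tle_dring_reset(&dr, RING_F_SP_ENQ | RING_F_SC_DEQ);
 */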

/**
 * Calculate the required size for a drb to store up to *num* objects.
 *
 * @param num
 *   Number of objects the drb should be able to store.
 * @return
 *   - required size of the drb, in bytes.
 */
static inline size_t
tle_drb_calc_size(uint32_t num)
{
	size_t sz;

	sz = offsetof(struct tle_drb, objs[num]);
	return RTE_ALIGN_CEIL(sz, RTE_CACHE_LINE_SIZE);
}
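
/*
 * Allocation sketch (illustrative only; the use of rte_zmalloc() and the
 * object count are assumptions of this example, not requirements of the API):
 *
 *	struct tle_drb *drb;
 *	uint32_t num = 32;
 *
 *	drb = rte_zmalloc(NULL, tle_drb_calc_size(num), RTE_CACHE_LINE_SIZE);
 *	if (drb != NULL)
 *		drb->size = num;   // the caller sets the object capacity
 */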

/**
 * Dump information about the dring to the file.
 *
 * @param f
 *   A pointer to the file.
 * @param verb
 *   Verbosity level (currently only 0 or 1).
 * @param dr
 *   A pointer to the dring structure.
 */
extern void tle_dring_dump(FILE *f, int32_t verb, const struct tle_dring *dr);

#ifdef __cplusplus
}
#endif

#endif /* _TLE_DRING_H_ */