tle_dring.h revision b419e591
/*
 * Copyright (c) 2016-2017  Intel Corporation.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef _TLE_DRING_H_
#define _TLE_DRING_H_

#include <stdio.h>
#include <string.h>

#include <rte_common.h>
#include <rte_atomic.h>
#include <rte_memory.h>
#include <rte_ring.h>
#include <rte_debug.h>

#ifdef __cplusplus
extern "C" {
#endif

/**
 * @file
 * TLE dring
 *
 * The Dynamic Ring (dring) is an implementation of an unbounded FIFO queue
 * that supports lockless bulk enqueue/dequeue for multiple producers/consumers.
 * Internally it is represented by a linked list of Dynamic Ring Blocks (drb).
 * Each drb contains some metadata plus an array of pointers to queued objects.
 * It is the caller's responsibility to provide a sufficient number of drbs
 * for enqueue operations, and to manage unused drbs returned by dequeue
 * operations.
 * dring features:
 *
 * - FIFO (First In First Out)
 * - Lockless implementation.
 * - Multi- or single-consumer dequeue.
 * - Multi- or single-producer enqueue.
 * - Bulk dequeue.
 * - Bulk enqueue.
 */
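
/*
 * Illustrative usage sketch (not part of the dring API): a single-producer,
 * single-consumer flow with one drb. OBJ_PER_DRB (assumed >= 32, so one drb
 * is enough for the burst below), the burst size of 32 and the
 * rte_zmalloc()-based allocation (from rte_malloc.h) are assumptions made
 * for this example only; error handling is omitted.
 *
 *	struct tle_dring dr;
 *	struct tle_drb *drb;
 *	const void *objs[32];	// filled by the caller with object pointers
 *	uint32_t k, n;
 *
 *	tle_dring_reset(&dr, RING_F_SP_ENQ | RING_F_SC_DEQ);
 *
 *	// allocate one drb able to hold OBJ_PER_DRB objects
 *	drb = rte_zmalloc(NULL, tle_drb_calc_size(OBJ_PER_DRB),
 *		RTE_CACHE_LINE_SIZE);
 *	drb->size = OBJ_PER_DRB;
 *
 *	// enqueue up to 32 objects; the dring may take ownership of the drb
 *	k = 1;
 *	n = tle_dring_enqueue(&dr, objs, 32, &drb, &k);
 *
 *	// dequeue the objects back; drbs the dring no longer needs are
 *	// returned through the drbs[] array (k is set to their number)
 *	k = 1;
 *	n = tle_dring_dequeue(&dr, objs, 32, &drb, &k);
 */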

/*
 * RTE_ASSERT was introduced in DPDK 16.07.
 * For older versions, use RTE_VERIFY.
 */
#ifdef RTE_ASSERT
#define TLE_DRING_ASSERT(exp)	RTE_ASSERT(exp)
#else
#define TLE_DRING_ASSERT(exp)	RTE_VERIFY(exp)
#endif

struct tle_drb {
	struct tle_drb *next;
	void *udata;          /**< user data. */
	uint32_t size;        /**< number of objects in that buffer. */
	uint32_t start;       /**< start index for that block. */
	const void *objs[0];
} __rte_cache_aligned;

struct tle_dring {
	uint32_t flags;
	struct  {
		uint32_t single;                /**< true if single producer */
		volatile uint32_t head;         /**< producer head */
		volatile uint32_t tail;         /**< producer tail */
		struct tle_drb * volatile crb;  /**< block to enqueue to */
	} prod __rte_cache_aligned;
	struct  {
		uint32_t single;                /**< true if single consumer */
		volatile uint32_t head;         /**< consumer head */
		volatile uint32_t tail;         /**< consumer tail */
		struct tle_drb * volatile crb;  /**< block to dequeue from */
	} cons __rte_cache_aligned;

	struct tle_drb dummy;  /**< dummy block */
};

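/**
 * Return the number of objects currently queued on the dring.
 * Unsigned arithmetic keeps the result correct when the producer/consumer
 * indexes wrap around.
 */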
static inline uint32_t
tle_dring_count(const struct tle_dring *dr)
{
	return dr->prod.tail - dr->cons.tail;
}

/*
 * helper routine, to copy objects to/from the ring.
 */
static inline void __attribute__((always_inline))
__tle_dring_copy_objs(const void *dst[], const void * const src[], uint32_t num)
{
	uint32_t i;

	for (i = 0; i != RTE_ALIGN_FLOOR(num, 4); i += 4) {
		dst[i] = src[i];
		dst[i + 1] = src[i + 1];
		dst[i + 2] = src[i + 2];
		dst[i + 3] = src[i + 3];
	}
	switch (num % 4) {
	case 3:
		dst[i + 2] = src[i + 2];
		/* fallthrough */
	case 2:
		dst[i + 1] = src[i + 1];
		/* fallthrough */
	case 1:
		dst[i] = src[i];
	}
}

/*
 * helper routine, to enqueue objects into the ring.
 */
static inline uint32_t __attribute__((always_inline))
__tle_dring_enqueue(struct tle_dring *dr, uint32_t head,
	const void * const objs[], uint32_t nb_obj,
	struct tle_drb *drbs[], uint32_t nb_drb)
{
	uint32_t i, j, k, n;
	struct tle_drb *pb;

	pb = dr->prod.crb;
	i = 0;

	/* fill the current producer block */
	if (pb->size != 0) {
		n = head - pb->start;
		k = RTE_MIN(pb->size - n, nb_obj);
		__tle_dring_copy_objs(pb->objs + n, objs, k);
		i += k;
	}

	/* fill new blocks, if any */
	j = 0;
	if (i != nb_obj && nb_drb != 0) {

		do {
			pb->next = drbs[j];
			pb = drbs[j];
			pb->start = head + i;
			k = RTE_MIN(pb->size, nb_obj - i);
			__tle_dring_copy_objs(pb->objs, objs + i, k);
			i += k;
		} while (++j != nb_drb && i != nb_obj);

		pb->next = NULL;

		/* new producer current block. */
		dr->prod.crb = pb;
	}

	/* we have to enqueue all requested objects. */
	TLE_DRING_ASSERT(nb_obj == i);

	/* return number of unused blocks. */
	return nb_drb - j;
}

/**
 * Enqueue several objects on the dring (multi-producers safe).
 * Note that it is the caller's responsibility to provide enough drbs
 * to enqueue all requested objects.
 *
 * @param dr
 *   A pointer to the ring structure.
 * @param objs
 *   An array of pointers to objects to enqueue.
 * @param nb_obj
 *   The number of objects to add to the dring from the objs[] array.
 * @param drbs
 *   An array of pointers to the drbs that can be used by the dring
 *   to perform the enqueue operation.
 * @param nb_drb
 *   at input: number of elements in the drbs[] array.
 *   at output: number of elements in the drbs[] array left unused by the dring.
 * @return
 *   - number of enqueued objects.
 */
static inline uint32_t
tle_dring_mp_enqueue(struct tle_dring *dr, const void * const objs[],
	uint32_t nb_obj, struct tle_drb *drbs[], uint32_t *nb_drb)
{
	uint32_t head, next;

	if (nb_obj == 0)
		return 0;

	/* reserve position inside the ring. */
	do {
		head = dr->prod.head;
		next = head + nb_obj;
	} while (rte_atomic32_cmpset(&dr->prod.head, head, next) == 0);

	/*
	 * If there are other enqueues in progress that preceded this one,
	 * then wait for them to complete.
	 */
	while (dr->prod.tail != head)
		rte_pause();

	/* make sure that changes from previous updates are visible. */
	rte_smp_rmb();

	/* now it is safe to enqueue into the ring. */
	*nb_drb = __tle_dring_enqueue(dr, head, objs, nb_obj, drbs, *nb_drb);

	/* make new objects visible to the consumer. */
	rte_smp_wmb();
	dr->prod.tail = next;

	return nb_obj;
}

/**
 * Enqueue several objects on the dring (NOT multi-producers safe).
 * Note that it is the caller's responsibility to provide enough drbs
 * to enqueue all requested objects.
 *
 * @param dr
 *   A pointer to the ring structure.
 * @param objs
 *   An array of pointers to objects to enqueue.
 * @param nb_obj
 *   The number of objects to add to the dring from the objs[] array.
 * @param drbs
 *   An array of pointers to the drbs that can be used by the dring
 *   to perform the enqueue operation.
 * @param nb_drb
 *   at input: number of elements in the drbs[] array.
 *   at output: number of elements in the drbs[] array left unused by the dring.
 * @return
 *   - number of enqueued objects.
 */
static inline uint32_t
tle_dring_sp_enqueue(struct tle_dring *dr, const void * const objs[],
	uint32_t nb_obj, struct tle_drb *drbs[], uint32_t *nb_drb)
{
	uint32_t head, next;

	if (nb_obj == 0)
		return 0;

	head = dr->prod.head;
	next = head + nb_obj;

	/* update producer head value. */
	dr->prod.head = next;

	/* enqueue into the ring. */
	*nb_drb = __tle_dring_enqueue(dr, head, objs, nb_obj, drbs, *nb_drb);

	/* make new objects visible to the consumer. */
	rte_smp_wmb();
	dr->prod.tail = next;

	return nb_obj;
}

/**
 * Enqueue several objects on the dring.
 * Note that it is the caller's responsibility to provide enough drbs
 * to enqueue all requested objects.
 *
 * @param dr
 *   A pointer to the ring structure.
 * @param objs
 *   An array of pointers to objects to enqueue.
 * @param nb_obj
 *   The number of objects to add to the dring from the objs[] array.
 * @param drbs
 *   An array of pointers to the drbs that can be used by the dring
 *   to perform the enqueue operation.
 * @param nb_drb
 *   at input: number of elements in the drbs[] array.
 *   at output: number of elements in the drbs[] array left unused by the dring.
 * @return
 *   - number of enqueued objects.
 */
static inline uint32_t
tle_dring_enqueue(struct tle_dring *dr, const void * const objs[],
	uint32_t nb_obj, struct tle_drb *drbs[], uint32_t *nb_drb)
{
	if (dr->prod.single == 0)
		return tle_dring_mp_enqueue(dr, objs, nb_obj, drbs, nb_drb);
	else
		return tle_dring_sp_enqueue(dr, objs, nb_obj, drbs, nb_drb);
}
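
/*
 * Example of provisioning drbs for an enqueue (illustrative sketch only;
 * BURST, OBJ_PER_DRB and alloc_drb() are assumptions of the example, not
 * part of the dring API; dr points to an already initialized dring).
 * In the worst case the current producer block has no free slots, so up to
 * (nb_obj + OBJ_PER_DRB - 1) / OBJ_PER_DRB fresh drbs can be consumed by
 * a single call.
 *
 *	const void *objs[BURST];
 *	struct tle_drb *drbs[(BURST + OBJ_PER_DRB - 1) / OBJ_PER_DRB];
 *	uint32_t i, k, n;
 *
 *	k = RTE_DIM(drbs);
 *	for (i = 0; i != k; i++)
 *		drbs[i] = alloc_drb(OBJ_PER_DRB);	// hypothetical helper
 *
 *	n = tle_dring_enqueue(dr, objs, BURST, drbs, &k);
 *	// n == BURST; the first RTE_DIM(drbs) - k entries of drbs[] are now
 *	// owned by the dring, the remaining k entries are still owned by the
 *	// caller and can be reused for the next enqueue.
 */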

/*
 * helper routine, to dequeue objects from the ring.
 */
static inline uint32_t __attribute__((always_inline))
__tle_dring_dequeue(struct tle_dring *dr, uint32_t head,
	const void *objs[], uint32_t nb_obj,
	struct tle_drb *drbs[], uint32_t nb_drb)
{
	uint32_t i, j, k, n;
	struct tle_drb *pb;

	pb = dr->cons.crb;
	i = 0;

	/* copy from the current consumer block */
	if (pb->size != 0) {
		n = head - pb->start;
		k = RTE_MIN(pb->size - n, nb_obj);
		__tle_dring_copy_objs(objs, pb->objs + n, k);
		i += k;
	}

	/* copy from other blocks */
	j = 0;
	if (i != nb_obj && nb_drb != 0) {

		do {
			/* current block is empty, put it into the free list. */
			if (pb != &dr->dummy)
				drbs[j++] = pb;

			/* proceed to the next block. */
			pb = pb->next;
			k = RTE_MIN(pb->size, nb_obj - i);
			__tle_dring_copy_objs(objs + i, pb->objs, k);
			i += k;
		} while (j != nb_drb && i != nb_obj);

		/* new consumer current block. */
		dr->cons.crb = pb;
	}

	/* we have to dequeue all requested objects. */
	TLE_DRING_ASSERT(nb_obj == i);

	/* return number of blocks to free. */
	return j;
}

/**
 * Dequeue several objects from the dring (multi-consumers safe).
 * Note that it is the caller's responsibility to provide a drbs[] array
 * large enough to store pointers to all drbs that might become unused
 * after this dequeue operation. It is also the caller's responsibility to
 * manage unused drbs after the dequeue operation is completed
 * (i.e. mark them as free/reusable again, etc.).
 *
 * @param dr
 *   A pointer to the ring structure.
 * @param objs
 *   An array of pointers to objects that will be dequeued.
 * @param nb_obj
 *   The number of objects to dequeue from the dring.
 * @param drbs
 *   An array of pointers to the drbs that will become unused after that
 *   dequeue operation.
 * @param nb_drb
 *   at input: number of elements in the drbs[] array.
 *   at output: number of filled entries in the drbs[] array.
 * @return
 *   - number of dequeued objects.
 */
static inline uint32_t
tle_dring_mc_dequeue(struct tle_dring *dr, const void *objs[], uint32_t nb_obj,
	struct tle_drb *drbs[], uint32_t *nb_drb)
{
	uint32_t head, next, num, tail;

	/* move cons.head atomically */
	do {
		head = dr->cons.head;
		tail = dr->prod.tail;

		num = RTE_MIN(tail - head, nb_obj);

		/* no objects to dequeue */
		if (num == 0) {
			*nb_drb = 0;
			return 0;
		}

		next = head + num;
	} while (rte_atomic32_cmpset(&dr->cons.head, head, next) == 0);

	/*
	 * If there are other dequeues in progress that preceded this one,
	 * then wait for them to complete.
	 */
	while (dr->cons.tail != head)
		rte_pause();

	/* make sure that changes from previous updates are visible. */
	rte_smp_rmb();

	/* now it is safe to dequeue from the ring. */
	*nb_drb = __tle_dring_dequeue(dr, head, objs, num, drbs, *nb_drb);

	/* update consumer tail value. */
	rte_smp_wmb();
	dr->cons.tail = next;

	return num;
}

/**
 * Dequeue several objects from the dring (NOT multi-consumers safe).
 * Note that it is the caller's responsibility to provide a drbs[] array
 * large enough to store pointers to all drbs that might become unused
 * after this dequeue operation. It is also the caller's responsibility to
 * manage unused drbs after the dequeue operation is completed
 * (i.e. mark them as free/reusable again, etc.).
 *
 * @param dr
 *   A pointer to the ring structure.
 * @param objs
 *   An array of pointers to objects that will be dequeued.
 * @param nb_obj
 *   The number of objects to dequeue from the dring.
 * @param drbs
 *   An array of pointers to the drbs that will become unused after that
 *   dequeue operation.
 * @param nb_drb
 *   at input: number of elements in the drbs[] array.
 *   at output: number of filled entries in the drbs[] array.
 * @return
 *   - number of dequeued objects.
 */
static inline uint32_t
tle_dring_sc_dequeue(struct tle_dring *dr, const void *objs[], uint32_t nb_obj,
	struct tle_drb *drbs[], uint32_t *nb_drb)
{
	uint32_t head, next, num, tail;

	head = dr->cons.head;
	tail = dr->prod.tail;

	num = RTE_MIN(tail - head, nb_obj);

	/* no objects to dequeue */
	if (num == 0) {
		*nb_drb = 0;
		return 0;
	}

	next = head + num;

	/* update consumer head value. */
	dr->cons.head = next;

	/* dequeue from the ring. */
	*nb_drb = __tle_dring_dequeue(dr, head, objs, num, drbs, *nb_drb);

	/* update consumer tail value. */
	rte_smp_wmb();
	dr->cons.tail = next;

	return num;
}

/**
 * Dequeue several objects from the dring.
 * Note that it is the caller's responsibility to provide a drbs[] array
 * large enough to store pointers to all drbs that might become unused
 * after this dequeue operation. It is also the caller's responsibility to
 * manage unused drbs after the dequeue operation is completed
 * (i.e. mark them as free/reusable again, etc.).
 *
 * @param dr
 *   A pointer to the ring structure.
 * @param objs
 *   An array of pointers to objects that will be dequeued.
 * @param nb_obj
 *   The number of objects to dequeue from the dring.
 * @param drbs
 *   An array of pointers to the drbs that will become unused after that
 *   dequeue operation.
 * @param nb_drb
 *   at input: number of elements in the drbs[] array.
 *   at output: number of filled entries in the drbs[] array.
 * @return
 *   - number of dequeued objects.
 */
static inline uint32_t
tle_dring_dequeue(struct tle_dring *dr, const void *objs[], uint32_t nb_obj,
	struct tle_drb *drbs[], uint32_t *nb_drb)
{
	if (dr->cons.single == 0)
		return tle_dring_mc_dequeue(dr, objs, nb_obj, drbs, nb_drb);
	else
		return tle_dring_sc_dequeue(dr, objs, nb_obj, drbs, nb_drb);
}
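
/*
 * Example of recycling drbs on dequeue (illustrative sketch only; BURST,
 * OBJ_PER_DRB and free_drb() are assumptions of the example; dr points to
 * an already initialized dring). Assuming every queued drb was created
 * with OBJ_PER_DRB slots, at most BURST / OBJ_PER_DRB + 1 drbs can be
 * released by a single call, so the drbs[] array below is sized to that
 * upper bound.
 *
 *	const void *objs[BURST];
 *	struct tle_drb *drbs[BURST / OBJ_PER_DRB + 1];
 *	uint32_t i, k, n;
 *
 *	k = RTE_DIM(drbs);
 *	n = tle_dring_dequeue(dr, objs, BURST, drbs, &k);
 *	// n objects were copied into objs[]; drbs[0 .. k - 1] now hold the
 *	// blocks released by the dring and can be freed or reused.
 *	for (i = 0; i != k; i++)
 *		free_drb(drbs[i]);	// hypothetical helper
 */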

/**
 * Reset the given dring to its initial state.
 * Note that information about all queued objects will be lost.
 *
 * @param dr
 *   A pointer to the dring structure.
 * @param flags
 *   Ring behavior flags (RING_F_SP_ENQ, RING_F_SC_DEQ).
 */
static inline void
tle_dring_reset(struct tle_dring *dr, uint32_t flags)
{
	memset(dr, 0, sizeof(*dr));
	dr->prod.crb = &dr->dummy;
	dr->cons.crb = &dr->dummy;
	dr->prod.single = ((flags & RING_F_SP_ENQ) != 0);
	dr->cons.single = ((flags & RING_F_SC_DEQ) != 0);
	dr->flags = flags;
}

/**
 * Calculate the size required for a drb to store up to *num* objects.
 *
 * @param num
 *   Number of objects the drb should be able to store.
 * @return
 *   - required size of the drb.
 */
static inline size_t
tle_drb_calc_size(uint32_t num)
{
	size_t sz;

	sz = offsetof(struct tle_drb, objs[num]);
	return RTE_ALIGN_CEIL(sz, RTE_CACHE_LINE_SIZE);
}
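
/*
 * Example of allocating a drb (illustrative sketch; OBJ_PER_DRB and the
 * rte_zmalloc()-based allocation are assumptions of the example). Note
 * that drb->size has to be filled in by the caller; the dring itself
 * never modifies it.
 *
 *	struct tle_drb *drb;
 *
 *	drb = rte_zmalloc(NULL, tle_drb_calc_size(OBJ_PER_DRB),
 *		RTE_CACHE_LINE_SIZE);
 *	if (drb != NULL)
 *		drb->size = OBJ_PER_DRB;
 */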

/**
 * Dump information about the dring to the file.
 *
 * @param f
 *   A pointer to the file.
 * @param verb
 *   Verbosity level (currently only 0 or 1).
 * @param dr
 *   A pointer to the dring structure.
 */
extern void tle_dring_dump(FILE *f, int32_t verb, const struct tle_dring *dr);

#ifdef __cplusplus
}
#endif

#endif /* _TLE_DRING_H_ */