1/*
2 *-
3 *   BSD LICENSE
4 *
5 *   Copyright(c) 2015 Intel Corporation. All rights reserved.
6 *   All rights reserved.
7 *
8 *   Redistribution and use in source and binary forms, with or without
9 *   modification, are permitted provided that the following conditions
10 *   are met:
11 *
12 *     * Redistributions of source code must retain the above copyright
13 *       notice, this list of conditions and the following disclaimer.
14 *     * Redistributions in binary form must reproduce the above copyright
15 *       notice, this list of conditions and the following disclaimer in
16 *       the documentation and/or other materials provided with the
17 *       distribution.
18 *     * Neither the name of Intel Corporation nor the names of its
19 *       contributors may be used to endorse or promote products derived
20 *       from this software without specific prior written permission.
21 *
22 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
25 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
26 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
27 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
28 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
30 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
31 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33 */
34
35/*
36 * Some portions of this software is derived from the producer
37 * consumer queues described by Dmitry Vyukov and published  here
38 * http://www.1024cores.net
39 *
40 * Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
41 * Redistribution and use in source and binary forms, with or without
42 * modification, are permitted provided that the following conditions
43 * are met:
44 *
45 * 1. Redistributions of source code must retain the above copyright notice,
46 * this list of conditions and the following disclaimer.
47 *
48 * 2. Redistributions in binary form must reproduce the above copyright notice,
49 * this list of conditions and the following disclaimer in the documentation
50 * and/or other materials provided with the distribution.
51 *
52 * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS"
53 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
54 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
55 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL DMITRY VYUKOV OR CONTRIBUTORS
56 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
57 * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
58 * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
59 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
60 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
61 * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
62 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
63 *
64 * The views and conclusions contained in the software and documentation are
65 * those of the authors and should not be interpreted as representing official
66 * policies, either expressed or implied, of Dmitry Vyukov.
67 */
68
69#ifndef LTHREAD_QUEUE_H_
70#define LTHREAD_QUEUE_H_
71
72#include <string.h>
73
74#include <rte_prefetch.h>
75#include <rte_per_lcore.h>
76
77#include "lthread_int.h"
78#include "lthread.h"
79#include "lthread_diag.h"
80#include "lthread_pool.h"
81
82struct lthread_queue;
83
84/*
85 * This file implements an unbounded FIFO queue based on a lock free
86 * linked list.
87 *
88 * The queue is non-intrusive in that it uses intermediate nodes, and does
89 * not require these nodes to be inserted into the object being placed
90 * in the queue.
91 *
92 * This is slightly more efficient than the very similar queue in lthread_pool
93 * in that it does not have to swing a stub node as the queue becomes empty.
94 *
 * The queue access functions allocate and free intermediate nodes
 * transparently from/to a per-scheduler pool (see lthread_pool.h).
 *
 * The queue provides both MPSC and SPSC insert methods.
99 */
100
101/*
102 * define a queue of lthread nodes
103 */
104struct lthread_queue {
105	struct qnode *head;
106	struct qnode *tail __rte_cache_aligned;
107	struct lthread_queue *p;
108	char name[LT_MAX_NAME_SIZE];
109
110	DIAG_COUNT_DEFINE(rd);
111	DIAG_COUNT_DEFINE(wr);
112	DIAG_COUNT_DEFINE(size);
113
114} __rte_cache_aligned;
115
116
117
118static inline struct lthread_queue *
119_lthread_queue_create(const char *name)
120{
121	struct qnode *stub;
122	struct lthread_queue *new_queue;
123
124	new_queue = rte_malloc_socket(NULL, sizeof(struct lthread_queue),
125					RTE_CACHE_LINE_SIZE,
126					rte_socket_id());
127	if (new_queue == NULL)
128		return NULL;
129
130	/* allocated stub node */
131	stub = _qnode_alloc();
132	RTE_ASSERT(stub);
133
134	if (name != NULL)
135		strncpy(new_queue->name, name, sizeof(new_queue->name));
136	new_queue->name[sizeof(new_queue->name)-1] = 0;
137
138	/* initialize queue as empty */
139	stub->next = NULL;
140	new_queue->head = stub;
141	new_queue->tail = stub;
142
143	DIAG_COUNT_INIT(new_queue, rd);
144	DIAG_COUNT_INIT(new_queue, wr);
145	DIAG_COUNT_INIT(new_queue, size);
146
147	return new_queue;
148}
149
150/**
151 * Return true if the queue is empty
152 */
153static inline int __attribute__ ((always_inline))
154_lthread_queue_empty(struct lthread_queue *q)
155{
156	return q->tail == q->head;
157}
158
159
160
161/**
162 * Destroy a queue
163 * fail if queue is not empty
164 */
165static inline int _lthread_queue_destroy(struct lthread_queue *q)
166{
167	if (q == NULL)
168		return -1;
169
170	if (!_lthread_queue_empty(q))
171		return -1;
172
173	_qnode_free(q->head);
174	rte_free(q);
175	return 0;
176}
177
178RTE_DECLARE_PER_LCORE(struct lthread_sched *, this_sched);
179
180/*
181 * Insert a node into a queue
182 * this implementation is multi producer safe
183 */
184static inline struct qnode *__attribute__ ((always_inline))
185_lthread_queue_insert_mp(struct lthread_queue
186							  *q, void *data)
187{
188	struct qnode *prev;
189	struct qnode *n = _qnode_alloc();
190
191	if (n == NULL)
192		return NULL;
193
194	/* set object in node */
195	n->data = data;
196	n->next = NULL;
197
198	/* this is an MPSC method, perform a locked update */
199	prev = n;
200	prev =
201	    (struct qnode *)__sync_lock_test_and_set((uint64_t *) &(q)->head,
202					       (uint64_t) prev);
203	/* there is a window of inconsistency until prev next is set,
204	 * which is why remove must retry
205	 */
206	prev->next = n;
207
208	DIAG_COUNT_INC(q, wr);
209	DIAG_COUNT_INC(q, size);
210
211	return n;
212}
213
214/*
215 * Insert an node into a queue in single producer mode
216 * this implementation is NOT mult producer safe
217 */
218static inline struct qnode *__attribute__ ((always_inline))
219_lthread_queue_insert_sp(struct lthread_queue
220							  *q, void *data)
221{
222	/* allocate a queue node */
223	struct qnode *prev;
224	struct qnode *n = _qnode_alloc();
225
226	if (n == NULL)
227		return NULL;
228
229	/* set data in node */
230	n->data = data;
231	n->next = NULL;
232
233	/* this is an SPSC method, no need for locked exchange operation */
234	prev = q->head;
235	prev->next = q->head = n;
236
237	DIAG_COUNT_INC(q, wr);
238	DIAG_COUNT_INC(q, size);
239
240	return n;
241}
242
243/*
244 * Remove a node from a queue
245 */
246static inline void *__attribute__ ((always_inline))
247_lthread_queue_poll(struct lthread_queue *q)
248{
249	void *data = NULL;
250	struct qnode *tail = q->tail;
251	struct qnode *next = (struct qnode *)tail->next;
252	/*
253	 * There is a small window of inconsistency between producer and
254	 * consumer whereby the queue may appear empty if consumer and
255	 * producer access it at the same time.
256	 * The consumer must handle this by retrying
257	 */
258
259	if (likely(next != NULL)) {
260		q->tail = next;
261		tail->data = next->data;
262		data = tail->data;
263
264		/* free the node */
265		_qnode_free(tail);
266
267		DIAG_COUNT_INC(q, rd);
268		DIAG_COUNT_DEC(q, size);
269		return data;
270	}
271	return NULL;
272}
273
274/*
275 * Remove a node from a queue
276 */
277static inline void *__attribute__ ((always_inline))
278_lthread_queue_remove(struct lthread_queue *q)
279{
280	void *data = NULL;
281
282	/*
283	 * There is a small window of inconsistency between producer and
284	 * consumer whereby the queue may appear empty if consumer and
285	 * producer access it at the same time. We handle this by retrying
286	 */
287	do {
288		data = _lthread_queue_poll(q);
289
290		if (likely(data != NULL)) {
291
292			DIAG_COUNT_INC(q, rd);
293			DIAG_COUNT_DEC(q, size);
294			return data;
295		}
296		rte_compiler_barrier();
297	} while (unlikely(!_lthread_queue_empty(q)));
298	return NULL;
299}
300
301
302#endif				/* LTHREAD_QUEUE_H_ */
303