queue.c revision e4a08c1f
1/*
2 *------------------------------------------------------------------
3 * svm_queue.c - unidirectional shared-memory queues
4 *
5 * Copyright (c) 2009-2019 Cisco and/or its affiliates.
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at:
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *------------------------------------------------------------------
18 */
19
20
21#include <stdio.h>
22#include <stdlib.h>
23#include <string.h>
24#include <pthread.h>
25#include <vppinfra/mem.h>
26#include <vppinfra/format.h>
27#include <vppinfra/cache.h>
28#include <svm/queue.h>
29#include <vppinfra/time.h>
30#include <vppinfra/lock.h>
31
32svm_queue_t *
33svm_queue_init (void *base, int nels, int elsize)
34{
35  svm_queue_t *q;
36  pthread_mutexattr_t attr;
37  pthread_condattr_t cattr;
38
39  q = (svm_queue_t *) base;
40  clib_memset (q, 0, sizeof (*q));
41
42  q->elsize = elsize;
43  q->maxsize = nels;
44  q->producer_evtfd = -1;
45  q->consumer_evtfd = -1;
46
47  clib_memset (&attr, 0, sizeof (attr));
48  clib_memset (&cattr, 0, sizeof (cattr));
49
50  if (pthread_mutexattr_init (&attr))
51    clib_unix_warning ("mutexattr_init");
52  if (pthread_mutexattr_setpshared (&attr, PTHREAD_PROCESS_SHARED))
53    clib_unix_warning ("pthread_mutexattr_setpshared");
54  if (pthread_mutex_init (&q->mutex, &attr))
55    clib_unix_warning ("mutex_init");
56  if (pthread_mutexattr_destroy (&attr))
57    clib_unix_warning ("mutexattr_destroy");
58  if (pthread_condattr_init (&cattr))
59    clib_unix_warning ("condattr_init");
60  /* prints funny-looking messages in the Linux target */
61  if (pthread_condattr_setpshared (&cattr, PTHREAD_PROCESS_SHARED))
62    clib_unix_warning ("condattr_setpshared");
63  if (pthread_cond_init (&q->condvar, &cattr))
64    clib_unix_warning ("cond_init1");
65  if (pthread_condattr_destroy (&cattr))
66    clib_unix_warning ("cond_init2");
67
68  return (q);
69}
70
71svm_queue_t *
72svm_queue_alloc_and_init (int nels, int elsize, int consumer_pid)
73{
74  svm_queue_t *q;
75
76  q = clib_mem_alloc_aligned (sizeof (svm_queue_t)
77			      + nels * elsize, CLIB_CACHE_LINE_BYTES);
78  clib_memset (q, 0, sizeof (*q));
79  q = svm_queue_init (q, nels, elsize);
80  q->consumer_pid = consumer_pid;
81
82  return q;
83}
84
85/*
86 * svm_queue_free
87 */
88void
89svm_queue_free (svm_queue_t * q)
90{
91  (void) pthread_mutex_destroy (&q->mutex);
92  (void) pthread_cond_destroy (&q->condvar);
93  clib_mem_free (q);
94}
95
96void
97svm_queue_lock (svm_queue_t * q)
98{
99  pthread_mutex_lock (&q->mutex);
100}
101
102void
103svm_queue_unlock (svm_queue_t * q)
104{
105  pthread_mutex_unlock (&q->mutex);
106}
107
108int
109svm_queue_is_full (svm_queue_t * q)
110{
111  return q->cursize == q->maxsize;
112}
113
114static inline void
115svm_queue_send_signal_inline (svm_queue_t * q, u8 is_prod)
116{
117  if (q->producer_evtfd == -1)
118    {
119      (void) pthread_cond_broadcast (&q->condvar);
120    }
121  else
122    {
123      int __clib_unused rv, fd;
124      u64 data = 1;
125      ASSERT (q->consumer_evtfd > 0 && q->producer_evtfd > 0);
126      fd = is_prod ? q->producer_evtfd : q->consumer_evtfd;
127      rv = write (fd, &data, sizeof (data));
128      if (PREDICT_FALSE (rv < 0))
129	clib_unix_warning ("signal write on %d returned %d", fd, rv);
130    }
131}
132
133void
134svm_queue_send_signal (svm_queue_t * q, u8 is_prod)
135{
136  svm_queue_send_signal_inline (q, is_prod);
137}
138
139static inline void
140svm_queue_wait_inline (svm_queue_t * q)
141{
142  if (q->producer_evtfd == -1)
143    {
144      pthread_cond_wait (&q->condvar, &q->mutex);
145    }
146  else
147    {
148      /* Fake a wait for event. We could use epoll but that would mean
149       * using yet another fd. Should do for now */
150      u32 cursize = q->cursize;
151      svm_queue_unlock (q);
152      while (q->cursize == cursize)
153	CLIB_PAUSE ();
154      svm_queue_lock (q);
155    }
156}
157
158void
159svm_queue_wait (svm_queue_t * q)
160{
161  svm_queue_wait_inline (q);
162}
163
164static inline int
165svm_queue_timedwait_inline (svm_queue_t * q, double timeout)
166{
167  struct timespec ts;
168  ts.tv_sec = unix_time_now () + (u32) timeout;
169  ts.tv_nsec = (timeout - (u32) timeout) * 1e9;
170
171  if (q->producer_evtfd == -1)
172    {
173      return pthread_cond_timedwait (&q->condvar, &q->mutex, &ts);
174    }
175  else
176    {
177      double max_time = unix_time_now () + timeout;
178      u32 cursize = q->cursize;
179      int rv;
180
181      svm_queue_unlock (q);
182      while (q->cursize == cursize && unix_time_now () < max_time)
183	CLIB_PAUSE ();
184      rv = unix_time_now () < max_time ? 0 : ETIMEDOUT;
185      svm_queue_lock (q);
186      return rv;
187    }
188}
189
190int
191svm_queue_timedwait (svm_queue_t * q, double timeout)
192{
193  return svm_queue_timedwait_inline (q, timeout);
194}
195
196/*
197 * svm_queue_add_nolock
198 */
199int
200svm_queue_add_nolock (svm_queue_t * q, u8 * elem)
201{
202  i8 *tailp;
203  int need_broadcast = 0;
204
205  if (PREDICT_FALSE (q->cursize == q->maxsize))
206    {
207      while (q->cursize == q->maxsize)
208	svm_queue_wait_inline (q);
209    }
210
211  tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
212  clib_memcpy_fast (tailp, elem, q->elsize);
213
214  q->tail++;
215  q->cursize++;
216
217  need_broadcast = (q->cursize == 1);
218
219  if (q->tail == q->maxsize)
220    q->tail = 0;
221
222  if (need_broadcast)
223    svm_queue_send_signal_inline (q, 1);
224  return 0;
225}
226
227void
228svm_queue_add_raw (svm_queue_t * q, u8 * elem)
229{
230  i8 *tailp;
231
232  tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
233  clib_memcpy_fast (tailp, elem, q->elsize);
234
235  q->tail = (q->tail + 1) % q->maxsize;
236  q->cursize++;
237
238  if (q->cursize == 1)
239    svm_queue_send_signal_inline (q, 1);
240}
241
242
243/*
244 * svm_queue_add
245 */
246int
247svm_queue_add (svm_queue_t * q, u8 * elem, int nowait)
248{
249  i8 *tailp;
250  int need_broadcast = 0;
251
252  if (nowait)
253    {
254      /* zero on success */
255      if (pthread_mutex_trylock (&q->mutex))
256	{
257	  return (-1);
258	}
259    }
260  else
261    svm_queue_lock (q);
262
263  if (PREDICT_FALSE (q->cursize == q->maxsize))
264    {
265      if (nowait)
266	{
267	  svm_queue_unlock (q);
268	  return (-2);
269	}
270      while (q->cursize == q->maxsize)
271	svm_queue_wait_inline (q);
272    }
273
274  tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
275  clib_memcpy_fast (tailp, elem, q->elsize);
276
277  q->tail++;
278  q->cursize++;
279
280  need_broadcast = (q->cursize == 1);
281
282  if (q->tail == q->maxsize)
283    q->tail = 0;
284
285  if (need_broadcast)
286    svm_queue_send_signal_inline (q, 1);
287
288  svm_queue_unlock (q);
289
290  return 0;
291}
292
293/*
294 * svm_queue_add2
295 */
296int
297svm_queue_add2 (svm_queue_t * q, u8 * elem, u8 * elem2, int nowait)
298{
299  i8 *tailp;
300  int need_broadcast = 0;
301
302  if (nowait)
303    {
304      /* zero on success */
305      if (pthread_mutex_trylock (&q->mutex))
306	{
307	  return (-1);
308	}
309    }
310  else
311    svm_queue_lock (q);
312
313  if (PREDICT_FALSE (q->cursize + 1 == q->maxsize))
314    {
315      if (nowait)
316	{
317	  svm_queue_unlock (q);
318	  return (-2);
319	}
320      while (q->cursize + 1 == q->maxsize)
321	svm_queue_wait_inline (q);
322    }
323
324  tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
325  clib_memcpy_fast (tailp, elem, q->elsize);
326
327  q->tail++;
328  q->cursize++;
329
330  if (q->tail == q->maxsize)
331    q->tail = 0;
332
333  need_broadcast = (q->cursize == 1);
334
335  tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
336  clib_memcpy_fast (tailp, elem2, q->elsize);
337
338  q->tail++;
339  q->cursize++;
340
341  if (q->tail == q->maxsize)
342    q->tail = 0;
343
344  if (need_broadcast)
345    svm_queue_send_signal_inline (q, 1);
346
347  svm_queue_unlock (q);
348
349  return 0;
350}
351
352/*
353 * svm_queue_sub
354 */
355int
356svm_queue_sub (svm_queue_t * q, u8 * elem, svm_q_conditional_wait_t cond,
357	       u32 time)
358{
359  i8 *headp;
360  int need_broadcast = 0;
361  int rc = 0;
362
363  if (cond == SVM_Q_NOWAIT)
364    {
365      /* zero on success */
366      if (pthread_mutex_trylock (&q->mutex))
367	{
368	  return (-1);
369	}
370    }
371  else
372    svm_queue_lock (q);
373
374  if (PREDICT_FALSE (q->cursize == 0))
375    {
376      if (cond == SVM_Q_NOWAIT)
377	{
378	  svm_queue_unlock (q);
379	  return (-2);
380	}
381      else if (cond == SVM_Q_TIMEDWAIT)
382	{
383	  while (q->cursize == 0 && rc == 0)
384	    rc = svm_queue_timedwait_inline (q, time);
385
386	  if (rc == ETIMEDOUT)
387	    {
388	      svm_queue_unlock (q);
389	      return ETIMEDOUT;
390	    }
391	}
392      else
393	{
394	  while (q->cursize == 0)
395	    svm_queue_wait_inline (q);
396	}
397    }
398
399  headp = (i8 *) (&q->data[0] + q->elsize * q->head);
400  clib_memcpy_fast (elem, headp, q->elsize);
401
402  q->head++;
403  /* $$$$ JFC shouldn't this be == 0? */
404  if (q->cursize == q->maxsize)
405    need_broadcast = 1;
406
407  q->cursize--;
408
409  if (q->head == q->maxsize)
410    q->head = 0;
411
412  if (need_broadcast)
413    svm_queue_send_signal_inline (q, 0);
414
415  svm_queue_unlock (q);
416
417  return 0;
418}
419
420int
421svm_queue_sub2 (svm_queue_t * q, u8 * elem)
422{
423  int need_broadcast;
424  i8 *headp;
425
426  svm_queue_lock (q);
427  if (q->cursize == 0)
428    {
429      svm_queue_unlock (q);
430      return -1;
431    }
432
433  headp = (i8 *) (&q->data[0] + q->elsize * q->head);
434  clib_memcpy_fast (elem, headp, q->elsize);
435
436  q->head++;
437  need_broadcast = (q->cursize == q->maxsize / 2);
438  q->cursize--;
439
440  if (PREDICT_FALSE (q->head == q->maxsize))
441    q->head = 0;
442  svm_queue_unlock (q);
443
444  if (need_broadcast)
445    svm_queue_send_signal_inline (q, 0);
446
447  return 0;
448}
449
450int
451svm_queue_sub_raw (svm_queue_t * q, u8 * elem)
452{
453  int need_broadcast;
454  i8 *headp;
455
456  if (PREDICT_FALSE (q->cursize == 0))
457    {
458      while (q->cursize == 0)
459	;
460    }
461
462  headp = (i8 *) (&q->data[0] + q->elsize * q->head);
463  clib_memcpy_fast (elem, headp, q->elsize);
464
465  need_broadcast = q->cursize == q->maxsize;
466
467  q->head = (q->head + 1) % q->maxsize;
468  q->cursize--;
469
470  if (PREDICT_FALSE (need_broadcast))
471    svm_queue_send_signal_inline (q, 0);
472
473  return 0;
474}
475
476void
477svm_queue_set_producer_event_fd (svm_queue_t * q, int fd)
478{
479  q->producer_evtfd = fd;
480}
481
482void
483svm_queue_set_consumer_event_fd (svm_queue_t * q, int fd)
484{
485  q->consumer_evtfd = fd;
486}
487
488/*
489 * fd.io coding-style-patch-verification: ON
490 *
491 * Local Variables:
492 * eval: (c-set-style "gnu")
493 * End:
494 */
495