test_dring.c revision fbba0a3b
1/*
2 * Copyright (c) 2016  Intel Corporation.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#include <string.h>
17#include <stdarg.h>
18#include <stdio.h>
19#include <stdlib.h>
20#include <stdint.h>
21#include <inttypes.h>
22#include <errno.h>
23
24#include <rte_common.h>
25#include <rte_log.h>
26#include <rte_errno.h>
27#include <rte_launch.h>
28#include <rte_cycles.h>
29#include <rte_eal.h>
30#include <rte_per_lcore.h>
31#include <rte_lcore.h>
32#include <rte_ring.h>
33#include <tle_dring.h>
34#include <rte_random.h>
35
/* number of objects the pool of free drbs is sized for (see init_drb_ring). */
#define OBJ_NUM		UINT16_MAX
/* number of enqueue/dequeue rounds each lcore performs (dring_arg.iter). */
#define ITER_NUM	(4 * OBJ_NUM)
38
/*
 * Enqueue/dequeue mode selected for an lcore:
 * NONE   - the lcore skips that operation entirely,
 * SINGLE - single producer/consumer flavour of the dring call is used,
 * MULTI  - multi producer/consumer flavour of the dring call is used.
 */
enum {
	NONE = 0,
	SINGLE = 1,
	MULTI = 2,
};
44
/*
 * Per-lcore test parameters and results, consumed by test_dring_enq_deq().
 */
struct dring_arg {
	struct tle_dring *dr;	/* dring under test. */
	struct rte_ring *r;	/* ring holding the currently unused drbs. */
	uint32_t iter;		/* number of enqueue/dequeue rounds to run. */
	int32_t enq_type;	/* NONE, SINGLE or MULTI. */
	int32_t deq_type;	/* NONE, SINGLE or MULTI. */
	uint32_t enq;		/* out: objects enqueued by this lcore. */
	uint32_t deq;		/* out: objects dequeued by this lcore. */
};
54
/*
 * Release every drb still stored in the ring, then the ring memory itself.
 */
static void
fini_drb_ring(struct rte_ring *r)
{
	struct tle_drb *cur;

	/* drain the ring, releasing each drb as soon as it is pulled out. */
	for (;;) {
		if (rte_ring_dequeue(r, (void **)&cur) != 0)
			break;
		free(cur);
	}

	/* nothing references the ring anymore, drop its storage. */
	free(r);
}
70
71/*
72 * allocate drbs for specified number of objects, put them into the ring.
73 */
74static struct rte_ring *
75init_drb_ring(uint32_t num)
76{
77	uint32_t i, k, n;
78	size_t sz, tsz;
79	struct rte_ring *r;
80	struct tle_drb *drb;
81
82	/* allocate and initialise rte_ring. */
83
84	n = rte_align32pow2(num);
85	sz =  rte_ring_get_memsize(n);
86
87	r = calloc(1, sz);
88	if (r == NULL) {
89		printf("%s:%d(%u) failed to allocate %zu bytes;\n",
90			__func__, __LINE__, num, sz);
91		return NULL;
92	}
93
94	rte_ring_init(r, __func__, n, 0);
95
96	/* allocate drbs and put them into the ring. */
97
98	tsz = sz;
99	for (i = 0; i != num; i += k) {
100		k =  rte_rand() % (UINT8_MAX + 1) + 1;
101		k = RTE_MIN(k, num - i);
102		sz = tle_drb_calc_size(k);
103		drb = calloc(1, sz);
104		if (drb == NULL) {
105			printf("%s:%d(%u) %u-th iteration: "
106				"failed to allocate %zu bytes;\n",
107				__func__, __LINE__, num, i, sz);
108			fini_drb_ring(r);
109			return NULL;
110		}
111		drb->size = k;
112		rte_ring_enqueue(r, drb);
113		tsz += sz;
114	}
115
116	printf("%s(%u) total %zu bytes allocated, number of drbs: %u;\n",
117		__func__, num, tsz, rte_ring_count(r));
118	return r;
119}
120
/*
 * Fill obj[0 .. num - 1] with a chain of self-describing values.
 * Each enqueued object will contain:
 * [2-3]B: its own sequence number.
 * [0-1]B: next object sequence number, or UINT16_MAX for the last one.
 *
 * Nothing is written when *num* is zero.
 */
static void
test_fill_obj(uintptr_t obj[], uint32_t num)
{
	uint32_t i;

	/* num - 1 below would wrap around and overrun obj[]. */
	if (num == 0)
		return;

	for (i = 0; i != num - 1; i++)
		obj[i] = i << 16 | (i + 1);

	/* last object terminates the chain. */
	obj[i] = i << 16 | UINT16_MAX;
}
136
/*
 * Check that obj[0 .. num - 1] form a valid chain of values:
 * the upper part of each value is its sequence number, the low 16 bits
 * are either the next sequence number or UINT16_MAX (end of a chain,
 * after which the next expected sequence number is 0).
 *
 * Returns *num* when all objects are consistent, otherwise the index of
 * the first offending object (0 when the very first one is malformed).
 * An empty array is trivially consistent: 0 is returned without touching
 * obj[].
 */
static uint32_t
test_check_obj(uintptr_t obj[], uint32_t num)
{
	uint32_t i, h, l, oh, ol;

	/* nothing to read; the loop below assumes at least one object. */
	if (num == 0)
		return 0;

	h = obj[0] >> 16;
	l = obj[0] & UINT16_MAX;

	if (h + 1 != l && l != UINT16_MAX)
		return 0;

	/* end-of-chain marker: next object starts a new chain from 0. */
	if (l == UINT16_MAX)
		l = 0;

	for (i = 1; i != num; i++) {

		oh = obj[i] >> 16;
		ol = obj[i] & UINT16_MAX;

		/* must continue the previous object and be well formed. */
		if (l != oh || (oh + 1 != ol && ol != UINT16_MAX))
			return i;

		l = ol;
		if (l == UINT16_MAX)
			l = 0;
	}

	return num;
}
166
167static int
168test_dring_dequeue(struct tle_dring *dr, struct rte_ring *r, uint32_t num,
169	int32_t type)
170{
171	uint32_t i, k, lc, n, t;
172	struct tle_drb *drb[num];
173	uintptr_t obj[num];
174
175	lc = rte_lcore_id();
176	k = num;
177
178	/* dequeue objects. */
179	if (type == SINGLE)
180		n = tle_dring_sc_dequeue(dr, (const void **)obj, num, drb, &k);
181	else if (type == MULTI)
182		n = tle_dring_mc_dequeue(dr, (const void **)obj, num, drb, &k);
183	else
184		return -EINVAL;
185
186	if (n == 0)
187		return 0;
188
189	/* check the data returned. */
190	t = test_check_obj(obj, n);
191	if (t != n) {
192		printf("%s:%d(%p, %u) at lcore %u: invalid dequeued object, "
193			"n=%u, idx=%u, obj=%#x, prev obj=%#x;\n",
194			__func__, __LINE__, dr, num, lc, n, t,
195			(uint32_t)obj[t], (t == 0) ? 0 : (uint32_t)obj[t - 1]);
196		return -EFAULT;
197	}
198
199	/* check and free drbs. */
200	for (i = 0; i != k; i++) {
201		/* udata value for drb in use shouldn't be zero. */
202		if (drb[i]->udata == NULL) {
203			printf("error @ %s:%d(%p, %u) at lcore %u: "
204				"erroneous drb@%p={udata=%p, size=%u,};\n",
205				__func__, __LINE__, dr, num, lc, drb[i],
206				drb[i]->udata, drb[i]->size);
207			return -EFAULT;
208		}
209		drb[i]->udata = NULL;
210		rte_ring_enqueue(r, drb[i]);
211	}
212
213	return n;
214}
215
216static int
217test_dring_enqueue(struct tle_dring *dr, struct rte_ring *r, uint32_t num,
218	int32_t type)
219{
220	uint32_t i, j, k, lc, nb;
221	struct tle_drb *drb[num];
222	uintptr_t obj[num];
223
224	lc = rte_lcore_id();
225
226	/* prepare drbs to enqueue up to *num* objects. */
227	for (i = 0, j = 0; i != num; i += k, j++) {
228
229		if (rte_ring_dequeue(r, (void **)&drb[j]) != 0)
230			break;
231
232		/* udata value for unused drb should be zero. */
233		if (drb[j]->udata != NULL) {
234			printf("error @ %s:%d(%p, %u) at lcore %u: "
235				"erroneous drb@%p={udata=%p, size=%u,};\n",
236				__func__, __LINE__, dr, num, lc, drb[j],
237				drb[j]->udata, drb[j]->size);
238			return -EFAULT;
239		}
240
241		/* update udata value with current lcore id. */
242		drb[j]->udata = (void *)(uintptr_t)(lc + 1);
243		k = drb[j]->size;
244		k = RTE_MIN(k, num - i);
245	}
246
247	/* no free drbs left. */
248	if (i == 0)
249		return 0;
250
251	/* fill objects to enqueue. */
252	test_fill_obj(obj, i);
253
254	/* enqueue into the dring. */
255	nb = j;
256	if (type == SINGLE)
257		k = tle_dring_sp_enqueue(dr, (const void **)obj, i, drb, &nb);
258	else if (type == MULTI)
259		k = tle_dring_mp_enqueue(dr, (const void **)obj, i, drb, &nb);
260	else
261		return -EINVAL;
262
263	if (k != i) {
264		printf("%s:%d(%p, %p, %u): failed to enqueue %u objects;\n",
265			__func__, __LINE__, dr, r, num, i);
266	}
267
268	/* free unused drbs */
269	for (i = j - nb; i != j; i++) {
270		if ((uintptr_t)drb[i]->udata != lc + 1) {
271			printf("error @ %s:%d(%p, %u) at lcore %u: "
272				"erroneous drb@%p={udata=%p, size=%u,};\n",
273				__func__, __LINE__, dr, num, lc, drb[i],
274				drb[i]->udata, drb[i]->size);
275			return -EFAULT;
276		}
277		drb[i]->udata = NULL;
278		rte_ring_enqueue(r, drb[i]);
279	}
280
281	return k;
282}
283
284static int
285test_dring_enq_deq(struct dring_arg *arg)
286{
287	int32_t rc;
288	uint32_t i, lc, n;
289
290	rc = 0;
291	arg->enq = 0;
292	arg->deq = 0;
293	lc = rte_lcore_id();
294
295	for (i = 0; i != arg->iter; i++) {
296
297		/* try to enqueue random number of objects. */
298		if (arg->enq_type != NONE) {
299			n = rte_rand() % (UINT8_MAX + 1);
300			rc = test_dring_enqueue(arg->dr, arg->r, n,
301				arg->enq_type);
302			if (rc < 0)
303				break;
304			arg->enq += rc;
305		}
306
307		/* try to dequeue random number of objects. */
308		if (arg->deq_type != NONE) {
309			n = rte_rand() % (UINT8_MAX + 1);
310			rc = test_dring_dequeue(arg->dr, arg->r, n,
311				arg->deq_type);
312			if (rc < 0)
313				break;
314			arg->deq += rc;
315		}
316	}
317
318	if (rc < 0)
319		return rc;
320
321	/* dequeue remaining objects. */
322	while (arg->deq_type != NONE && arg->enq != arg->deq) {
323
324		/* try to dequeue random number of objects. */
325		n = rte_rand() % (UINT8_MAX + 1) + 1;
326		rc = test_dring_dequeue(arg->dr, arg->r, n, arg->deq_type);
327		if (rc <= 0)
328			break;
329		arg->deq += rc;
330	}
331
332	printf("%s:%d(lcore=%u, enq_type=%d, deq_type=%d): "
333		"%u objects enqueued, %u objects dequeued\n",
334		__func__, __LINE__, lc, arg->enq_type, arg->deq_type,
335		arg->enq, arg->deq);
336	return 0;
337}
338
339/*
340 * enqueue/dequeue by single thread.
341 */
342static int
343test_dring_st(void)
344{
345	int32_t rc;
346	struct rte_ring *r;
347	struct tle_dring dr;
348	struct dring_arg arg;
349
350	printf("%s started;\n", __func__);
351
352	tle_dring_reset(&dr);
353	r = init_drb_ring(OBJ_NUM);
354	if (r == NULL)
355		return -ENOMEM;
356
357	tle_dring_dump(stdout, 1, &dr);
358
359	memset(&arg, 0, sizeof(arg));
360	arg.dr = &dr;
361	arg.r = r;
362	arg.iter = ITER_NUM;
363	arg.enq_type = SINGLE;
364	arg.deq_type = SINGLE;
365	rc = test_dring_enq_deq(&arg);
366
367	rc = (rc != 0) ? rc : (arg.enq != arg.deq);
368	printf("%s finished with status: %s(%d);\n",
369		__func__, strerror(-rc), rc);
370
371	tle_dring_dump(stdout, rc != 0, &dr);
372	fini_drb_ring(r);
373
374	return rc;
375}
376
/*
 * lcore entry point: *arg* is the struct dring_arg prepared for this lcore.
 */
static int
test_dring_worker(void *arg)
{
	return test_dring_enq_deq((struct dring_arg *)arg);
}
385
386/*
387 * enqueue/dequeue by multiple threads.
388 */
389static int
390test_dring_mt(int32_t master_enq_type, int32_t master_deq_type,
391	int32_t slave_enq_type, int32_t slave_deq_type)
392{
393	int32_t rc;
394	uint32_t lc;
395	uint64_t deq, enq;
396	struct rte_ring *r;
397	struct tle_dring dr;
398	struct dring_arg arg[RTE_MAX_LCORE];
399
400	tle_dring_reset(&dr);
401	r = init_drb_ring(OBJ_NUM);
402	if (r == NULL)
403		return -ENOMEM;
404
405	memset(arg, 0, sizeof(arg));
406
407	/* launch on all slaves */
408	RTE_LCORE_FOREACH_SLAVE(lc) {
409		arg[lc].dr = &dr;
410		arg[lc].r = r;
411		arg[lc].iter = ITER_NUM;
412		arg[lc].enq_type = slave_enq_type;
413		arg[lc].deq_type = slave_deq_type;
414		rte_eal_remote_launch(test_dring_worker, &arg[lc], lc);
415	}
416
417	/* launch on master */
418	lc = rte_lcore_id();
419	arg[lc].dr = &dr;
420	arg[lc].r = r;
421	arg[lc].iter = ITER_NUM;
422	arg[lc].enq_type = master_enq_type;
423	arg[lc].deq_type = master_deq_type;
424	rc = test_dring_worker(&arg[lc]);
425	enq = arg[lc].enq;
426	deq = arg[lc].deq;
427
428	/* wait for slaves. */
429	RTE_LCORE_FOREACH_SLAVE(lc) {
430		rc |= rte_eal_wait_lcore(lc);
431		enq += arg[lc].enq;
432		deq += arg[lc].deq;
433	}
434
435	printf("%s:%d: total %" PRIu64 " objects enqueued, %"
436		PRIu64 " objects dequeued\n",
437		__func__, __LINE__, enq, deq);
438
439	rc = (rc != 0) ? rc : (enq != deq);
440	if (rc != 0)
441		tle_dring_dump(stdout, 1, &dr);
442
443	fini_drb_ring(r);
444	return rc;
445}
446
447static int
448test_dring_mp_mc(void)
449{
450	int32_t rc;
451
452	printf("%s started;\n", __func__);
453	rc = test_dring_mt(MULTI, MULTI, MULTI, MULTI);
454	printf("%s finished with status: %s(%d);\n",
455		__func__, strerror(-rc), rc);
456	return rc;
457}
458
459static int
460test_dring_mp_sc(void)
461{
462	int32_t rc;
463
464	printf("%s started;\n", __func__);
465	rc = test_dring_mt(MULTI, SINGLE, MULTI, NONE);
466	printf("%s finished with status: %s(%d);\n",
467		__func__, strerror(-rc), rc);
468	return rc;
469}
470
471static int
472test_dring_sp_mc(void)
473{
474	int32_t rc;
475
476	printf("%s started;\n", __func__);
477	rc = test_dring_mt(SINGLE, MULTI, NONE, MULTI);
478	printf("%s finished with status: %s(%d);\n",
479		__func__, strerror(-rc), rc);
480	return rc;
481}
482
483static int
484test_dring(void)
485{
486	int32_t rc;
487
488	rc = test_dring_st();
489	if (rc != 0)
490		return rc;
491
492	rc = test_dring_mp_mc();
493	if (rc != 0)
494		return rc;
495
496	rc = test_dring_mp_sc();
497	if (rc != 0)
498		return rc;
499
500	rc = test_dring_sp_mc();
501	if (rc != 0)
502		return rc;
503
504	return 0;
505}
506
/*
 * Program entry point: bring up the EAL, run the dring tests and report
 * the overall verdict. The test status is also the process return value.
 */
int
main(int argc, char *argv[])
{
	int32_t status;

	status = rte_eal_init(argc, argv);
	if (status < 0) {
		rte_exit(EXIT_FAILURE,
			"%s: rte_eal_init failed with error code: %d\n",
			__func__, status);
	}

	status = test_dring();

	printf("%s", (status == 0) ? "TEST OK\n" : "TEST FAILED\n");

	return status;
}
526