/*-
 *   BSD LICENSE
 *
 *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "test.h"

#include <unistd.h>
#include <string.h>
#include <rte_cycles.h>
#include <rte_errno.h>
#include <rte_mempool.h>
#include <rte_mbuf.h>
#include <rte_distributor.h>

#define ITER_POWER 20 /* log 2 of how many iterations we do when timing. */
#define BURST 32
#define BIG_BATCH 1024

/* statics - all zero-initialized by default */
static volatile int quit;      /**< general quit variable for all threads */
static volatile int zero_quit; /**< var for when we just want thr0 to quit */
static volatile unsigned worker_idx;

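/* per-lcore packet counters; each entry is padded out to its own cache line
 * (__rte_cache_aligned) so that workers updating their own counts do not
 * false-share with each other */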
struct worker_stats {
	volatile unsigned handled_packets;
} __rte_cache_aligned;
struct worker_stats worker_stats[RTE_MAX_LCORE];

/* returns the total count of the number of packets handled by the worker
 * functions given below.
 */
static inline unsigned
total_packet_count(void)
{
	unsigned i, count = 0;
	for (i = 0; i < worker_idx; i++)
		count += worker_stats[i].handled_packets;
	return count;
}

/* resets the packet counts for a new test */
static inline void
clear_packet_count(void)
{
	memset(&worker_stats, 0, sizeof(worker_stats));
}

/* this is the basic worker function for the sanity test;
 * it does nothing but return packets and count them.
 */
static int
handle_work(void *arg)
{
	struct rte_mbuf *pkt = NULL;
	struct rte_distributor *d = arg;
	unsigned count = 0;
	unsigned id = __sync_fetch_and_add(&worker_idx, 1);

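	/* the first call passes NULL since there is no previous packet to
	 * hand back; each later call returns the prior packet as completed
	 * and busy-polls until the distributor assigns a new one */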
	pkt = rte_distributor_get_pkt(d, id, NULL);
	while (!quit) {
		worker_stats[id].handled_packets++, count++;
		pkt = rte_distributor_get_pkt(d, id, pkt);
	}
	worker_stats[id].handled_packets++, count++;
	rte_distributor_return_pkt(d, id, pkt);
	return 0;
}

/* do basic sanity testing of the distributor. This test checks the following:
 * - send 32 packets through the distributor with the same tag and ensure they
 *   all go to the one worker
 * - send 32 packets through the distributor with two different tags and
 *   verify that they go equally to two different workers.
 * - send 32 packets with different tags through the distributor and
 *   just verify we get all packets back.
 * - send 1024 packets through the distributor, gathering the returned packets
 *   as we go. Then verify that we correctly got all 1024 pointers back again,
 *   not necessarily in the same order (as different flows).
 */
static int
sanity_test(struct rte_distributor *d, struct rte_mempool *p)
{
	struct rte_mbuf *bufs[BURST];
	unsigned i;

	printf("=== Basic distributor sanity tests ===\n");
	clear_packet_count();
	if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
		printf("line %d: Error getting mbufs from pool\n", __LINE__);
		return -1;
	}

	/* now set all hash values in all buffers to zero, so all pkts go to the
	 * one worker thread */
	for (i = 0; i < BURST; i++)
		bufs[i]->hash.usr = 0;

	rte_distributor_process(d, bufs, BURST);
	rte_distributor_flush(d);
	if (total_packet_count() != BURST) {
		printf("Line %d: Error, not all packets flushed. "
				"Expected %u, got %u\n",
				__LINE__, BURST, total_packet_count());
		return -1;
	}

	for (i = 0; i < rte_lcore_count() - 1; i++)
		printf("Worker %u handled %u packets\n", i,
				worker_stats[i].handled_packets);
	printf("Sanity test with all zero hashes done.\n");
	if (worker_stats[0].handled_packets != BURST)
		return -1;

	/* pick two flows and check they go correctly */
	if (rte_lcore_count() >= 3) {
		clear_packet_count();
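		/* alternate between two tag values (0 and 0x100), so the
		 * burst should split evenly across two workers */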
		for (i = 0; i < BURST; i++)
			bufs[i]->hash.usr = (i & 1) << 8;

		rte_distributor_process(d, bufs, BURST);
		rte_distributor_flush(d);
		if (total_packet_count() != BURST) {
			printf("Line %d: Error, not all packets flushed. "
					"Expected %u, got %u\n",
					__LINE__, BURST, total_packet_count());
			return -1;
		}

		for (i = 0; i < rte_lcore_count() - 1; i++)
			printf("Worker %u handled %u packets\n", i,
					worker_stats[i].handled_packets);
		printf("Sanity test with two hash values done\n");

		if (worker_stats[0].handled_packets != 16 ||
				worker_stats[1].handled_packets != 16)
			return -1;
	}

	/* give a different hash value to each packet,
	 * so load gets distributed */
	clear_packet_count();
	for (i = 0; i < BURST; i++)
		bufs[i]->hash.usr = i;

	rte_distributor_process(d, bufs, BURST);
	rte_distributor_flush(d);
	if (total_packet_count() != BURST) {
		printf("Line %d: Error, not all packets flushed. "
				"Expected %u, got %u\n",
				__LINE__, BURST, total_packet_count());
		return -1;
	}

	for (i = 0; i < rte_lcore_count() - 1; i++)
		printf("Worker %u handled %u packets\n", i,
				worker_stats[i].handled_packets);
	printf("Sanity test with non-zero hashes done\n");

	rte_mempool_put_bulk(p, (void *)bufs, BURST);

	/* sanity test with BIG_BATCH packets to ensure they all arrived back
	 * from the returned packets function */
	clear_packet_count();
	struct rte_mbuf *many_bufs[BIG_BATCH], *return_bufs[BIG_BATCH];
	unsigned num_returned = 0;

	/* flush out any remaining packets */
	rte_distributor_flush(d);
	rte_distributor_clear_returns(d);
	if (rte_mempool_get_bulk(p, (void *)many_bufs, BIG_BATCH) != 0) {
		printf("line %d: Error getting mbufs from pool\n", __LINE__);
		return -1;
	}
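	/* give each packet a distinct tag so the load spreads across all
	 * the available workers */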
	for (i = 0; i < BIG_BATCH; i++)
		many_bufs[i]->hash.usr = i << 2;

	for (i = 0; i < BIG_BATCH/BURST; i++) {
		rte_distributor_process(d, &many_bufs[i*BURST], BURST);
		num_returned += rte_distributor_returned_pkts(d,
				&return_bufs[num_returned],
				BIG_BATCH - num_returned);
	}
	rte_distributor_flush(d);
	num_returned += rte_distributor_returned_pkts(d,
			&return_bufs[num_returned], BIG_BATCH - num_returned);

	if (num_returned != BIG_BATCH) {
		printf("line %d: Number returned is not the same as "
				"number sent\n", __LINE__);
		return -1;
	}
	/* big check - make sure all packets made it back!! */
	for (i = 0; i < BIG_BATCH; i++) {
		unsigned j;
		struct rte_mbuf *src = many_bufs[i];
		for (j = 0; j < BIG_BATCH; j++)
			if (return_bufs[j] == src)
				break;

		if (j == BIG_BATCH) {
			printf("Error: could not find source packet #%u\n", i);
			return -1;
		}
	}
	printf("Sanity test of returned packets done\n");

	rte_mempool_put_bulk(p, (void *)many_bufs, BIG_BATCH);

	printf("\n");
	return 0;
}


/* to test that the distributor does not lose packets, we use this worker
 * function, which frees mbufs when it gets them. The distributor thread does
 * the mbuf allocation. If the distributor drops packets, we'll eventually run
 * out of mbufs.
 */
static int
handle_work_with_free_mbufs(void *arg)
{
	struct rte_mbuf *pkt = NULL;
	struct rte_distributor *d = arg;
	unsigned count = 0;
	unsigned id = __sync_fetch_and_add(&worker_idx, 1);

	pkt = rte_distributor_get_pkt(d, id, NULL);
	while (!quit) {
		worker_stats[id].handled_packets++, count++;
		rte_pktmbuf_free(pkt);
		pkt = rte_distributor_get_pkt(d, id, pkt);
	}
	worker_stats[id].handled_packets++, count++;
	rte_distributor_return_pkt(d, id, pkt);
	return 0;
}

/* Perform a sanity test of the distributor with a large number of packets,
 * where we allocate a new set of mbufs for each burst. The workers then
 * free the mbufs. This ensures that we don't have any packet leaks in the
 * library.
 */
static int
sanity_test_with_mbuf_alloc(struct rte_distributor *d, struct rte_mempool *p)
{
	unsigned i;
	struct rte_mbuf *bufs[BURST];

	printf("=== Sanity test with mbuf alloc/free ===\n");
	clear_packet_count();
	for (i = 0; i < ((1<<ITER_POWER)); i += BURST) {
		unsigned j;
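		/* if the pool has run dry, the outstanding mbufs are all held
		 * by workers; keep calling process() with zero new packets so
		 * the backlog drains, gets freed and replenishes the pool */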
		while (rte_mempool_get_bulk(p, (void *)bufs, BURST) < 0)
			rte_distributor_process(d, NULL, 0);
		for (j = 0; j < BURST; j++) {
			bufs[j]->hash.usr = (i+j) << 1;
			rte_mbuf_refcnt_set(bufs[j], 1);
		}

		rte_distributor_process(d, bufs, BURST);
	}

	rte_distributor_flush(d);
	if (total_packet_count() < (1<<ITER_POWER)) {
		printf("Line %u: Packet count is incorrect, %u, expected %u\n",
				__LINE__, total_packet_count(),
				(1<<ITER_POWER));
		return -1;
	}

	printf("Sanity test with mbuf alloc/free passed\n\n");
	return 0;
}

static int
handle_work_for_shutdown_test(void *arg)
{
	struct rte_mbuf *pkt = NULL;
	struct rte_distributor *d = arg;
	unsigned count = 0;
	const unsigned id = __sync_fetch_and_add(&worker_idx, 1);

	pkt = rte_distributor_get_pkt(d, id, NULL);
	/* wait for the quit signal globally, or, for worker zero, wait
	 * for zero_quit */
	while (!quit && !(id == 0 && zero_quit)) {
		worker_stats[id].handled_packets++, count++;
		rte_pktmbuf_free(pkt);
		pkt = rte_distributor_get_pkt(d, id, NULL);
	}
	worker_stats[id].handled_packets++, count++;
	rte_distributor_return_pkt(d, id, pkt);

	if (id == 0) {
		/* for worker zero, allow it to restart to pick up the last
		 * packet when all workers are shutting down.
		 */
		while (zero_quit)
			usleep(100);
		pkt = rte_distributor_get_pkt(d, id, NULL);
		while (!quit) {
			worker_stats[id].handled_packets++, count++;
			rte_pktmbuf_free(pkt);
			pkt = rte_distributor_get_pkt(d, id, NULL);
		}
		rte_distributor_return_pkt(d, id, pkt);
	}
	return 0;
}


/* Sanity test of worker shutdown: queue up two bursts of packets for a
 * single worker, tell that worker to quit mid-stream, and verify that all
 * of the packets are still handled, i.e. that the backlog is redistributed
 * to the remaining workers with none lost.
 */
static int
sanity_test_with_worker_shutdown(struct rte_distributor *d,
		struct rte_mempool *p)
{
	struct rte_mbuf *bufs[BURST];
	unsigned i;

	printf("=== Sanity test of worker shutdown ===\n");

	clear_packet_count();
	if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
		printf("line %d: Error getting mbufs from pool\n", __LINE__);
		return -1;
	}

	/* now set all hash values in all buffers to zero, so all pkts go to the
	 * one worker thread */
	for (i = 0; i < BURST; i++)
		bufs[i]->hash.usr = 0;

	rte_distributor_process(d, bufs, BURST);
	/* at this point, we will have processed some packets and have a full
	 * backlog for the other ones at worker 0.
	 */

	/* get more buffers to queue up, again setting them to the same flow */
	if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
		printf("line %d: Error getting mbufs from pool\n", __LINE__);
		return -1;
	}
	for (i = 0; i < BURST; i++)
		bufs[i]->hash.usr = 0;

	/* get worker zero to quit */
	zero_quit = 1;
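	/* process a second burst while worker 0 is quitting; its backlog
	 * should be reassigned to the remaining workers so that no packets
	 * are lost */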
	rte_distributor_process(d, bufs, BURST);

	/* flush the distributor */
	rte_distributor_flush(d);
	if (total_packet_count() != BURST * 2) {
		printf("Line %d: Error, not all packets flushed. "
				"Expected %u, got %u\n",
				__LINE__, BURST * 2, total_packet_count());
		return -1;
	}

	for (i = 0; i < rte_lcore_count() - 1; i++)
		printf("Worker %u handled %u packets\n", i,
				worker_stats[i].handled_packets);

	printf("Sanity test with worker shutdown passed\n\n");
	return 0;
}

/* Test that the flush function is able to move packets between workers when
 * one worker shuts down.
 */
static int
test_flush_with_worker_shutdown(struct rte_distributor *d,
		struct rte_mempool *p)
{
	struct rte_mbuf *bufs[BURST];
	unsigned i;

	printf("=== Test flush fn with worker shutdown ===\n");

	clear_packet_count();
	if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
		printf("line %d: Error getting mbufs from pool\n", __LINE__);
		return -1;
	}

	/* now set all hash values in all buffers to zero, so all pkts go to the
	 * one worker thread */
	for (i = 0; i < BURST; i++)
		bufs[i]->hash.usr = 0;

	rte_distributor_process(d, bufs, BURST);
	/* at this point, we will have processed some packets and have a full
	 * backlog for the other ones at worker 0.
	 */

	/* get worker zero to quit */
	zero_quit = 1;

	/* flush the distributor */
	rte_distributor_flush(d);

	zero_quit = 0;
	if (total_packet_count() != BURST) {
		printf("Line %d: Error, not all packets flushed. "
				"Expected %u, got %u\n",
				__LINE__, BURST, total_packet_count());
		return -1;
	}

	for (i = 0; i < rte_lcore_count() - 1; i++)
		printf("Worker %u handled %u packets\n", i,
				worker_stats[i].handled_packets);

	printf("Flush test with worker shutdown passed\n\n");
	return 0;
}

static
int test_error_distributor_create_name(void)
{
	struct rte_distributor *d = NULL;
	char *name = NULL;

	d = rte_distributor_create(name, rte_socket_id(),
			rte_lcore_count() - 1);
	if (d != NULL || rte_errno != EINVAL) {
		printf("ERROR: No error on create() with NULL name param\n");
		return -1;
	}

	return 0;
}


static
int test_error_distributor_create_numworkers(void)
{
	struct rte_distributor *d = NULL;
	d = rte_distributor_create("test_numworkers", rte_socket_id(),
			RTE_MAX_LCORE + 10);
	if (d != NULL || rte_errno != EINVAL) {
		printf("ERROR: No error on create() with num_workers > MAX\n");
		return -1;
	}
	return 0;
}


/* Useful function which ensures that all worker functions terminate */
static void
quit_workers(struct rte_distributor *d, struct rte_mempool *p)
{
	const unsigned num_workers = rte_lcore_count() - 1;
	unsigned i;
	struct rte_mbuf *bufs[RTE_MAX_LCORE];
	rte_mempool_get_bulk(p, (void *)bufs, num_workers);

	zero_quit = 0;
	quit = 1;
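	/* send one packet per worker, each with a distinct tag, so that every
	 * worker wakes from get_pkt() and can observe the quit flag */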
	for (i = 0; i < num_workers; i++)
		bufs[i]->hash.usr = i << 1;
	rte_distributor_process(d, bufs, num_workers);

	rte_mempool_put_bulk(p, (void *)bufs, num_workers);

	rte_distributor_process(d, NULL, 0);
	rte_distributor_flush(d);
	rte_eal_mp_wait_lcore();
	quit = 0;
	worker_idx = 0;
}

static int
test_distributor(void)
{
	static struct rte_distributor *d;
	static struct rte_mempool *p;

	if (rte_lcore_count() < 2) {
		printf("ERROR: not enough cores to test distributor\n");
		return -1;
	}

	if (d == NULL) {
		d = rte_distributor_create("Test_distributor", rte_socket_id(),
				rte_lcore_count() - 1);
		if (d == NULL) {
			printf("Error creating distributor\n");
			return -1;
		}
	} else {
		rte_distributor_flush(d);
		rte_distributor_clear_returns(d);
	}

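	/* size the mempool at roughly 511 mbufs per lcore, but never fewer
	 * than the BIG_BATCH sanity test needs */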
	const unsigned nb_bufs = (511 * rte_lcore_count()) < BIG_BATCH ?
			(BIG_BATCH * 2) - 1 : (511 * rte_lcore_count());
	if (p == NULL) {
		p = rte_pktmbuf_pool_create("DT_MBUF_POOL", nb_bufs, BURST,
			0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
		if (p == NULL) {
			printf("Error creating mempool\n");
			return -1;
		}
	}

	rte_eal_mp_remote_launch(handle_work, d, SKIP_MASTER);
	if (sanity_test(d, p) < 0)
		goto err;
	quit_workers(d, p);

	rte_eal_mp_remote_launch(handle_work_with_free_mbufs, d, SKIP_MASTER);
	if (sanity_test_with_mbuf_alloc(d, p) < 0)
		goto err;
	quit_workers(d, p);

	if (rte_lcore_count() > 2) {
		rte_eal_mp_remote_launch(handle_work_for_shutdown_test, d,
				SKIP_MASTER);
		if (sanity_test_with_worker_shutdown(d, p) < 0)
			goto err;
		quit_workers(d, p);

		rte_eal_mp_remote_launch(handle_work_for_shutdown_test, d,
				SKIP_MASTER);
		if (test_flush_with_worker_shutdown(d, p) < 0)
			goto err;
		quit_workers(d, p);

	} else {
		printf("Not enough cores to run tests for worker shutdown\n");
	}

	if (test_error_distributor_create_numworkers() == -1 ||
			test_error_distributor_create_name() == -1) {
		printf("rte_distributor_create parameter check tests failed\n");
		return -1;
	}

	return 0;

err:
	quit_workers(d, p);
	return -1;
}

REGISTER_TEST_COMMAND(distributor_autotest, test_distributor);