lthread_sched.c revision 809f0800
/*-
 *   BSD LICENSE
 *
 *   Copyright(c) 2015 Intel Corporation. All rights reserved.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * Some portions of this software are derived from
 * https://github.com/halayli/lthread, which carries the following license.
 *
 * Copyright (C) 2012, Hasan Alayli <halayli@gmail.com>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */


#define RTE_MEM 1

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <stddef.h>
#include <limits.h>
#include <inttypes.h>
#include <unistd.h>
#include <pthread.h>
#include <fcntl.h>
#include <sys/time.h>
#include <sys/mman.h>
#include <sched.h>

#include <rte_prefetch.h>
#include <rte_per_lcore.h>
#include <rte_atomic.h>
#include <rte_atomic_64.h>
#include <rte_log.h>
#include <rte_common.h>
#include <rte_branch_prediction.h>

#include "lthread_api.h"
#include "lthread_int.h"
#include "lthread_sched.h"
#include "lthread_objcache.h"
#include "lthread_timer.h"
#include "lthread_mutex.h"
#include "lthread_cond.h"
#include "lthread_tls.h"
#include "lthread_diag.h"

/*
 * This file implements the lthread scheduler.
 * The scheduler is the function lthread_run(), which must be run as the
 * main loop of an EAL thread.
 *
 * Currently, once a scheduler is created it cannot be destroyed.
 * When a scheduler shuts down it is assumed that the application is
 * terminating.
 */
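
/*
 * Illustrative only (not from the original source): a minimal launch
 * sketch, assuming the usual DPDK EAL entry points. The wrapper
 * sched_start() is hypothetical; lthread_num_schedulers_set() and
 * lthread_run() are defined below in this file.
 *
 *	static int sched_start(__rte_unused void *arg)
 *	{
 *		lthread_run();
 *		return 0;
 *	}
 *
 *	lthread_num_schedulers_set(rte_lcore_count());
 *	rte_eal_mp_remote_launch(sched_start, NULL, CALL_MASTER);
 *	rte_eal_mp_wait_lcore();
 */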

static rte_atomic16_t num_schedulers;
static rte_atomic16_t active_schedulers;

/* one scheduler per lcore */
RTE_DEFINE_PER_LCORE(struct lthread_sched *, this_sched) = NULL;

struct lthread_sched *schedcore[LTHREAD_MAX_LCORES];

diag_callback diag_cb;

uint64_t diag_mask;


/* constructor */
void lthread_sched_ctor(void) __attribute__ ((constructor));
void lthread_sched_ctor(void)
{
	memset(schedcore, 0, sizeof(schedcore));
	rte_atomic16_init(&num_schedulers);
	rte_atomic16_set(&num_schedulers, 1);
	rte_atomic16_init(&active_schedulers);
	rte_atomic16_set(&active_schedulers, 0);
	diag_cb = NULL;
}


enum sched_alloc_phase {
	SCHED_ALLOC_OK,
	SCHED_ALLOC_QNODE_POOL,
	SCHED_ALLOC_READY_QUEUE,
	SCHED_ALLOC_PREADY_QUEUE,
	SCHED_ALLOC_LTHREAD_CACHE,
	SCHED_ALLOC_STACK_CACHE,
	SCHED_ALLOC_PERLT_CACHE,
	SCHED_ALLOC_TLS_CACHE,
	SCHED_ALLOC_COND_CACHE,
	SCHED_ALLOC_MUTEX_CACHE,
};
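
/*
 * Note: alloc_status in _lthread_sched_alloc_resources() below records the
 * last allocation stage attempted; on failure the roll-back switch falls
 * through from that stage, destroying everything allocated before it in
 * reverse order.
 */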
144
145static int
146_lthread_sched_alloc_resources(struct lthread_sched *new_sched)
147{
148	int alloc_status;
149
150	do {
151		/* Initialize per scheduler queue node pool */
152		alloc_status = SCHED_ALLOC_QNODE_POOL;
153		new_sched->qnode_pool =
154			_qnode_pool_create("qnode pool", LTHREAD_PREALLOC);
155		if (new_sched->qnode_pool == NULL)
156			break;
157
158		/* Initialize per scheduler local ready queue */
159		alloc_status = SCHED_ALLOC_READY_QUEUE;
160		new_sched->ready = _lthread_queue_create("ready queue");
161		if (new_sched->ready == NULL)
162			break;
163
164		/* Initialize per scheduler local peer ready queue */
165		alloc_status = SCHED_ALLOC_PREADY_QUEUE;
166		new_sched->pready = _lthread_queue_create("pready queue");
167		if (new_sched->pready == NULL)
168			break;
169
170		/* Initialize per scheduler local free lthread cache */
171		alloc_status = SCHED_ALLOC_LTHREAD_CACHE;
172		new_sched->lthread_cache =
173			_lthread_objcache_create("lthread cache",
174						sizeof(struct lthread),
175						LTHREAD_PREALLOC);
176		if (new_sched->lthread_cache == NULL)
177			break;
178
179		/* Initialize per scheduler local free stack cache */
180		alloc_status = SCHED_ALLOC_STACK_CACHE;
181		new_sched->stack_cache =
182			_lthread_objcache_create("stack_cache",
183						sizeof(struct lthread_stack),
184						LTHREAD_PREALLOC);
185		if (new_sched->stack_cache == NULL)
186			break;
187
188		/* Initialize per scheduler local free per lthread data cache */
189		alloc_status = SCHED_ALLOC_PERLT_CACHE;
190		new_sched->per_lthread_cache =
191			_lthread_objcache_create("per_lt cache",
192						RTE_PER_LTHREAD_SECTION_SIZE,
193						LTHREAD_PREALLOC);
194		if (new_sched->per_lthread_cache == NULL)
195			break;
196
197		/* Initialize per scheduler local free tls cache */
198		alloc_status = SCHED_ALLOC_TLS_CACHE;
199		new_sched->tls_cache =
200			_lthread_objcache_create("TLS cache",
201						sizeof(struct lthread_tls),
202						LTHREAD_PREALLOC);
203		if (new_sched->tls_cache == NULL)
204			break;
205
206		/* Initialize per scheduler local free cond var cache */
207		alloc_status = SCHED_ALLOC_COND_CACHE;
208		new_sched->cond_cache =
209			_lthread_objcache_create("cond cache",
210						sizeof(struct lthread_cond),
211						LTHREAD_PREALLOC);
212		if (new_sched->cond_cache == NULL)
213			break;
214
215		/* Initialize per scheduler local free mutex cache */
216		alloc_status = SCHED_ALLOC_MUTEX_CACHE;
217		new_sched->mutex_cache =
218			_lthread_objcache_create("mutex cache",
219						sizeof(struct lthread_mutex),
220						LTHREAD_PREALLOC);
221		if (new_sched->mutex_cache == NULL)
222			break;
223
224		alloc_status = SCHED_ALLOC_OK;
225	} while (0);
226
227	/* roll back on any failure */
228	switch (alloc_status) {
229	case SCHED_ALLOC_MUTEX_CACHE:
230		_lthread_objcache_destroy(new_sched->cond_cache);
231		/* fall through */
232	case SCHED_ALLOC_COND_CACHE:
233		_lthread_objcache_destroy(new_sched->tls_cache);
234		/* fall through */
235	case SCHED_ALLOC_TLS_CACHE:
236		_lthread_objcache_destroy(new_sched->per_lthread_cache);
237		/* fall through */
238	case SCHED_ALLOC_PERLT_CACHE:
239		_lthread_objcache_destroy(new_sched->stack_cache);
240		/* fall through */
241	case SCHED_ALLOC_STACK_CACHE:
242		_lthread_objcache_destroy(new_sched->lthread_cache);
243		/* fall through */
244	case SCHED_ALLOC_LTHREAD_CACHE:
245		_lthread_queue_destroy(new_sched->pready);
246		/* fall through */
247	case SCHED_ALLOC_PREADY_QUEUE:
248		_lthread_queue_destroy(new_sched->ready);
249		/* fall through */
250	case SCHED_ALLOC_READY_QUEUE:
251		_qnode_pool_destroy(new_sched->qnode_pool);
252		/* fall through */
253	case SCHED_ALLOC_QNODE_POOL:
254		/* fall through */
255	case SCHED_ALLOC_OK:
256		break;
257	}
258	return alloc_status;
259}


/*
 * Create a scheduler on the current lcore
 */
struct lthread_sched *_lthread_sched_create(size_t stack_size)
{
	int status;
	struct lthread_sched *new_sched;
	unsigned lcoreid = rte_lcore_id();

	RTE_ASSERT(stack_size <= LTHREAD_MAX_STACK_SIZE);

	if (stack_size == 0)
		stack_size = LTHREAD_MAX_STACK_SIZE;

	new_sched =
	     rte_calloc_socket(NULL, 1, sizeof(struct lthread_sched),
				RTE_CACHE_LINE_SIZE,
				rte_socket_id());
	if (new_sched == NULL) {
		RTE_LOG(CRIT, LTHREAD,
			"Failed to allocate memory for scheduler\n");
		return NULL;
	}

	_lthread_key_pool_init();

	new_sched->stack_size = stack_size;
	new_sched->birth = rte_rdtsc();
	THIS_SCHED = new_sched;

	status = _lthread_sched_alloc_resources(new_sched);
	if (status != SCHED_ALLOC_OK) {
		RTE_LOG(CRIT, LTHREAD,
			"Failed to allocate resources for scheduler code = %d\n",
			status);
		rte_free(new_sched);
		return NULL;
	}

	memset(&new_sched->ctx, 0, sizeof(struct ctx));

	new_sched->lcore_id = lcoreid;

	schedcore[lcoreid] = new_sched;

	new_sched->run_flag = 1;

	DIAG_EVENT(new_sched, LT_DIAG_SCHED_CREATE, rte_lcore_id(), 0);

	rte_wmb();
	return new_sched;
}

/*
 * Set the number of schedulers in the system
 */
int lthread_num_schedulers_set(int num)
{
	rte_atomic16_set(&num_schedulers, num);
	return (int)rte_atomic16_read(&num_schedulers);
}

/*
 * Return the number of schedulers active
 */
int lthread_active_schedulers(void)
{
	return (int)rte_atomic16_read(&active_schedulers);
}
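
/*
 * Illustrative only (an assumption, not from the original source): the
 * launching code can poll lthread_active_schedulers() before creating any
 * work, mirroring the sync loops used later in this file, e.g.
 *
 *	while (lthread_active_schedulers() < (int)rte_lcore_count())
 *		sched_yield();
 */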


/**
 * Shut down the scheduler running on the specified lcore
 */
void lthread_scheduler_shutdown(unsigned lcoreid)
{
	uint64_t coreid = (uint64_t) lcoreid;

	if (coreid < LTHREAD_MAX_LCORES) {
		if (schedcore[coreid] != NULL)
			schedcore[coreid]->run_flag = 0;
	}
}

/**
 * Shut down all schedulers
 */
void lthread_scheduler_shutdown_all(void)
{
	uint64_t i;

	/*
	 * Give time for all schedulers to have started.
	 * Note: we use sched_yield() rather than pthread_yield() to allow
	 * for the possibility of a pthread wrapper on lthread_yield(),
	 * something that is not possible unless the scheduler is running.
	 */
	while (rte_atomic16_read(&active_schedulers) <
	       rte_atomic16_read(&num_schedulers))
		sched_yield();

	for (i = 0; i < LTHREAD_MAX_LCORES; i++) {
		if (schedcore[i] != NULL)
			schedcore[i]->run_flag = 0;
	}
}
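
/*
 * Illustrative only (an assumption, not from the original source): a final
 * lthread might end the run once the application's work is complete, where
 * wait_for_work_to_complete() is a hypothetical placeholder:
 *
 *	static void finish(__rte_unused void *arg)
 *	{
 *		wait_for_work_to_complete();
 *		lthread_scheduler_shutdown_all();
 *	}
 *
 * after which each lthread_run() loop exits once its queues drain and no
 * lthreads remain blocked.
 */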

/*
 * Resume a suspended lthread
 */
static inline void
_lthread_resume(struct lthread *lt) __attribute__ ((always_inline));
static inline void _lthread_resume(struct lthread *lt)
{
	struct lthread_sched *sched = THIS_SCHED;
	struct lthread_stack *s;
	uint64_t state = lt->state;
#if LTHREAD_DIAG
	int init = 0;
#endif

	sched->current_lthread = lt;

	if (state & (BIT(ST_LT_CANCELLED) | BIT(ST_LT_EXITED))) {
		/* if detached we can free the thread now */
		if (state & BIT(ST_LT_DETACH)) {
			_lthread_free(lt);
			sched->current_lthread = NULL;
			return;
		}
	}

	if (state & BIT(ST_LT_INIT)) {
		/* first time this thread has been run */
		/* assign thread to this scheduler */
		lt->sched = THIS_SCHED;

		/* allocate stack */
		s = _stack_alloc();

		lt->stack_container = s;
		_lthread_set_stack(lt, s->stack, s->stack_size);

		/* allocate memory for TLS used by this thread */
		_lthread_tls_alloc(lt);

		lt->state = BIT(ST_LT_READY);
#if LTHREAD_DIAG
		init = 1;
#endif
	}

	DIAG_EVENT(lt, LT_DIAG_LTHREAD_RESUMED, init, lt);

	/* switch to the new thread */
	ctx_switch(&lt->ctx, &sched->ctx);

	/* If posting to a queue that could be read by another lcore,
	 * we defer the queue write until now to ensure the context has been
	 * saved before the other core tries to resume it.
	 * This applies to blocking on mutex, cond, and to set_affinity.
	 */
	if (lt->pending_wr_queue != NULL) {
		struct lthread_queue *dest = lt->pending_wr_queue;

		lt->pending_wr_queue = NULL;

		/* queue the current thread to the specified queue */
		_lthread_queue_insert_mp(dest, lt);
	}

	sched->current_lthread = NULL;
}

/*
 * Handle sleep timer expiry
 */
void
_sched_timer_cb(struct rte_timer *tim, void *arg)
{
	struct lthread *lt = (struct lthread *) arg;
	uint64_t state = lt->state;

	DIAG_EVENT(lt, LT_DIAG_LTHREAD_TMR_EXPIRED, &lt->tim, 0);

	rte_timer_stop(tim);

	if (lt->state & BIT(ST_LT_CANCELLED))
		(THIS_SCHED)->nb_blocked_threads--;

	lt->state = state | BIT(ST_LT_EXPIRED);
	_lthread_resume(lt);
	lt->state = state & CLEARBIT(ST_LT_EXPIRED);
}
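
/*
 * Illustrative note (an assumption, not from the original source): this
 * callback fires for timers armed on behalf of sleeping lthreads, e.g. an
 * lthread that called lthread_sleep() with a nanosecond delay. The lthread
 * is resumed with ST_LT_EXPIRED set, and the bit is cleared again once it
 * yields back to the scheduler.
 */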


/*
 * Return 1 if the scheduler is finished (run flag cleared, both ready
 * queues empty and no blocked lthreads), otherwise 0.
 */
static inline int _lthread_sched_isdone(struct lthread_sched *sched)
{
	return (sched->run_flag == 0) &&
			(_lthread_queue_empty(sched->ready)) &&
			(_lthread_queue_empty(sched->pready)) &&
			(sched->nb_blocked_threads == 0);
}

/*
 * Wait for all schedulers to start
 */
static inline void _lthread_schedulers_sync_start(void)
{
	rte_atomic16_inc(&active_schedulers);

	/* wait for lthread schedulers
	 * Note: we use sched_yield() rather than pthread_yield() to allow
	 * for the possibility of a pthread wrapper on lthread_yield(),
	 * something that is not possible unless the scheduler is running.
	 */
	while (rte_atomic16_read(&active_schedulers) <
	       rte_atomic16_read(&num_schedulers))
		sched_yield();

}

/*
 * Wait for all schedulers to stop
 */
static inline void _lthread_schedulers_sync_stop(void)
{
	rte_atomic16_dec(&active_schedulers);
	rte_atomic16_dec(&num_schedulers);

	/* wait for schedulers
	 * Note: we use sched_yield() rather than pthread_yield() to allow
	 * for the possibility of a pthread wrapper on lthread_yield(),
	 * something that is not possible unless the scheduler is running.
	 */
	while (rte_atomic16_read(&active_schedulers) > 0)
		sched_yield();

}


/*
 * Run the lthread scheduler.
 * This loop is the heart of the system.
 */
void lthread_run(void)
{

	struct lthread_sched *sched = THIS_SCHED;
	struct lthread *lt = NULL;

	RTE_LOG(INFO, LTHREAD,
		"starting scheduler %p on lcore %u phys core %u\n",
		sched, rte_lcore_id(),
		rte_lcore_index(rte_lcore_id()));

	/* if more than one, wait for all schedulers to start */
	_lthread_schedulers_sync_start();


	/*
	 * This is the main scheduling loop.
	 * So long as there are lthreads in existence we run this loop.
	 * We check for:
	 *   expired timers,
	 *   the local ready queue,
	 *   and the peer ready queue,
	 *
	 * and resume lthreads ad infinitum.
	 */
	while (!_lthread_sched_isdone(sched)) {

		rte_timer_manage();

		lt = _lthread_queue_poll(sched->ready);
		if (lt != NULL)
			_lthread_resume(lt);
		lt = _lthread_queue_poll(sched->pready);
		if (lt != NULL)
			_lthread_resume(lt);
	}


	/* if more than one, wait for all schedulers to stop */
	_lthread_schedulers_sync_stop();

	(THIS_SCHED) = NULL;

	RTE_LOG(INFO, LTHREAD,
		"stopping scheduler %p on lcore %u phys core %u\n",
		sched, rte_lcore_id(),
		rte_lcore_index(rte_lcore_id()));
	fflush(stdout);
}

/*
 * Return the scheduler for the specified lcore
 */
struct lthread_sched *_lthread_sched_get(int lcore_id)
{
	if (lcore_id < 0 || lcore_id >= LTHREAD_MAX_LCORES)
		return NULL;
	return schedcore[lcore_id];
}

/*
 * Migrate the current thread to another scheduler running
 * on the specified lcore.
 */
int lthread_set_affinity(unsigned lcoreid)
{
	struct lthread *lt = THIS_LTHREAD;
	struct lthread_sched *dest_sched;

	if (unlikely(lcoreid >= LTHREAD_MAX_LCORES))
		return POSIX_ERRNO(EINVAL);


	DIAG_EVENT(lt, LT_DIAG_LTHREAD_AFFINITY, lcoreid, 0);

	dest_sched = schedcore[lcoreid];

	if (unlikely(dest_sched == NULL))
		return POSIX_ERRNO(EINVAL);

	if (likely(dest_sched != THIS_SCHED)) {
		lt->sched = dest_sched;
		lt->pending_wr_queue = dest_sched->pready;
		_affinitize();
		return 0;
	}
	return 0;
}
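
/*
 * Illustrative only (an assumption, not from the original source): an
 * lthread can migrate itself to another lcore's scheduler, where dest_lcore
 * and handle_error() are hypothetical:
 *
 *	if (lthread_set_affinity(dest_lcore) != 0)
 *		handle_error();
 *
 * On return the calling lthread is executing on the destination
 * scheduler's lcore.
 */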