2 /* This work is part of OpenLDAP Software <http://www.openldap.org/>.
4 * Copyright 1998-2016 The OpenLDAP Foundation.
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted only as authorized by the OpenLDAP
11 * A copy of this license is available in file LICENSE in the
12 * top-level directory of the distribution or, alternatively, at
13 * <http://www.OpenLDAP.org/license.html>.
20 #include <ac/signal.h>
21 #include <ac/stdarg.h>
22 #include <ac/stdlib.h>
23 #include <ac/string.h>
28 #include "ldap_pvt_thread.h" /* Get the thread interface */
29 #include "ldap_queue.h"
30 #define LDAP_THREAD_POOL_IMPLEMENTATION
31 #include "ldap_thr_debug.h" /* May rename symbols defined below */
33 #ifndef LDAP_THREAD_HAVE_TPOOL
39 /* Thread-specific key with data and optional free function */
/* One slot of a context's key table. The ltk_key and ltk_data members
 * that the rest of this file reads and writes are declared on lines not
 * visible in this excerpt.
 */
40 typedef struct ldap_int_tpool_key_s {
/* Optional free function. When non-NULL, pool_purgekey() and
 * pool_context_reset() call it as ltk_free(ltk_key, ltk_data).
 */
43 ldap_pvt_thread_pool_keyfree_t *ltk_free;
44 } ldap_int_tpool_key_t;
46 /* Max number of thread-specific keys we store per thread.
47 * We don't expect to use many...
51 /* Max number of threads */
52 #define LDAP_MAXTHR 1024 /* must be a power of 2 */
/* The power-of-2 requirement follows from pool_wrapper() reducing a hash
 * to a thread_keys[] index with "& (LDAP_MAXTHR-1)".
 */
54 /* (Theoretical) max number of pending requests */
55 #define MAX_PENDING (INT_MAX/2) /* INT_MAX - (room to avoid overflow) */
/* pool_init_q() replaces any max_pending outside 1..MAX_PENDING with
 * MAX_PENDING.
 */
57 /* pool->ltp_pause values */
58 enum { NOT_PAUSED = 0, WANT_PAUSE = 1, PAUSED = 2 };
/* handle_pause() stores WANT_PAUSE while it waits for the other tasks and
 * PAUSED once that wait is over; pool_resume() stores 0 (NOT_PAUSED).
 */
60 /* Context: thread ID and thread-specific key/data pairs */
/* Per-thread context. pool_wrapper() keeps one as a local variable and
 * passes its address to every task it runs; the main thread uses the
 * static ldap_int_main_thrctx instead.
 */
61 typedef struct ldap_int_thread_userctx_s {
/* Presumably the work queue this thread belongs to; the assignment is
 * not visible in this excerpt -- TODO confirm.
 */
62 struct ldap_int_thread_poolq_s *ltu_pq;
/* Thread ID, taken from ldap_pvt_thread_self(). */
63 ldap_pvt_thread_t ltu_id;
/* Key table with MAXKEYS slots. Lookups scan from slot 0 and stop at the
 * first slot whose ltk_key is NULL.
 */
64 ldap_int_tpool_key_t ltu_key[MAXKEYS];
65 } ldap_int_thread_userctx_t;
68 /* Simple {thread ID -> context} hash table; key=ctx->ltu_id.
69 * Protected by ldap_pvt_thread_pool_mutex.
72 ldap_int_thread_userctx_t *ctx;
73 /* ctx is valid when not NULL or DELETED_THREAD_CTX */
74 # define DELETED_THREAD_CTX (&ldap_int_main_thrctx + 1) /* dummy addr */
75 } thread_keys[LDAP_MAXTHR];
/* TID_HASH(tid, hash): fold the bytes of the object tid into the lvalue
 * hash. hash starts as the first byte; for each following byte b the
 * macro does hash += (hash << 5) ^ b. The declaration of i_ and the end
 * of the do-block are on lines not visible in this excerpt.
 */
77 #define TID_HASH(tid, hash) do { \
78 unsigned const char *ptr_ = (unsigned const char *)&(tid); \
80 for (i_ = 0, (hash) = ptr_[0]; ++i_ < sizeof(tid);) \
81 (hash) += ((hash) << 5) ^ ptr_[i_]; \
85 /* Task for a thread to perform */
/* A unit of work. A task is linked either into a queue's pending list
 * (through ltt_next.q) or into its free list (through ltt_next.l). The
 * two linkage members below presumably live in a member named ltt_next
 * whose opening and closing lines are not visible here; the ltt_arg
 * member used elsewhere in the file is likewise on a missing line.
 */
86 typedef struct ldap_int_thread_task_s {
88 LDAP_STAILQ_ENTRY(ldap_int_thread_task_s) q;
89 LDAP_SLIST_ENTRY(ldap_int_thread_task_s) l;
/* Routine that pool_wrapper() calls as ltt_start_routine(&ctx, ltt_arg). */
91 ldap_pvt_thread_start_t *ltt_start_routine;
93 } ldap_int_thread_task_t;
/* Tail-queue head for tasks: the type of each queue's ltp_pending_list
 * and of the shared empty_pending_list.
 */
95 typedef LDAP_STAILQ_HEAD(tcq, ldap_int_thread_task_s) ldap_int_tpool_plist_t;
97 struct ldap_int_thread_poolq_s {
100 struct ldap_int_thread_pool_s *ltp_pool;
102 /* protect members below */
103 ldap_pvt_thread_mutex_t ltp_mutex;
105 /* not paused and something to do for pool_<wrapper/pause/destroy>()
106 * Used for normal pool operation, to synch between submitter and
107 * worker threads. Not used for pauses. In normal operation multiple
108 * queues can rendezvous without acquiring the main pool lock.
110 ldap_pvt_thread_cond_t ltp_cond;
112 /* ltp_pause == 0 ? &ltp_pending_list : &empty_pending_list,
113 * maintained to reduce work for pool_wrapper()
115 ldap_int_tpool_plist_t *ltp_work_list;
117 /* pending tasks, and unused task objects */
118 ldap_int_tpool_plist_t ltp_pending_list;
119 LDAP_SLIST_HEAD(tcl, ldap_int_thread_task_s) ltp_free_list;
121 /* Max number of threads in this queue */
124 /* Max pending + paused + idle tasks, negated when ltp_finishing */
127 int ltp_pending_count; /* Pending + paused + idle tasks */
128 int ltp_active_count; /* Active, not paused/idle tasks */
129 int ltp_open_count; /* Number of threads */
130 int ltp_starting; /* Currently starting threads */
/* Pool-wide state: the array of work queues plus the lock, condition
 * variables and flags used to pause, resume and shut down the pool as a
 * whole. Several member declarations (for example ltp_numqs,
 * ltp_finishing, ltp_max_count, ltp_max_pending) are on lines missing
 * from this excerpt.
 */
133 struct ldap_int_thread_pool_s {
/* Link in ldap_int_thread_pool_list. */
134 LDAP_STAILQ_ENTRY(ldap_int_thread_pool_s) ltp_next;
/* Array of ltp_numqs queue pointers; allocated by pool_init_q() and
 * reallocated by pool_queues() when the queue count grows.
 */
136 struct ldap_int_thread_poolq_s **ltp_wqs;
138 /* number of poolqs */
141 /* protect members below */
142 ldap_pvt_thread_mutex_t ltp_mutex;
144 /* paused and waiting for resume
145 * When a pause is in effect all workers switch to waiting on
146 * this cond instead of their per-queue cond.
148 ldap_pvt_thread_cond_t ltp_cond;
150 /* ltp_active_queues < 1 && ltp_pause */
151 ldap_pvt_thread_cond_t ltp_pcond;
153 /* number of active queues */
154 int ltp_active_queues;
156 /* The pool is finishing, waiting for its threads to close.
157 * They close when ltp_pending_list is done. pool_submit()
158 * rejects new tasks. ltp_max_pending = -(its old value).
162 /* Some active task needs to be the sole active task.
163 * Atomic variable so ldap_pvt_thread_pool_pausing() can read it.
165 volatile sig_atomic_t ltp_pause;
167 /* Max number of threads in pool */
170 /* Configured max number of threads in pool, 0 for default (LDAP_MAXTHR) */
171 int ltp_conf_max_count;
/* pool_query(PARAM_MAX) reports ltp_conf_max_count, i.e. the configured
 * value rather than the effective limit.
 */
173 /* Max pending + paused + idle tasks, negated when ltp_finishing */
/* A list that never holds tasks. handle_pause() points every queue's
 * ltp_work_list at it while pausing; pool_resume() points ltp_work_list
 * back at the queue's own ltp_pending_list.
 */
177 static ldap_int_tpool_plist_t empty_pending_list =
178 LDAP_STAILQ_HEAD_INITIALIZER(empty_pending_list);
/* Set to 1 by pool_init_q() and back to 0 by pool_destroy(); pool_init_q()
 * asserts it is 0 on entry.
 */
180 static int ldap_int_has_thread_pool = 0;
/* Pools created by pool_init_q() and not yet removed by pool_destroy(). */
181 static LDAP_STAILQ_HEAD(tpq, ldap_int_thread_pool_s)
182 ldap_int_thread_pool_list =
183 LDAP_STAILQ_HEAD_INITIALIZER(ldap_int_thread_pool_list);
/* Held around updates of ldap_int_thread_pool_list and around accesses
 * to the thread_keys[] table.
 */
185 static ldap_pvt_thread_mutex_t ldap_pvt_thread_pool_mutex;
/* Forward declaration: thread body passed to ldap_pvt_thread_create()
 * by pool_submit().
 */
187 static void *ldap_int_thread_pool_wrapper( void *pool );
/* Thread-specific-data key under which pool_wrapper() stores the address
 * of its context and from which pool_context() reads it back.
 */
189 static ldap_pvt_thread_key_t ldap_tpool_key;
191 /* Context of the main thread */
192 static ldap_int_thread_userctx_t ldap_int_main_thrctx;
/* pool_context() returns this context when nothing is stored under
 * ldap_tpool_key for the calling thread.
 */
/* Set up the file-scope state: record the calling thread's ID in the
 * main-thread context, create the thread-specific-data key, and
 * initialize ldap_pvt_thread_pool_mutex.
 * Returns the result of the mutex initialization; the result of the key
 * creation is not checked.
 */
195 ldap_int_thread_pool_startup ( void )
197 ldap_int_main_thrctx.ltu_id = ldap_pvt_thread_self();
198 ldap_pvt_thread_key_create( &ldap_tpool_key );
199 return ldap_pvt_thread_mutex_init(&ldap_pvt_thread_pool_mutex);
/* Undo ldap_int_thread_pool_startup(). Every pool still on
 * ldap_int_thread_pool_list is destroyed with run_pending = 0, then the
 * global mutex and the thread-specific-data key are destroyed.
 */
203 ldap_int_thread_pool_shutdown ( void )
205 struct ldap_int_thread_pool_s *pool;
/* pool_destroy() unlinks the pool from the list, so taking the first
 * element repeatedly walks the whole list.
 */
207 while ((pool = LDAP_STAILQ_FIRST(&ldap_int_thread_pool_list)) != NULL) {
208 (ldap_pvt_thread_pool_destroy)(&pool, 0); /* ignore thr_debug macro */
210 ldap_pvt_thread_mutex_destroy(&ldap_pvt_thread_pool_mutex);
211 ldap_pvt_thread_key_destroy( ldap_tpool_key );
216 /* Create a thread pool */
/* Build a pool with numqs work queues.
 *
 * NOTE(review): the declarations of max_threads, max_pending and numqs,
 * most error branches, the store through tpool and the final return are
 * on lines missing from this excerpt; only what the visible lines show is
 * described here.
 *
 * Visible steps: assert that no pool exists yet, range-check the limits,
 * allocate the pool and one cache-line-aligned structure per queue,
 * initialize the pool and per-queue synchronization objects, split the
 * thread and pending limits across the queues, mark the pool as existing
 * and append it to ldap_int_thread_pool_list.
 * Returns -1 when the pool structure cannot be allocated.
 */
218 ldap_pvt_thread_pool_init_q (
219 ldap_pvt_thread_pool_t *tpool,
224 ldap_pvt_thread_pool_t pool;
225 struct ldap_int_thread_poolq_s *pq;
226 int i, rc, rem_thr, rem_pend;
228 /* multiple pools are currently not supported (ITS#4943) */
229 assert(!ldap_int_has_thread_pool);
231 if (! (0 <= max_threads && max_threads <= LDAP_MAXTHR))
/* An out-of-range max_pending is replaced by MAX_PENDING. */
233 if (! (1 <= max_pending && max_pending <= MAX_PENDING))
234 max_pending = MAX_PENDING;
237 pool = (ldap_pvt_thread_pool_t) LDAP_CALLOC(1,
238 sizeof(struct ldap_int_thread_pool_s));
240 if (pool == NULL) return(-1);
242 pool->ltp_wqs = LDAP_MALLOC(numqs * sizeof(struct ldap_int_thread_poolq_s *));
243 if (pool->ltp_wqs == NULL) {
/* Each queue is allocated with CACHELINE-1 spare bytes so that the
 * pointer actually used can be rounded up to a CACHELINE boundary; the
 * raw allocation is remembered in ltp_free for the later LDAP_FREE().
 */
248 for (i=0; i<numqs; i++) {
249 char *ptr = LDAP_CALLOC(1, sizeof(struct ldap_int_thread_poolq_s) + CACHELINE-1);
252 LDAP_FREE(pool->ltp_wqs[i]->ltp_free);
253 LDAP_FREE(pool->ltp_wqs);
257 pool->ltp_wqs[i] = (struct ldap_int_thread_poolq_s *)(((size_t)ptr + CACHELINE-1) & ~(CACHELINE-1));
258 pool->ltp_wqs[i]->ltp_free = ptr;
261 pool->ltp_numqs = numqs;
/* ltp_conf_max_count keeps the caller's value. The working limit is
 * switched to LDAP_MAXTHR -- presumably only when the caller passed 0;
 * the guarding condition is not visible here.
 */
262 pool->ltp_conf_max_count = max_threads;
264 max_threads = LDAP_MAXTHR;
266 rc = ldap_pvt_thread_mutex_init(&pool->ltp_mutex);
269 for (i=0; i<numqs; i++)
270 LDAP_FREE(pool->ltp_wqs[i]->ltp_free);
271 LDAP_FREE(pool->ltp_wqs);
276 rc = ldap_pvt_thread_cond_init(&pool->ltp_cond);
280 rc = ldap_pvt_thread_cond_init(&pool->ltp_pcond);
/* Remainders of dividing the limits among the queues; presumably handed
 * out one per queue below (the guarding conditions are not visible).
 */
284 rem_thr = max_threads % numqs;
285 rem_pend = max_pending % numqs;
286 for ( i=0; i<numqs; i++ ) {
287 pq = pool->ltp_wqs[i];
289 rc = ldap_pvt_thread_mutex_init(&pq->ltp_mutex);
292 rc = ldap_pvt_thread_cond_init(&pq->ltp_cond);
295 LDAP_STAILQ_INIT(&pq->ltp_pending_list);
296 pq->ltp_work_list = &pq->ltp_pending_list;
297 LDAP_SLIST_INIT(&pq->ltp_free_list);
299 pq->ltp_max_count = max_threads / numqs;
304 pq->ltp_max_pending = max_pending / numqs;
306 pq->ltp_max_pending++;
311 ldap_int_has_thread_pool = 1;
313 pool->ltp_max_count = max_threads;
314 pool->ltp_max_pending = max_pending;
/* Publish the pool on the global list under the global mutex. */
316 ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
317 LDAP_STAILQ_INSERT_TAIL(&ldap_int_thread_pool_list, pool, ltp_next);
318 ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
320 /* Start no threads just yet. That can break if the process forks
321 * later, as slapd does in order to daemonize. On at least POSIX,
322 * only the forking thread would survive in the child. Yet fork()
323 * can't unlock/clean up other threads' locks and data structures,
324 * unless pthread_atfork() handlers have been set up to do so.
/* Create a pool with a single work queue: forwards tpool, max_threads
 * and max_pending to ldap_pvt_thread_pool_init_q() with numqs = 1 and
 * returns that call's result.
 */
332 ldap_pvt_thread_pool_init (
333 ldap_pvt_thread_pool_t *tpool,
337 return ldap_pvt_thread_pool_init_q( tpool, max_threads, max_pending, 1 );
/* NOTE(review): the line carrying this function's name is missing from
 * the excerpt; callers in this file invoke it as
 * ldap_int_poolq_hash(pool, arg).
 * Chooses a queue index for arg: walks the sizeof(arg) bytes of the
 * pointer value itself (the statement that accumulates them is not
 * visible) and reduces the result modulo pool->ltp_numqs.
 */
342 struct ldap_int_thread_pool_s *pool,
346 unsigned char *ptr = (unsigned char *)&arg;
347 /* dumb hash of arg to choose a queue */
348 for (j=0; j<sizeof(arg); j++)
350 i %= pool->ltp_numqs;
354 /* Submit a task to be performed by the thread pool */
/* Queue start_routine with argument arg for execution by a pool thread.
 *
 * With more than one queue the first queue tried is the one chosen by
 * ldap_int_poolq_hash(pool, arg). Queues are then examined, with the
 * index wrapping modulo ltp_numqs, for one whose ltp_pending_count is
 * below its ltp_max_pending. On the chosen queue a task object is taken
 * from the free list or allocated, appended to the pending list, and a
 * new thread running ldap_int_thread_pool_wrapper() is created when
 * fewer threads are open than there are active plus pending tasks and
 * ltp_max_count has not been reached.
 * If thread creation fails and the queue has no open thread at all, the
 * task is searched for on the pending list, removed, and put on the free
 * list.
 *
 * NOTE(review): parameter validation, the loop around the queue probe
 * and all return statements are on lines missing from this excerpt.
 */
356 ldap_pvt_thread_pool_submit (
357 ldap_pvt_thread_pool_t *tpool,
358 ldap_pvt_thread_start_t *start_routine, void *arg )
360 struct ldap_int_thread_pool_s *pool;
361 struct ldap_int_thread_poolq_s *pq;
362 ldap_int_thread_task_t *task;
363 ldap_pvt_thread_t thr;
374 if ( pool->ltp_numqs > 1 )
375 i = ldap_int_poolq_hash( pool, arg );
/* Look for a queue with room for one more pending task. */
381 ldap_pvt_thread_mutex_lock(&pool->ltp_wqs[i]->ltp_mutex);
382 if (pool->ltp_wqs[i]->ltp_pending_count < pool->ltp_wqs[i]->ltp_max_pending) {
385 ldap_pvt_thread_mutex_unlock(&pool->ltp_wqs[i]->ltp_mutex);
387 i %= pool->ltp_numqs;
392 pq = pool->ltp_wqs[i];
/* Prefer a recycled task object over a fresh allocation. */
393 task = LDAP_SLIST_FIRST(&pq->ltp_free_list);
395 LDAP_SLIST_REMOVE_HEAD(&pq->ltp_free_list, ltt_next.l);
397 task = (ldap_int_thread_task_t *) LDAP_MALLOC(sizeof(*task));
402 task->ltt_start_routine = start_routine;
405 pq->ltp_pending_count++;
406 LDAP_STAILQ_INSERT_TAIL(&pq->ltp_pending_list, task, ltt_next.q);
411 /* should we open (create) a thread? */
412 if (pq->ltp_open_count < pq->ltp_active_count+pq->ltp_pending_count &&
413 pq->ltp_open_count < pq->ltp_max_count)
416 pq->ltp_open_count++;
418 if (0 != ldap_pvt_thread_create(
419 &thr, 1, ldap_int_thread_pool_wrapper, pq))
421 /* couldn't create thread. back out of
422 * ltp_open_count and check for even worse things.
425 pq->ltp_open_count--;
427 if (pq->ltp_open_count == 0) {
428 /* no open threads at all?!?
430 ldap_int_thread_task_t *ptr;
432 /* let pool_destroy know there are no more threads */
433 ldap_pvt_thread_cond_signal(&pq->ltp_cond);
435 LDAP_STAILQ_FOREACH(ptr, &pq->ltp_pending_list, ltt_next.q)
436 if (ptr == task) break;
438 /* no open threads, task not handled, so
439 * back out of ltp_pending_count, free the task,
442 pq->ltp_pending_count--;
443 LDAP_STAILQ_REMOVE(&pq->ltp_pending_list, task,
444 ldap_int_thread_task_s, ltt_next.q);
445 LDAP_SLIST_INSERT_HEAD(&pq->ltp_free_list, task,
450 /* there is another open thread, so this
451 * task will be handled eventually.
455 ldap_pvt_thread_cond_signal(&pq->ltp_cond);
458 ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
462 ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
/* Stand-in start routine: pool_retract() stores it in a cancelled task's
 * ltt_start_routine. Its body is not visible in this excerpt.
 */
467 no_task( void *ctx, void *arg )
472 /* Cancel a pending task that was previously submitted.
473 * Return 1 if the task was successfully cancelled, 0 if
474 * not found, -1 for invalid parameters
/* Cancel a pending task identified by (start_routine, arg).
 *
 * Only the queue selected by ldap_int_poolq_hash(pool, arg) is searched,
 * with that queue's mutex held. A matching entry is left on the pending
 * list but rewritten in place: its routine becomes no_task and its
 * argument NULL, so a worker that later dequeues it runs no_task instead
 * of the original routine.
 *
 * NOTE(review): pool_submit() can move on to another queue when the
 * hashed one is full, while this function looks at the hashed queue only
 * -- confirm whether a task placed on a different queue can be retracted.
 * NOTE(review): parameter validation and the return statements are on
 * lines missing from this excerpt.
 */
477 ldap_pvt_thread_pool_retract (
478 ldap_pvt_thread_pool_t *tpool,
479 ldap_pvt_thread_start_t *start_routine, void *arg )
481 struct ldap_int_thread_pool_s *pool;
482 struct ldap_int_thread_poolq_s *pq;
483 ldap_int_thread_task_t *task;
494 i = ldap_int_poolq_hash( pool, arg );
495 pq = pool->ltp_wqs[i];
497 ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
498 LDAP_STAILQ_FOREACH(task, &pq->ltp_pending_list, ltt_next.q)
499 if (task->ltt_start_routine == start_routine &&
500 task->ltt_arg == arg) {
501 /* Could LDAP_STAILQ_REMOVE the task, but that
502 * walks ltp_pending_list again to find it.
504 task->ltt_start_routine = no_task;
505 task->ltt_arg = NULL;
508 ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
512 /* Set number of work queues in this pool. Should not be
513 * more than the number of CPUs. */
/* Change the number of work queues of the pool to numqs.
 *
 * Shrinking sets ltp_max_count to 0 on the queues beyond the new count.
 * Growing reallocates the queue-pointer array and creates and initializes
 * the additional queues the same way pool_init_q() does. In both cases
 * the pool-wide thread and pending limits are then redistributed over
 * the first numqs queues and ltp_numqs is updated.
 *
 * NOTE(review): the numqs parameter declaration, the error branches and
 * the return statements are on lines missing from this excerpt.
 */
515 ldap_pvt_thread_pool_queues(
516 ldap_pvt_thread_pool_t *tpool,
519 struct ldap_int_thread_pool_s *pool;
520 struct ldap_int_thread_poolq_s *pq;
521 int i, rc, rem_thr, rem_pend;
523 if (numqs < 1 || tpool == NULL)
531 if (numqs < pool->ltp_numqs) {
/* A thread limit of 0 makes pool_wrapper()'s
 * "ltp_open_count > ltp_max_count" test true for any open thread of
 * that queue.
 */
532 for (i=numqs; i<pool->ltp_numqs; i++)
533 pool->ltp_wqs[i]->ltp_max_count = 0;
534 } else if (numqs > pool->ltp_numqs) {
535 struct ldap_int_thread_poolq_s **wqs;
536 wqs = LDAP_REALLOC(pool->ltp_wqs, numqs * sizeof(struct ldap_int_thread_poolq_s *));
540 for (i=pool->ltp_numqs; i<numqs; i++) {
541 char *ptr = LDAP_CALLOC(1, sizeof(struct ldap_int_thread_poolq_s) + CACHELINE-1);
544 pool->ltp_wqs[i] = NULL;
/* Round the allocation up to a CACHELINE boundary. */
547 pq = (struct ldap_int_thread_poolq_s *)(((size_t)ptr + CACHELINE-1) & ~(CACHELINE-1));
549 pool->ltp_wqs[i] = pq;
551 rc = ldap_pvt_thread_mutex_init(&pq->ltp_mutex);
554 rc = ldap_pvt_thread_cond_init(&pq->ltp_cond);
557 LDAP_STAILQ_INIT(&pq->ltp_pending_list);
558 pq->ltp_work_list = &pq->ltp_pending_list;
559 LDAP_SLIST_INIT(&pq->ltp_free_list);
/* Redistribute the pool-wide limits over the new number of queues. */
562 rem_thr = pool->ltp_max_count % numqs;
563 rem_pend = pool->ltp_max_pending % numqs;
564 for ( i=0; i<numqs; i++ ) {
565 pq = pool->ltp_wqs[i];
566 pq->ltp_max_count = pool->ltp_max_count / numqs;
571 pq->ltp_max_pending = pool->ltp_max_pending / numqs;
573 pq->ltp_max_pending++;
577 pool->ltp_numqs = numqs;
581 /* Set max #threads. value <= 0 means max supported #threads (LDAP_MAXTHR) */
/* Change the pool's thread limit.
 *
 * max_threads is range-checked against 0..LDAP_MAXTHR, recorded as the
 * configured value in ltp_conf_max_count, and stored as the effective
 * limit in ltp_max_count. The effective limit is then divided by the
 * number of queues and the quotient assigned to every queue's
 * ltp_max_count.
 *
 * NOTE(review): the max_threads parameter declaration, the handling of
 * the remainder remthr, the condition under which LDAP_MAXTHR is
 * substituted and the return statements are on lines missing from this
 * excerpt.
 */
583 ldap_pvt_thread_pool_maxthreads(
584 ldap_pvt_thread_pool_t *tpool,
587 struct ldap_int_thread_pool_s *pool;
588 struct ldap_int_thread_poolq_s *pq;
591 if (! (0 <= max_threads && max_threads <= LDAP_MAXTHR))
602 pool->ltp_conf_max_count = max_threads;
604 max_threads = LDAP_MAXTHR;
605 pool->ltp_max_count = max_threads;
607 remthr = max_threads % pool->ltp_numqs;
608 max_threads /= pool->ltp_numqs;
610 for (i=0; i<pool->ltp_numqs; i++) {
611 pq = pool->ltp_wqs[i];
612 pq->ltp_max_count = max_threads;
621 /* Inspect the pool */
/* Report one setting or statistic of the pool, selected by param,
 * through value.
 *
 * Numeric results are written as an int through value. For
 * LDAP_PVT_THREAD_POOL_PARAM_STATE a pointer to one of the string
 * literals "pausing", "running", "finishing" or "stopping" is written
 * through value instead.
 * Returns -1 when count ends up as -1, otherwise 0.
 *
 * NOTE(review): the value parameter declaration, the switch headers, the
 * break statements, the early returns and the bodies of several cases
 * are on lines missing from this excerpt.
 */
623 ldap_pvt_thread_pool_query(
624 ldap_pvt_thread_pool_t *tpool,
625 ldap_pvt_thread_pool_param_t param,
628 struct ldap_int_thread_pool_s *pool;
631 if ( tpool == NULL || value == NULL ) {
637 if ( pool == NULL ) {
/* Reports the configured limit, not the effective ltp_max_count. */
642 case LDAP_PVT_THREAD_POOL_PARAM_MAX:
643 count = pool->ltp_conf_max_count;
646 case LDAP_PVT_THREAD_POOL_PARAM_MAX_PENDING:
647 count = pool->ltp_max_pending;
650 if (count == MAX_PENDING)
/* Read ltp_pause with the pool mutex held. */
654 case LDAP_PVT_THREAD_POOL_PARAM_PAUSING:
655 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
656 count = (pool->ltp_pause != 0);
657 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
/* Per-queue counters: sum the selected counter over all queues, taking
 * each queue's mutex while reading it.
 */
660 case LDAP_PVT_THREAD_POOL_PARAM_OPEN:
661 case LDAP_PVT_THREAD_POOL_PARAM_STARTING:
662 case LDAP_PVT_THREAD_POOL_PARAM_ACTIVE:
663 case LDAP_PVT_THREAD_POOL_PARAM_PENDING:
664 case LDAP_PVT_THREAD_POOL_PARAM_BACKLOAD:
668 for (i=0; i<pool->ltp_numqs; i++) {
669 struct ldap_int_thread_poolq_s *pq = pool->ltp_wqs[i];
670 ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
672 case LDAP_PVT_THREAD_POOL_PARAM_OPEN:
673 count += pq->ltp_open_count;
675 case LDAP_PVT_THREAD_POOL_PARAM_STARTING:
676 count += pq->ltp_starting;
678 case LDAP_PVT_THREAD_POOL_PARAM_ACTIVE:
679 count += pq->ltp_active_count;
681 case LDAP_PVT_THREAD_POOL_PARAM_PENDING:
682 count += pq->ltp_pending_count;
/* Backload is pending plus active. */
684 case LDAP_PVT_THREAD_POOL_PARAM_BACKLOAD:
685 count += pq->ltp_pending_count + pq->ltp_active_count;
688 ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
695 case LDAP_PVT_THREAD_POOL_PARAM_ACTIVE_MAX:
698 case LDAP_PVT_THREAD_POOL_PARAM_PENDING_MAX:
701 case LDAP_PVT_THREAD_POOL_PARAM_BACKLOAD_MAX:
704 case LDAP_PVT_THREAD_POOL_PARAM_STATE:
706 *((char **)value) = "pausing";
707 else if (!pool->ltp_finishing)
708 *((char **)value) = "running";
/* Pool is finishing: "finishing" while some queue still has pending
 * tasks, "stopping" once none has.
 */
711 for (i=0; i<pool->ltp_numqs; i++)
712 if (pool->ltp_wqs[i]->ltp_pending_count) break;
713 if (i<pool->ltp_numqs)
714 *((char **)value) = "finishing";
716 *((char **)value) = "stopping";
720 case LDAP_PVT_THREAD_POOL_PARAM_UNKNOWN:
725 *((int *)value) = count;
728 return ( count == -1 ? -1 : 0 );
732 * true if pool is pausing; does not lock any mutex to check.
733 * 0 if not pausing, 1 if pausing, -1 if error or no pool.
/* Lock-free test of the pool's pause flag. For a non-NULL tpool that
 * refers to a non-NULL pool, rc becomes 1 when ltp_pause is nonzero and 0
 * otherwise. The initial value of rc and the return statement are on
 * lines missing from this excerpt.
 */
736 ldap_pvt_thread_pool_pausing( ldap_pvt_thread_pool_t *tpool )
739 struct ldap_int_thread_pool_s *pool;
741 if ( tpool != NULL && (pool = *tpool) != NULL ) {
742 rc = (pool->ltp_pause != 0);
749 * wrapper for ldap_pvt_thread_pool_query(), left around
750 * for backwards compatibility
/* Ask ldap_pvt_thread_pool_query() for
 * LDAP_PVT_THREAD_POOL_PARAM_BACKLOAD, receiving the value in the local
 * count. How rc and count are combined into the return value is on lines
 * missing from this excerpt.
 */
753 ldap_pvt_thread_pool_backload ( ldap_pvt_thread_pool_t *tpool )
757 rc = ldap_pvt_thread_pool_query( tpool,
758 LDAP_PVT_THREAD_POOL_PARAM_BACKLOAD, (void *)&count );
767 /* Destroy the pool after making its threads finish */
/* Shut the pool down and release its resources.
 *
 * The pool is first unlinked from ldap_int_thread_pool_list; -1 is
 * returned when it is NULL or was not on the list. ltp_finishing is then
 * set, the pending limits are negated, and each queue in turn is drained
 * of threads by broadcasting and waiting on its condition variable until
 * ltp_open_count reaches 0. After that the free-list tasks, the
 * synchronization objects, the queue structures and the queue-pointer
 * array are released and ldap_int_has_thread_pool is cleared.
 *
 * NOTE(review): how run_pending is tested, the freeing of individual
 * tasks and of the pool structure, and the final return are on lines
 * missing from this excerpt.
 */
769 ldap_pvt_thread_pool_destroy ( ldap_pvt_thread_pool_t *tpool, int run_pending )
771 struct ldap_int_thread_pool_s *pool, *pptr;
772 struct ldap_int_thread_poolq_s *pq;
773 ldap_int_thread_task_t *task;
781 if (pool == NULL) return(-1);
/* Find the pool on the global list and unlink it, under the global
 * mutex.
 */
783 ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
784 LDAP_STAILQ_FOREACH(pptr, &ldap_int_thread_pool_list, ltp_next)
785 if (pptr == pool) break;
787 LDAP_STAILQ_REMOVE(&ldap_int_thread_pool_list, pool,
788 ldap_int_thread_pool_s, ltp_next);
789 ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
791 if (pool != pptr) return(-1);
793 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
/* Mark the pool as finishing and wake threads waiting on the pool-wide
 * condition variable.
 */
795 pool->ltp_finishing = 1;
796 if (pool->ltp_max_pending > 0)
797 pool->ltp_max_pending = -pool->ltp_max_pending;
799 ldap_pvt_thread_cond_broadcast(&pool->ltp_cond);
800 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
802 for (i=0; i<pool->ltp_numqs; i++) {
803 pq = pool->ltp_wqs[i];
804 ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
805 if (pq->ltp_max_pending > 0)
806 pq->ltp_max_pending = -pq->ltp_max_pending;
/* Discard tasks that have not started -- presumably only when
 * run_pending is 0; the guarding condition is not visible here.
 */
808 while ((task = LDAP_STAILQ_FIRST(&pq->ltp_pending_list)) != NULL) {
809 LDAP_STAILQ_REMOVE_HEAD(&pq->ltp_pending_list, ltt_next.q);
812 pq->ltp_pending_count = 0;
/* Wait for every thread of this queue to exit. pool_wrapper() signals
 * ltp_cond when the last one leaves while the pool is finishing.
 */
815 while (pq->ltp_open_count) {
816 ldap_pvt_thread_cond_broadcast(&pq->ltp_cond);
817 ldap_pvt_thread_cond_wait(&pq->ltp_cond, &pq->ltp_mutex);
820 while ((task = LDAP_SLIST_FIRST(&pq->ltp_free_list)) != NULL)
822 LDAP_SLIST_REMOVE_HEAD(&pq->ltp_free_list, ltt_next.l);
825 ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
826 ldap_pvt_thread_cond_destroy(&pq->ltp_cond);
827 ldap_pvt_thread_mutex_destroy(&pq->ltp_mutex);
830 ldap_pvt_thread_cond_destroy(&pool->ltp_pcond);
831 ldap_pvt_thread_cond_destroy(&pool->ltp_cond);
832 ldap_pvt_thread_mutex_destroy(&pool->ltp_mutex);
/* ltp_free holds the unaligned pointer originally returned by the
 * allocator for this queue.
 */
833 for (i=0; i<pool->ltp_numqs; i++) {
834 pq = pool->ltp_wqs[i];
836 LDAP_FREE(pq->ltp_free);
839 LDAP_FREE(pool->ltp_wqs);
842 ldap_int_has_thread_pool = 0;
846 /* Thread loop. Accept and handle submitted tasks. */
/* Body of every pool thread. xpool is the work queue that pool_submit()
 * passed to ldap_pvt_thread_create().
 *
 * Visible steps:
 *  - clear the local context's key slots, record the thread ID, and store
 *    the context's address under ldap_tpool_key;
 *  - if the pool is paused, wait on the pool-wide condition variable
 *    until it is not;
 *  - claim a thread_keys[] slot by linear probing from the hash of the
 *    thread ID, with ldap_pvt_thread_pool_mutex held;
 *  - repeatedly take the first task from pq->ltp_work_list, run it as
 *    ltt_start_routine(&ctx, ltt_arg) with the queue mutex released, and
 *    return the task object to the queue's free list;
 *  - when there is no task, leave if the pool is finishing or the queue
 *    has more open threads than ltp_max_count, otherwise wait on the
 *    queue's or the pool's condition variable;
 *  - on the way out reset the context's keys, mark the thread_keys[] slot
 *    as deleted, decrement ltp_open_count, and signal the queue's
 *    condition variable if this was the last thread of a finishing pool.
 *
 * NOTE(review): the loop headers, several branch bodies and the
 * conditions involving pool_lock and freeme are on lines missing from
 * this excerpt.
 */
848 ldap_int_thread_pool_wrapper (
851 struct ldap_int_thread_poolq_s *pq = xpool;
852 struct ldap_int_thread_pool_s *pool = pq->ltp_pool;
853 ldap_int_thread_task_t *task;
854 ldap_int_tpool_plist_t *work_list;
855 ldap_int_thread_userctx_t ctx, *kctx;
856 unsigned i, keyslot, hash;
857 int pool_lock = 0, freeme = 0;
859 assert(pool != NULL);
/* Start with an empty key table. */
861 for ( i=0; i<MAXKEYS; i++ ) {
862 ctx.ltu_key[i].ltk_key = NULL;
866 ctx.ltu_id = ldap_pvt_thread_self();
867 TID_HASH(ctx.ltu_id, hash);
/* Lets ldap_pvt_thread_pool_context() find this context later. */
869 ldap_pvt_thread_key_setdata( ldap_tpool_key, &ctx );
871 if (pool->ltp_pause) {
872 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
873 /* thread_keys[] is read-only when paused */
874 while (pool->ltp_pause)
875 ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
876 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
879 /* find a key slot to give this thread ID and store a
880 * pointer to our keys there; start at the thread ID
881 * itself (mod LDAP_MAXTHR) and look for an empty slot.
883 ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
884 for (keyslot = hash & (LDAP_MAXTHR-1);
885 (kctx = thread_keys[keyslot].ctx) && kctx != DELETED_THREAD_CTX;
886 keyslot = (keyslot+1) & (LDAP_MAXTHR-1));
887 thread_keys[keyslot].ctx = &ctx;
888 ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
890 ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
892 pq->ltp_active_count++;
895 work_list = pq->ltp_work_list; /* help the compiler a bit */
896 task = LDAP_STAILQ_FIRST(work_list);
897 if (task == NULL) { /* paused or no pending tasks */
898 if (--(pq->ltp_active_count) < 1) {
899 if (pool->ltp_pause) {
900 ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
901 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
903 if (--(pool->ltp_active_queues) < 1) {
904 /* Notify pool_pause it is the sole active thread. */
905 ldap_pvt_thread_cond_signal(&pool->ltp_pcond);
911 if (pool->ltp_finishing || pq->ltp_open_count > pq->ltp_max_count) {
912 /* Not paused, and either finishing or too many
913 * threads running (can happen if ltp_max_count
914 * was reduced). Let this thread die.
919 /* We could check an idle timer here, and let the
920 * thread die if it has been inactive for a while.
921 * Only die if there are other open threads (i.e.,
922 * always have at least one thread open).
923 * The check should be like this:
924 * if (pool->ltp_open_count>1 && pool->ltp_starting==0)
925 * check timer, wait if ltp_pause, leave thread;
927 * Just use pthread_cond_timedwait() if we want to
931 ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
932 if (!pool->ltp_pause) {
933 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
934 ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
938 ldap_pvt_thread_cond_wait(&pq->ltp_cond, &pq->ltp_mutex);
940 work_list = pq->ltp_work_list;
941 task = LDAP_STAILQ_FIRST(work_list);
942 } while (task == NULL);
945 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
946 ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
949 pq->ltp_active_count++;
952 LDAP_STAILQ_REMOVE_HEAD(work_list, ltt_next.q);
953 pq->ltp_pending_count--;
954 ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
956 task->ltt_start_routine(&ctx, task->ltt_arg);
958 ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
959 LDAP_SLIST_INSERT_HEAD(&pq->ltp_free_list, task, ltt_next.l);
963 ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
965 /* The pool_mutex lock protects ctx->ltu_key from pool_purgekey()
966 * during this call, since it prevents new pauses. */
967 ldap_pvt_thread_pool_context_reset(&ctx);
/* Leave a deleted marker rather than NULL; the slot search in this
 * function accepts both NULL and DELETED_THREAD_CTX slots for reuse.
 */
969 thread_keys[keyslot].ctx = DELETED_THREAD_CTX;
970 ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
972 pq->ltp_open_count--;
973 if (pq->ltp_open_count == 0) {
974 if (pool->ltp_finishing)
975 /* let pool_destroy know we're all done */
976 ldap_pvt_thread_cond_signal(&pq->ltp_cond);
982 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
984 ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
/* Release the queue itself -- presumably only when freeme was set above;
 * the guarding condition is not visible here.
 */
987 ldap_pvt_thread_cond_destroy(&pq->ltp_cond);
988 ldap_pvt_thread_mutex_destroy(&pq->ltp_mutex);
989 LDAP_FREE(pq->ltp_free);
992 ldap_pvt_thread_exit(NULL);
996 /* Arguments > ltp_pause to handle_pause(,PAUSE_ARG()). arg=PAUSE_ARG
997 * ensures (arg-ltp_pause) sets GO_* at need and keeps DO_PAUSE/GO_*.
1000 #define GO_UNIDLE 16
1001 #define CHECK_PAUSE 32 /* if ltp_pause: GO_IDLE; wait; GO_UNIDLE */
1002 #define DO_PAUSE 64 /* CHECK_PAUSE; pause the pool */
/* PAUSE_ARG(a): build the pause_type argument for handle_pause(). When a
 * contains GO_IDLE or GO_UNIDLE, GO_IDLE-1 is OR-ed in; otherwise
 * CHECK_PAUSE is OR-ed in. handle_pause() later subtracts the current
 * ltp_pause value from this argument before testing its flag bits.
 * GO_IDLE itself is defined on a line missing from this excerpt.
 */
1003 #define PAUSE_ARG(a) \
1004 ((a) | ((a) & (GO_IDLE|GO_UNIDLE) ? GO_IDLE-1 : CHECK_PAUSE))
/* Common implementation behind pool_idle(), pool_unidle(),
 * pool_pausecheck() and pool_pause(). pause_type is a PAUSE_ARG() value.
 *
 * A plain CHECK_PAUSE request is tested against ltp_pause before any lock
 * is taken (the statement that test guards is not visible here).
 * Otherwise, with pool->ltp_mutex held, the current ltp_pause value is
 * subtracted from pause_type and the resulting flag bits select these
 * steps:
 *   GO_IDLE   - move the caller from its queue's active count to its
 *               pending count; when a pause is in progress and the queue
 *               has no active task left, ltp_active_queues is decremented
 *               and ltp_pcond signalled once it drops below 1.
 *   GO_UNIDLE - if ltp_pause exceeds max_ltp_pause, wait on
 *               pool->ltp_cond until it no longer does; then move the
 *               caller back from pending to active.
 *   DO_PAUSE  - set WANT_PAUSE, take the caller out of its queue's active
 *               count, point every queue's ltp_work_list at
 *               empty_pending_list while counting the queues that still
 *               have active tasks, wait on ltp_pcond until none remain,
 *               restore the caller's active count and set PAUSED.
 *
 * NOTE(review): parameter validation, the assignment of pq, the loop
 * headers in the DO_PAUSE branch and the return statements are on lines
 * missing from this excerpt.
 */
1007 handle_pause( ldap_pvt_thread_pool_t *tpool, int pause_type )
1009 struct ldap_int_thread_pool_s *pool;
1010 struct ldap_int_thread_poolq_s *pq;
1011 int ret = 0, pause, max_ltp_pause;
1021 if (pause_type == CHECK_PAUSE && !pool->ltp_pause)
1025 ldap_int_thread_userctx_t *ctx = ldap_pvt_thread_pool_context();
1029 /* Let pool_unidle() ignore requests for new pauses */
1030 max_ltp_pause = pause_type==PAUSE_ARG(GO_UNIDLE) ? WANT_PAUSE : NOT_PAUSED;
1032 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
1034 pause = pool->ltp_pause; /* NOT_PAUSED, WANT_PAUSE or PAUSED */
1036 /* If ltp_pause and not GO_IDLE|GO_UNIDLE: Set GO_IDLE,GO_UNIDLE */
1037 pause_type -= pause;
1039 if (pause_type & GO_IDLE) {
1041 ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
1042 pq->ltp_pending_count++;
1043 pq->ltp_active_count--;
1044 if (pause && pq->ltp_active_count < 1) {
1047 ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
1049 pool->ltp_active_queues--;
1050 if (pool->ltp_active_queues < 1)
1051 /* Tell the task waiting to DO_PAUSE it can proceed */
1052 ldap_pvt_thread_cond_signal(&pool->ltp_pcond);
1056 if (pause_type & GO_UNIDLE) {
1057 /* Wait out pause if any, then cancel GO_IDLE */
1058 if (pause > max_ltp_pause) {
1061 ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
1062 } while (pool->ltp_pause > max_ltp_pause);
1064 ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
1065 pq->ltp_pending_count--;
1066 pq->ltp_active_count++;
1067 ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
1070 if (pause_type & DO_PAUSE) {
1072 /* Tell everyone else to pause or finish, then await that */
1074 assert(!pool->ltp_pause);
1075 pool->ltp_pause = WANT_PAUSE;
1076 pool->ltp_active_queues = 0;
/* Find the index of the caller's own queue. */
1078 for (i=0; i<pool->ltp_numqs; i++)
1079 if (pool->ltp_wqs[i] == pq) break;
1081 ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
1082 /* temporarily remove ourself from active count */
1083 pq->ltp_active_count--;
1087 pq = pool->ltp_wqs[j];
1089 ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
1091 /* Hide pending tasks from ldap_pvt_thread_pool_wrapper() */
1092 pq->ltp_work_list = &empty_pending_list;
1094 if (pq->ltp_active_count > 0)
1095 pool->ltp_active_queues++;
1097 ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
1098 if (pool->ltp_numqs > 1) {
1100 j %= pool->ltp_numqs;
1104 /* Wait for this task to become the sole active task */
1105 while (pool->ltp_active_queues > 0)
1106 ldap_pvt_thread_cond_wait(&pool->ltp_pcond, &pool->ltp_mutex);
1108 /* restore us to active count */
1109 pool->ltp_wqs[i]->ltp_active_count++;
1111 assert(pool->ltp_pause == WANT_PAUSE);
1112 pool->ltp_pause = PAUSED;
1114 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
1119 /* Consider this task idle: It will not block pool_pause() in other tasks. */
/* Thin wrapper: calls handle_pause() with PAUSE_ARG(GO_IDLE) and ignores
 * its result.
 */
1121 ldap_pvt_thread_pool_idle( ldap_pvt_thread_pool_t *tpool )
1123 handle_pause(tpool, PAUSE_ARG(GO_IDLE));
1126 /* Cancel pool_idle(). If the pool is paused, wait it out first. */
/* Thin wrapper: calls handle_pause() with PAUSE_ARG(GO_UNIDLE) and
 * ignores its result.
 */
1128 ldap_pvt_thread_pool_unidle( ldap_pvt_thread_pool_t *tpool )
1130 handle_pause(tpool, PAUSE_ARG(GO_UNIDLE));
1134 * If a pause was requested, wait for it. If several threads
1135 * are waiting to pause, let through one or more pauses.
1136 * The calling task must be active, not idle.
1137 * Return 1 if we waited, 0 if not, -1 at parameter error.
/* Thin wrapper: returns the result of handle_pause() called with
 * PAUSE_ARG(CHECK_PAUSE).
 */
1140 ldap_pvt_thread_pool_pausecheck( ldap_pvt_thread_pool_t *tpool )
1142 return handle_pause(tpool, PAUSE_ARG(CHECK_PAUSE));
1146 * Pause the pool. The calling task must be active, not idle.
1147 * Return when all other tasks are paused or idle.
/* Thin wrapper: returns the result of handle_pause() called with
 * PAUSE_ARG(DO_PAUSE).
 */
1150 ldap_pvt_thread_pool_pause( ldap_pvt_thread_pool_t *tpool )
1152 return handle_pause(tpool, PAUSE_ARG(DO_PAUSE));
/* End a pause. With pool->ltp_mutex held: assert that the pool is in the
 * PAUSED state, clear ltp_pause, point every queue's ltp_work_list back
 * at its own pending list, and wake all waiters on each queue's
 * condition variable and on the pool-wide one.
 *
 * NOTE(review): parameter validation and the return statements are on
 * lines missing from this excerpt.
 */
1157 ldap_pvt_thread_pool_resume (
1158 ldap_pvt_thread_pool_t *tpool )
1160 struct ldap_int_thread_pool_s *pool;
1161 struct ldap_int_thread_poolq_s *pq;
1172 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
1173 assert(pool->ltp_pause == PAUSED);
1174 pool->ltp_pause = 0;
1175 for (i=0; i<pool->ltp_numqs; i++) {
1176 pq = pool->ltp_wqs[i];
/* Make the queue's pending tasks visible to pool_wrapper() again. */
1177 pq->ltp_work_list = &pq->ltp_pending_list;
1178 ldap_pvt_thread_cond_broadcast(&pq->ltp_cond);
1180 ldap_pvt_thread_cond_broadcast(&pool->ltp_cond);
1181 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
1186 * Get the key's data and optionally free function in the given context.
/* Look up key in the context's key table.
 *
 * Returns EINVAL when the context, key or data pointer is NULL. The table
 * is scanned from slot 0 up to the first empty slot; on a match the
 * stored data is written through data and, when kfree is non-NULL, the
 * stored free function through kfree.
 *
 * NOTE(review): the xctx, key and data parameter declarations and the
 * found / not-found return values are on lines missing from this excerpt.
 */
1188 int ldap_pvt_thread_pool_getkey(
1192 ldap_pvt_thread_pool_keyfree_t **kfree )
1194 ldap_int_thread_userctx_t *ctx = xctx;
1197 if ( !ctx || !key || !data ) return EINVAL;
1199 for ( i=0; i<MAXKEYS && ctx->ltu_key[i].ltk_key; i++ ) {
1200 if ( ctx->ltu_key[i].ltk_key == key ) {
1201 *data = ctx->ltu_key[i].ltk_data;
1202 if ( kfree ) *kfree = ctx->ltu_key[i].ltk_free;
/* Remove slot i from ctx's key table while keeping the used slots
 * contiguous: every following used slot is copied one position down and
 * the slot left over at the end has its ltk_key set to NULL.
 */
1210 clear_key_idx( ldap_int_thread_userctx_t *ctx, int i )
1212 for ( ; i < MAXKEYS-1 && ctx->ltu_key[i+1].ltk_key; i++ )
1213 ctx->ltu_key[i] = ctx->ltu_key[i+1];
1214 ctx->ltu_key[i].ltk_key = NULL;
1218 * Set or remove data for the key in the given context.
1219 * key can be any unique pointer.
1220 * kfree() is an optional function to free the data (but not the key):
1221 * pool_context_reset() and pool_purgekey() call kfree(key, data),
1222 * but pool_setkey() does not. For pool_setkey() it is the caller's
1223 * responsibility to free any existing data with the same key.
1224 * kfree() must not call functions taking a tpool argument.
/* Store, replace or remove the entry for key in the context's key table.
 *
 * Returns EINVAL when the context or key is NULL. The table is searched
 * for an entry with this key or, failing that, for an empty slot. The
 * previous data and free function of the slot are reported through
 * olddatap and oldkfreep. When data or kfree is non-NULL the slot is
 * (re)written with key, data and kfree; when both are NULL and the key
 * was found, the entry is removed with clear_key_idx().
 *
 * NOTE(review): the xctx, key, data and olddatap parameter declarations,
 * the conditions around the olddatap / oldkfreep stores, the table-full
 * handling and the return statements are on lines missing from this
 * excerpt.
 */
1226 int ldap_pvt_thread_pool_setkey(
1230 ldap_pvt_thread_pool_keyfree_t *kfree,
1232 ldap_pvt_thread_pool_keyfree_t **oldkfreep )
1234 ldap_int_thread_userctx_t *ctx = xctx;
1237 if ( !ctx || !key ) return EINVAL;
1239 for ( i=found=0; i<MAXKEYS; i++ ) {
1240 if ( ctx->ltu_key[i].ltk_key == key ) {
1243 } else if ( !ctx->ltu_key[i].ltk_key ) {
1250 *olddatap = ctx->ltu_key[i].ltk_data;
1258 *oldkfreep = ctx->ltu_key[i].ltk_free;
1264 if ( data || kfree ) {
1267 ctx->ltu_key[i].ltk_key = key;
1268 ctx->ltu_key[i].ltk_data = data;
1269 ctx->ltu_key[i].ltk_free = kfree;
1270 } else if ( found ) {
1271 clear_key_idx( ctx, i );
1277 /* Free all elements with this key, no matter which thread they're in.
1278 * May only be called while the pool is paused.
/* Remove the entry for key from the key table of every context
 * registered in thread_keys[]. key must not be NULL (asserted).
 * The whole table is walked with ldap_pvt_thread_pool_mutex held; empty
 * and deleted slots are skipped. For a matching entry the stored free
 * function, if any, is called as ltk_free(ltk_key, ltk_data) before the
 * entry is removed with clear_key_idx().
 */
1280 void ldap_pvt_thread_pool_purgekey( void *key )
1283 ldap_int_thread_userctx_t *ctx;
1285 assert ( key != NULL );
1287 ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
1288 for ( i=0; i<LDAP_MAXTHR; i++ ) {
1289 ctx = thread_keys[i].ctx;
1290 if ( ctx && ctx != DELETED_THREAD_CTX ) {
1291 for ( j=0; j<MAXKEYS && ctx->ltu_key[j].ltk_key; j++ ) {
1292 if ( ctx->ltu_key[j].ltk_key == key ) {
1293 if (ctx->ltu_key[j].ltk_free)
1294 ctx->ltu_key[j].ltk_free( ctx->ltu_key[j].ltk_key,
1295 ctx->ltu_key[j].ltk_data );
1296 clear_key_idx( ctx, j );
1302 ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
1306 * Find the context of the current thread.
1307 * This is necessary if the caller does not have access to the
1308 * thread context handle (for example, a slapd plugin calling
1309 * slapi_search_internal()). No doubt it is more efficient
1310 * for the application to keep track of the thread context
/* Return the calling thread's context: the pointer stored under
 * ldap_tpool_key, or the address of ldap_int_main_thrctx when that
 * pointer is NULL. The declaration of ctx is on a line missing from this
 * excerpt.
 */
1313 void *ldap_pvt_thread_pool_context( )
1317 ldap_pvt_thread_key_getdata( ldap_tpool_key, &ctx );
1318 return ctx ? ctx : (void *) &ldap_int_main_thrctx;
1322 * Free the context's keys.
1323 * Must not call functions taking a tpool argument (because this
1324 * thread already holds ltp_mutex when called from pool_wrapper()).
/* Empty the key table of the context vctx. Slots are visited from the
 * last one down to slot 0. For each slot with a non-NULL key the stored
 * free function, if any, is called as ltk_free(ltk_key, ltk_data) and
 * the key is set to NULL. What happens for a slot with a NULL key is on
 * a line missing from this excerpt (presumably it is skipped).
 */
1326 void ldap_pvt_thread_pool_context_reset( void *vctx )
1328 ldap_int_thread_userctx_t *ctx = vctx;
1331 for ( i=MAXKEYS-1; i>=0; i--) {
1332 if ( !ctx->ltu_key[i].ltk_key )
1334 if ( ctx->ltu_key[i].ltk_free )
1335 ctx->ltu_key[i].ltk_free( ctx->ltu_key[i].ltk_key,
1336 ctx->ltu_key[i].ltk_data );
1337 ctx->ltu_key[i].ltk_key = NULL;
/* Interpret vctx as a thread context. The return statement is on a line
 * missing from this excerpt; presumably it yields the context's ltu_id
 * -- TODO confirm.
 */
1341 ldap_pvt_thread_t ldap_pvt_thread_pool_tid( void *vctx )
1343 ldap_int_thread_userctx_t *ctx = vctx;
1347 #endif /* LDAP_THREAD_HAVE_TPOOL */