2 /* This work is part of OpenLDAP Software <http://www.openldap.org/>.
4 * Copyright 1998-2005 The OpenLDAP Foundation.
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted only as authorized by the OpenLDAP
11 * A copy of this license is available in file LICENSE in the
12 * top-level directory of the distribution or, alternatively, at
13 * <http://www.OpenLDAP.org/license.html>.
20 #include <ac/stdarg.h>
21 #include <ac/stdlib.h>
22 #include <ac/string.h>
27 #include "ldap_pvt_thread.h" /* Get the thread interface */
28 #include "ldap_queue.h"
29 #define LDAP_THREAD_POOL_IMPLEMENTATION
30 #include "ldap_thr_debug.h" /* May rename symbols defined below */
32 #ifndef LDAP_THREAD_HAVE_TPOOL
34 typedef enum ldap_int_thread_pool_state_e {
35 LDAP_INT_THREAD_POOL_RUNNING,
36 LDAP_INT_THREAD_POOL_FINISHING,
37 LDAP_INT_THREAD_POOL_STOPPING,
38 LDAP_INT_THREAD_POOL_PAUSING
39 } ldap_int_thread_pool_state_t;
41 typedef struct ldap_int_thread_key_s {
44 ldap_pvt_thread_pool_keyfree_t *ltk_free;
45 } ldap_int_thread_key_t;
47 /* Max number of thread-specific keys we store per thread.
48 * We don't expect to use many...
51 #define LDAP_MAXTHR 1024 /* must be a power of 2 */
53 static ldap_pvt_thread_t tid_zero;
57 ldap_int_thread_key_t *ctx;
58 } thread_keys[LDAP_MAXTHR];
61 typedef struct ldap_int_thread_ctx_s {
63 LDAP_STAILQ_ENTRY(ldap_int_thread_ctx_s) q;
64 LDAP_SLIST_ENTRY(ldap_int_thread_ctx_s) l;
65 LDAP_SLIST_ENTRY(ldap_int_thread_ctx_s) al;
67 ldap_pvt_thread_start_t *ltc_start_routine;
69 } ldap_int_thread_ctx_t;
71 struct ldap_int_thread_pool_s {
72 LDAP_STAILQ_ENTRY(ldap_int_thread_pool_s) ltp_next;
73 ldap_pvt_thread_mutex_t ltp_mutex;
74 ldap_pvt_thread_cond_t ltp_cond;
75 ldap_pvt_thread_cond_t ltp_pcond;
76 LDAP_STAILQ_HEAD(tcq, ldap_int_thread_ctx_s) ltp_pending_list;
77 LDAP_SLIST_HEAD(tcl, ldap_int_thread_ctx_s) ltp_free_list;
78 LDAP_SLIST_HEAD(tclq, ldap_int_thread_ctx_s) ltp_active_list;
79 ldap_int_thread_pool_state_t ltp_state;
82 long ltp_pending_count;
83 long ltp_active_count;
88 static LDAP_STAILQ_HEAD(tpq, ldap_int_thread_pool_s)
89 ldap_int_thread_pool_list =
90 LDAP_STAILQ_HEAD_INITIALIZER(ldap_int_thread_pool_list);
92 static ldap_pvt_thread_mutex_t ldap_pvt_thread_pool_mutex;
94 static void *ldap_int_thread_pool_wrapper( void *pool );
96 static ldap_pvt_thread_t ldap_int_main_tid;
98 static ldap_int_thread_key_t ldap_int_main_thrctx[LDAP_MAXTHR];
101 ldap_int_thread_pool_startup ( void )
103 ldap_int_main_tid = ldap_pvt_thread_self();
105 return ldap_pvt_thread_mutex_init(&ldap_pvt_thread_pool_mutex);
109 ldap_int_thread_pool_shutdown ( void )
111 struct ldap_int_thread_pool_s *pool;
113 while ((pool = LDAP_STAILQ_FIRST(&ldap_int_thread_pool_list)) != NULL) {
114 LDAP_STAILQ_REMOVE_HEAD(&ldap_int_thread_pool_list, ltp_next);
115 (ldap_pvt_thread_pool_destroy)(&pool, 0); /* ignore thr_debug macro */
117 ldap_pvt_thread_mutex_destroy(&ldap_pvt_thread_pool_mutex);
122 ldap_pvt_thread_pool_init (
123 ldap_pvt_thread_pool_t *tpool,
127 ldap_pvt_thread_pool_t pool;
131 pool = (ldap_pvt_thread_pool_t) LDAP_CALLOC(1,
132 sizeof(struct ldap_int_thread_pool_s));
134 if (pool == NULL) return(-1);
136 rc = ldap_pvt_thread_mutex_init(&pool->ltp_mutex);
139 rc = ldap_pvt_thread_cond_init(&pool->ltp_cond);
142 rc = ldap_pvt_thread_cond_init(&pool->ltp_pcond);
145 pool->ltp_state = LDAP_INT_THREAD_POOL_RUNNING;
146 pool->ltp_max_count = max_threads;
147 pool->ltp_max_pending = max_pending;
148 LDAP_STAILQ_INIT(&pool->ltp_pending_list);
149 LDAP_SLIST_INIT(&pool->ltp_free_list);
150 LDAP_SLIST_INIT(&pool->ltp_active_list);
151 ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
152 LDAP_STAILQ_INSERT_TAIL(&ldap_int_thread_pool_list, pool, ltp_next);
153 ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
156 /* THIS WILL NOT WORK on some systems. If the process
157 * forks after starting a thread, there is no guarantee
158 * that the thread will survive the fork. For example,
159 * slapd forks in order to daemonize, and does so after
160 * calling ldap_pvt_thread_pool_init. On some systems,
161 * this initial thread does not run in the child process,
162 * but ltp_open_count == 1, so two things happen:
163 * 1) the first client connection fails, and 2) when
164 * slapd is kill'ed, it never terminates since it waits
165 * for all worker threads to exit. */
167 /* start up one thread, just so there is one. no need to
168 * lock the mutex right now, since no threads are running.
170 pool->ltp_open_count++;
172 ldap_pvt_thread_t thr;
173 rc = ldap_pvt_thread_create( &thr, 1, ldap_int_thread_pool_wrapper, pool );
176 /* couldn't start one? then don't start any */
177 ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
178 LDAP_STAILQ_REMOVE(ldap_int_thread_pool_list, pool,
179 ldap_int_thread_pool_s, ltp_next);
180 ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
181 ldap_pvt_thread_cond_destroy(&pool->ltp_pcond);
182 ldap_pvt_thread_cond_destroy(&pool->ltp_cond);
183 ldap_pvt_thread_mutex_destroy(&pool->ltp_mutex);
193 #define TID_HASH(tid, hash) do { unsigned i; \
194 unsigned char *ptr = (unsigned char *)&(tid); \
195 for (i=0, hash=0; i<sizeof(tid); i++) hash += ptr[i]; } while(0)
198 ldap_pvt_thread_pool_submit (
199 ldap_pvt_thread_pool_t *tpool,
200 ldap_pvt_thread_start_t *start_routine, void *arg )
202 struct ldap_int_thread_pool_s *pool;
203 ldap_int_thread_ctx_t *ctx;
205 ldap_pvt_thread_t thr;
215 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
216 if ((pool->ltp_state != LDAP_INT_THREAD_POOL_RUNNING &&
217 pool->ltp_state != LDAP_INT_THREAD_POOL_PAUSING)
218 || (pool->ltp_max_pending > 0
219 && pool->ltp_pending_count >= pool->ltp_max_pending))
221 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
224 ctx = LDAP_SLIST_FIRST(&pool->ltp_free_list);
226 LDAP_SLIST_REMOVE_HEAD(&pool->ltp_free_list, ltc_next.l);
228 ctx = (ldap_int_thread_ctx_t *) LDAP_MALLOC(
229 sizeof(ldap_int_thread_ctx_t));
231 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
236 ctx->ltc_start_routine = start_routine;
239 pool->ltp_pending_count++;
240 LDAP_STAILQ_INSERT_TAIL(&pool->ltp_pending_list, ctx, ltc_next.q);
241 if (pool->ltp_state == LDAP_INT_THREAD_POOL_PAUSING) {
242 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
245 ldap_pvt_thread_cond_signal(&pool->ltp_cond);
246 if ((pool->ltp_open_count <= 0
248 || pool->ltp_pending_count > 1
250 || pool->ltp_open_count == pool->ltp_active_count)
251 && (pool->ltp_max_count <= 0
252 || pool->ltp_open_count < pool->ltp_max_count))
254 pool->ltp_open_count++;
255 pool->ltp_starting++;
258 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
263 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
265 rc = ldap_pvt_thread_create( &thr, 1,
266 ldap_int_thread_pool_wrapper, pool );
269 pool->ltp_starting--;
271 /* assign this thread ID to a key slot; start
272 * at the thread ID itself (mod LDAP_MAXTHR) and
273 * look for an empty slot.
276 for (rc = hash & (LDAP_MAXTHR-1);
277 !ldap_pvt_thread_equal(thread_keys[rc].id, tid_zero);
278 rc = (rc+1) & (LDAP_MAXTHR-1));
279 thread_keys[rc].id = thr;
281 /* couldn't create thread. back out of
282 * ltp_open_count and check for even worse things.
284 pool->ltp_open_count--;
285 pool->ltp_starting--;
286 if (pool->ltp_open_count == 0) {
287 /* no open threads at all?!?
289 ldap_int_thread_ctx_t *ptr;
290 LDAP_STAILQ_FOREACH(ptr, &pool->ltp_pending_list, ltc_next.q)
291 if (ptr == ctx) break;
293 /* no open threads, context not handled, so
294 * back out of ltp_pending_count, free the context,
297 LDAP_STAILQ_REMOVE(&pool->ltp_pending_list, ctx,
298 ldap_int_thread_ctx_s, ltc_next.q);
299 pool->ltp_pending_count++;
300 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
305 /* there is another open thread, so this
306 * context will be handled eventually.
307 * continue on and signal that the context
311 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
318 ldap_pvt_thread_pool_maxthreads ( ldap_pvt_thread_pool_t *tpool, int max_threads )
320 struct ldap_int_thread_pool_s *pool;
330 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
331 pool->ltp_max_count = max_threads;
332 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
337 ldap_pvt_thread_pool_backload ( ldap_pvt_thread_pool_t *tpool )
339 struct ldap_int_thread_pool_s *pool;
350 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
351 count = pool->ltp_pending_count + pool->ltp_active_count;
352 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
357 ldap_pvt_thread_pool_destroy ( ldap_pvt_thread_pool_t *tpool, int run_pending )
359 struct ldap_int_thread_pool_s *pool, *pptr;
361 ldap_int_thread_ctx_t *ctx;
368 if (pool == NULL) return(-1);
370 ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
371 LDAP_STAILQ_FOREACH(pptr, &ldap_int_thread_pool_list, ltp_next)
372 if (pptr == pool) break;
374 LDAP_STAILQ_REMOVE(&ldap_int_thread_pool_list, pool,
375 ldap_int_thread_pool_s, ltp_next);
376 ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
378 if (pool != pptr) return(-1);
380 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
381 pool->ltp_state = run_pending
382 ? LDAP_INT_THREAD_POOL_FINISHING
383 : LDAP_INT_THREAD_POOL_STOPPING;
385 ldap_pvt_thread_cond_broadcast(&pool->ltp_cond);
386 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
389 ldap_pvt_thread_yield();
390 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
391 waiting = pool->ltp_open_count;
392 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
393 } while (waiting > 0);
395 while ((ctx = LDAP_STAILQ_FIRST(&pool->ltp_pending_list)) != NULL)
397 LDAP_STAILQ_REMOVE_HEAD(&pool->ltp_pending_list, ltc_next.q);
401 while ((ctx = LDAP_SLIST_FIRST(&pool->ltp_free_list)) != NULL)
403 LDAP_SLIST_REMOVE_HEAD(&pool->ltp_free_list, ltc_next.l);
407 ldap_pvt_thread_cond_destroy(&pool->ltp_pcond);
408 ldap_pvt_thread_cond_destroy(&pool->ltp_cond);
409 ldap_pvt_thread_mutex_destroy(&pool->ltp_mutex);
415 ldap_int_thread_pool_wrapper (
418 struct ldap_int_thread_pool_s *pool = xpool;
419 ldap_int_thread_ctx_t *ctx;
420 ldap_int_thread_key_t ltc_key[MAXKEYS];
421 ldap_pvt_thread_t tid;
422 int i, keyslot, hash;
427 for ( i=0; i<MAXKEYS; i++ ) {
428 ltc_key[i].ltk_key = NULL;
431 tid = ldap_pvt_thread_self();
433 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
435 /* store pointer to our keys */
437 for (i = hash & (LDAP_MAXTHR-1);
438 !ldap_pvt_thread_equal(thread_keys[i].id, tid);
439 i = (i+1) & (LDAP_MAXTHR-1));
440 thread_keys[i].ctx = ltc_key;
443 while (pool->ltp_state != LDAP_INT_THREAD_POOL_STOPPING) {
444 ctx = LDAP_STAILQ_FIRST(&pool->ltp_pending_list);
446 LDAP_STAILQ_REMOVE_HEAD(&pool->ltp_pending_list, ltc_next.q);
448 if (pool->ltp_state == LDAP_INT_THREAD_POOL_FINISHING)
450 if (pool->ltp_max_count > 0
451 && pool->ltp_open_count > pool->ltp_max_count)
453 /* too many threads running (can happen if the
454 * maximum threads value is set during ongoing
455 * operation using ldap_pvt_thread_pool_maxthreads)
456 * so let this thread die.
461 /* we could check an idle timer here, and let the
462 * thread die if it has been inactive for a while.
463 * only die if there are other open threads (i.e.,
464 * always have at least one thread open). the check
465 * should be like this:
466 * if (pool->ltp_open_count > 1 && pool->ltp_starting == 0)
467 * check timer, leave thread (break;)
470 if (pool->ltp_state == LDAP_INT_THREAD_POOL_RUNNING
471 || pool->ltp_state == LDAP_INT_THREAD_POOL_PAUSING)
473 ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
479 pool->ltp_pending_count--;
481 LDAP_SLIST_INSERT_HEAD(&pool->ltp_active_list, ctx, ltc_next.al);
482 pool->ltp_active_count++;
483 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
485 ctx->ltc_start_routine(ltc_key, ctx->ltc_arg);
487 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
488 LDAP_SLIST_REMOVE(&pool->ltp_active_list, ctx,
489 ldap_int_thread_ctx_s, ltc_next.al);
490 LDAP_SLIST_INSERT_HEAD(&pool->ltp_free_list, ctx, ltc_next.l);
491 pool->ltp_active_count--;
493 if (pool->ltp_state == LDAP_INT_THREAD_POOL_PAUSING) {
494 if (pool->ltp_active_count < 2) {
495 ldap_pvt_thread_cond_signal(&pool->ltp_pcond);
497 ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
499 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
501 ldap_pvt_thread_yield();
503 /* if we use an idle timer, here's
504 * a good place to update it
507 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
510 for ( i=0; i<MAXKEYS && ltc_key[i].ltk_key; i++ ) {
511 if (ltc_key[i].ltk_free)
514 ltc_key[i].ltk_data );
517 thread_keys[keyslot].ctx = NULL;
518 thread_keys[keyslot].id = tid_zero;
520 pool->ltp_open_count--;
521 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
523 ldap_pvt_thread_exit(NULL);
528 ldap_pvt_thread_pool_pause (
529 ldap_pvt_thread_pool_t *tpool )
531 struct ldap_int_thread_pool_s *pool;
541 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
543 /* If someone else has already requested a pause, we have to wait */
544 while (pool->ltp_state == LDAP_INT_THREAD_POOL_PAUSING) {
545 pool->ltp_pending_count++;
546 pool->ltp_active_count--;
547 ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
548 pool->ltp_pending_count--;
549 pool->ltp_active_count++;
551 /* Wait for everyone else to finish */
552 pool->ltp_state = LDAP_INT_THREAD_POOL_PAUSING;
553 while (pool->ltp_active_count > 1) {
554 ldap_pvt_thread_cond_wait(&pool->ltp_pcond, &pool->ltp_mutex);
556 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
561 ldap_pvt_thread_pool_resume (
562 ldap_pvt_thread_pool_t *tpool )
564 struct ldap_int_thread_pool_s *pool;
574 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
576 pool->ltp_state = LDAP_INT_THREAD_POOL_RUNNING;
577 ldap_pvt_thread_cond_broadcast(&pool->ltp_cond);
578 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
582 int ldap_pvt_thread_pool_getkey(
586 ldap_pvt_thread_pool_keyfree_t **kfree )
588 ldap_int_thread_key_t *ctx = xctx;
591 if ( !ctx || !data ) return EINVAL;
593 for ( i=0; i<MAXKEYS && ctx[i].ltk_key; i++ ) {
594 if ( ctx[i].ltk_key == key ) {
595 *data = ctx[i].ltk_data;
596 if ( kfree ) *kfree = ctx[i].ltk_free;
603 int ldap_pvt_thread_pool_setkey(
607 ldap_pvt_thread_pool_keyfree_t *kfree )
609 ldap_int_thread_key_t *ctx = xctx;
612 if ( !ctx || !key ) return EINVAL;
614 for ( i=0; i<MAXKEYS; i++ ) {
615 if ( !ctx[i].ltk_key || ctx[i].ltk_key == key ) {
616 ctx[i].ltk_key = key;
617 ctx[i].ltk_data = data;
618 ctx[i].ltk_free = kfree;
625 /* Free all elements with this key, no matter which thread they're in.
626 * May only be called while the pool is paused.
628 void ldap_pvt_thread_pool_purgekey( void *key )
631 ldap_int_thread_key_t *ctx;
633 for ( i=0; i<LDAP_MAXTHR; i++ ) {
634 if ( thread_keys[i].ctx ) {
635 ctx = thread_keys[i].ctx;
636 for ( j=0; j<MAXKEYS; j++ ) {
637 if ( ctx[j].ltk_key == key ) {
639 ctx[j].ltk_free( ctx[j].ltk_key, ctx[j].ltk_data );
640 ctx[j].ltk_key = NULL;
641 ctx[j].ltk_free = NULL;
650 * This is necessary if the caller does not have access to the
651 * thread context handle (for example, a slapd plugin calling
652 * slapi_search_internal()). No doubt it is more efficient to
653 * for the application to keep track of the thread context
656 void *ldap_pvt_thread_pool_context( )
658 ldap_pvt_thread_t tid;
661 tid = ldap_pvt_thread_self();
662 if ( ldap_pvt_thread_equal( tid, ldap_int_main_tid ))
663 return ldap_int_main_thrctx;
665 TID_HASH( tid, hash );
666 for (i = hash & (LDAP_MAXTHR-1);
667 !ldap_pvt_thread_equal(thread_keys[i].id, tid_zero) &&
668 !ldap_pvt_thread_equal(thread_keys[i].id, tid);
669 i = (i+1) & (LDAP_MAXTHR-1));
671 return thread_keys[i].ctx;
674 void ldap_pvt_thread_pool_context_reset( void *vctx )
676 ldap_int_thread_key_t *ctx = vctx;
679 for ( i=0; i<MAXKEYS && ctx[i].ltk_key; i++) {
680 if ( ctx[i].ltk_free )
681 ctx[i].ltk_free( ctx[i].ltk_key, ctx[i].ltk_data );
682 ctx[i].ltk_key = NULL;
685 #endif /* LDAP_THREAD_HAVE_TPOOL */