]> git.sur5r.net Git - openldap/blob - libraries/libldap_r/tpool.c
e8536010af356d942eb086d4acaec79623c5f7f5
[openldap] / libraries / libldap_r / tpool.c
1 /* $OpenLDAP$ */
2 /* This work is part of OpenLDAP Software <http://www.openldap.org/>.
3  *
4  * Copyright 1998-2008 The OpenLDAP Foundation.
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted only as authorized by the OpenLDAP
9  * Public License.
10  *
11  * A copy of this license is available in file LICENSE in the
12  * top-level directory of the distribution or, alternatively, at
13  * <http://www.OpenLDAP.org/license.html>.
14  */
15
16 #include "portable.h"
17
18 #include <stdio.h>
19
20 #include <ac/signal.h>
21 #include <ac/stdarg.h>
22 #include <ac/stdlib.h>
23 #include <ac/string.h>
24 #include <ac/time.h>
25 #include <ac/errno.h>
26
27 #include "ldap-int.h"
28 #include "ldap_pvt_thread.h" /* Get the thread interface */
29 #include "ldap_queue.h"
30 #define LDAP_THREAD_POOL_IMPLEMENTATION
31 #include "ldap_thr_debug.h"  /* May rename symbols defined below */
32
33 #ifndef LDAP_THREAD_HAVE_TPOOL
34
35 /* Thread-specific key with data and optional free function */
36 typedef struct ldap_int_tpool_key_s {
37         void *ltk_key;
38         void *ltk_data;
39         ldap_pvt_thread_pool_keyfree_t *ltk_free;
40 } ldap_int_tpool_key_t;
41
42 /* Max number of thread-specific keys we store per thread.
43  * We don't expect to use many...
44  */
45 #define MAXKEYS 32
46
47 /* Max number of threads */
48 #define LDAP_MAXTHR     1024    /* must be a power of 2 */
49
50 /* (Theoretical) max number of pending requests */
51 #define MAX_PENDING (INT_MAX/2) /* INT_MAX - (room to avoid overflow) */
52
53 /* Context: thread ID and thread-specific key/data pairs */
54 typedef struct ldap_int_thread_userctx_s {
55         ldap_pvt_thread_t ltu_id;
56         ldap_int_tpool_key_t ltu_key[MAXKEYS];
57 } ldap_int_thread_userctx_t;
58
59
60 /* Simple {thread ID -> context} hash table; key=ctx->ltu_id.
61  * Protected by ldap_pvt_thread_pool_mutex except during pauses,
62  * when it is read-only (used by pool_purgekey and pool_context).
63  * Protected by tpool->ltp_mutex during pauses.
64  */
65 static struct {
66         ldap_int_thread_userctx_t *ctx;
67         /* ctx is valid when not NULL or DELETED_THREAD_CTX */
68 #       define DELETED_THREAD_CTX (&ldap_int_main_thrctx + 1) /* dummy addr */
69 } thread_keys[LDAP_MAXTHR];
70
71 #define TID_HASH(tid, hash) do { \
72         unsigned const char *ptr_ = (unsigned const char *)&(tid); \
73         unsigned i_; \
74         for (i_ = 0, (hash) = ptr_[0]; ++i_ < sizeof(tid);) \
75                 (hash) += ((hash) << 5) ^ ptr_[i_]; \
76 } while(0)
77
78
79 /* Task for a thread to perform */
80 typedef struct ldap_int_thread_task_s {
81         union {
82                 LDAP_STAILQ_ENTRY(ldap_int_thread_task_s) q;
83                 LDAP_SLIST_ENTRY(ldap_int_thread_task_s) l;
84         } ltt_next;
85         ldap_pvt_thread_start_t *ltt_start_routine;
86         void *ltt_arg;
87 } ldap_int_thread_task_t;
88
89 typedef LDAP_STAILQ_HEAD(tcq, ldap_int_thread_task_s) ldap_int_tpool_plist_t;
90
91 struct ldap_int_thread_pool_s {
92         LDAP_STAILQ_ENTRY(ldap_int_thread_pool_s) ltp_next;
93
94         /* protect members below, and protect thread_keys[] during pauses */
95         ldap_pvt_thread_mutex_t ltp_mutex;
96
97         /* not paused and something to do for pool_<wrapper/pause/destroy>() */
98         ldap_pvt_thread_cond_t ltp_cond;
99
100         /* ltp_active_count <= 1 && ltp_pause */
101         ldap_pvt_thread_cond_t ltp_pcond;
102
103         /* pending tasks, and unused task objects */
104         ldap_int_tpool_plist_t ltp_pending_list;
105         LDAP_SLIST_HEAD(tcl, ldap_int_thread_task_s) ltp_free_list;
106
107         /* The pool is finishing, waiting for its threads to close.
108          * They close when ltp_pending_list is done.  pool_submit()
109          * rejects new tasks.  ltp_max_pending = -(its old value).
110          */
111         int ltp_finishing;
112
113         /* Some active task needs to be the sole active task.
114          * Atomic variable so ldap_pvt_thread_pool_pausing() can read it.
115          */
116         volatile sig_atomic_t ltp_pause;
117
118         /* Max number of threads in pool, or 0 for default (LDAP_MAXTHR) */
119         int ltp_max_count;
120
121         /* Max number of pending + paused requests, negated when ltp_finishing */
122         int ltp_max_pending;
123
124         int ltp_pending_count;          /* Pending or paused requests */
125         int ltp_active_count;           /* Active, not paused requests */
126         int ltp_open_count;                     /* Number of threads */
127         int ltp_starting;                       /* Currenlty starting threads */
128
129         /*
130          * State maintained to reduce the time ltp_mutex must be locked
131          * in ldap_pvt_thread_pool_<submit/wrapper>().
132          */
133
134         /* ltp_pause == 0 ? &ltp_pending_list : &empty_pending_list */
135         ldap_int_tpool_plist_t *ltp_work_list;
136
137         /* >0 if paused or we may open a thread, <0 if we should close a thread. */
138         /* Updated when ltp_<finishing/pause/max_count/open_count> change. */
139         int ltp_vary_open_count;
140 #       define SET_VARY_OPEN_COUNT(pool)        \
141                 ((pool)->ltp_vary_open_count =  \
142                  (pool)->ltp_pause      ?  1 :  \
143                  (pool)->ltp_finishing  ? -1 :  \
144                  ((pool)->ltp_max_count ? (pool)->ltp_max_count : LDAP_MAXTHR) \
145                  - (pool)->ltp_open_count)
146 };
147
148 static ldap_int_tpool_plist_t empty_pending_list =
149         LDAP_STAILQ_HEAD_INITIALIZER(empty_pending_list);
150
151 static int ldap_int_has_thread_pool = 0;
152 static LDAP_STAILQ_HEAD(tpq, ldap_int_thread_pool_s)
153         ldap_int_thread_pool_list =
154         LDAP_STAILQ_HEAD_INITIALIZER(ldap_int_thread_pool_list);
155
156 static ldap_pvt_thread_mutex_t ldap_pvt_thread_pool_mutex;
157
158 static void *ldap_int_thread_pool_wrapper( void *pool );
159
160 static ldap_pvt_thread_key_t    ldap_tpool_key;
161
162 /* Context of the main thread */
163 static ldap_int_thread_userctx_t ldap_int_main_thrctx;
164
165 int
166 ldap_int_thread_pool_startup ( void )
167 {
168         ldap_int_main_thrctx.ltu_id = ldap_pvt_thread_self();
169         ldap_pvt_thread_key_create( &ldap_tpool_key );
170         return ldap_pvt_thread_mutex_init(&ldap_pvt_thread_pool_mutex);
171 }
172
173 int
174 ldap_int_thread_pool_shutdown ( void )
175 {
176         struct ldap_int_thread_pool_s *pool;
177
178         while ((pool = LDAP_STAILQ_FIRST(&ldap_int_thread_pool_list)) != NULL) {
179                 (ldap_pvt_thread_pool_destroy)(&pool, 0); /* ignore thr_debug macro */
180         }
181         ldap_pvt_thread_mutex_destroy(&ldap_pvt_thread_pool_mutex);
182         ldap_pvt_thread_key_destroy( ldap_tpool_key );
183         return(0);
184 }
185
186
187 /* Create a thread pool */
188 int
189 ldap_pvt_thread_pool_init (
190         ldap_pvt_thread_pool_t *tpool,
191         int max_threads,
192         int max_pending )
193 {
194         ldap_pvt_thread_pool_t pool;
195         int rc;
196
197         /* multiple pools are currently not supported (ITS#4943) */
198         assert(!ldap_int_has_thread_pool);
199
200         if (! (0 <= max_threads && max_threads <= LDAP_MAXTHR))
201                 max_threads = 0;
202         if (! (1 <= max_pending && max_pending <= MAX_PENDING))
203                 max_pending = MAX_PENDING;
204
205         *tpool = NULL;
206         pool = (ldap_pvt_thread_pool_t) LDAP_CALLOC(1,
207                 sizeof(struct ldap_int_thread_pool_s));
208
209         if (pool == NULL) return(-1);
210
211         rc = ldap_pvt_thread_mutex_init(&pool->ltp_mutex);
212         if (rc != 0)
213                 return(rc);
214         rc = ldap_pvt_thread_cond_init(&pool->ltp_cond);
215         if (rc != 0)
216                 return(rc);
217         rc = ldap_pvt_thread_cond_init(&pool->ltp_pcond);
218         if (rc != 0)
219                 return(rc);
220
221         ldap_int_has_thread_pool = 1;
222
223         pool->ltp_max_count = max_threads;
224         SET_VARY_OPEN_COUNT(pool);
225         pool->ltp_max_pending = max_pending;
226
227         LDAP_STAILQ_INIT(&pool->ltp_pending_list);
228         pool->ltp_work_list = &pool->ltp_pending_list;
229         LDAP_SLIST_INIT(&pool->ltp_free_list);
230
231         ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
232         LDAP_STAILQ_INSERT_TAIL(&ldap_int_thread_pool_list, pool, ltp_next);
233         ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
234
235 #if 0
236         /* THIS WILL NOT WORK on some systems.  If the process
237          * forks after starting a thread, there is no guarantee
238          * that the thread will survive the fork.  For example,
239          * slapd forks in order to daemonize, and does so after
240          * calling ldap_pvt_thread_pool_init.  On some systems,
241          * this initial thread does not run in the child process,
242          * but ltp_open_count == 1, so two things happen: 
243          * 1) the first client connection fails, and 2) when
244          * slapd is kill'ed, it never terminates since it waits
245          * for all worker threads to exit. */
246
247         /* start up one thread, just so there is one. no need to
248          * lock the mutex right now, since no threads are running.
249          */
250         pool->ltp_open_count++;
251         SET_VARY_OPEN_COUNT(pool);
252
253         ldap_pvt_thread_t thr;
254         rc = ldap_pvt_thread_create( &thr, 1, ldap_int_thread_pool_wrapper, pool );
255
256         if( rc != 0) {
257                 /* couldn't start one?  then don't start any */
258                 ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
259                 LDAP_STAILQ_REMOVE(ldap_int_thread_pool_list, pool, 
260                         ldap_int_thread_pool_s, ltp_next);
261                 ldap_int_has_thread_pool = 0;
262                 ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
263                 ldap_pvt_thread_cond_destroy(&pool->ltp_pcond);
264                 ldap_pvt_thread_cond_destroy(&pool->ltp_cond);
265                 ldap_pvt_thread_mutex_destroy(&pool->ltp_mutex);
266                 LDAP_FREE(pool);
267                 return(-1);
268         }
269 #endif
270
271         *tpool = pool;
272         return(0);
273 }
274
275
276 /* Submit a task to be performed by the thread pool */
277 int
278 ldap_pvt_thread_pool_submit (
279         ldap_pvt_thread_pool_t *tpool,
280         ldap_pvt_thread_start_t *start_routine, void *arg )
281 {
282         struct ldap_int_thread_pool_s *pool;
283         ldap_int_thread_task_t *task;
284         ldap_pvt_thread_t thr;
285
286         if (tpool == NULL)
287                 return(-1);
288
289         pool = *tpool;
290
291         if (pool == NULL)
292                 return(-1);
293
294         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
295
296         if (pool->ltp_pending_count >= pool->ltp_max_pending)
297                 goto failed;
298
299         task = LDAP_SLIST_FIRST(&pool->ltp_free_list);
300         if (task) {
301                 LDAP_SLIST_REMOVE_HEAD(&pool->ltp_free_list, ltt_next.l);
302         } else {
303                 task = (ldap_int_thread_task_t *) LDAP_MALLOC(sizeof(*task));
304                 if (task == NULL)
305                         goto failed;
306         }
307
308         task->ltt_start_routine = start_routine;
309         task->ltt_arg = arg;
310
311         pool->ltp_pending_count++;
312         LDAP_STAILQ_INSERT_TAIL(&pool->ltp_pending_list, task, ltt_next.q);
313         if (pool->ltp_pause) {
314                 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
315                 return(0);
316         }
317         ldap_pvt_thread_cond_signal(&pool->ltp_cond);
318         if (pool->ltp_vary_open_count > 0 &&
319                 pool->ltp_open_count < pool->ltp_active_count+pool->ltp_pending_count)
320         {
321                 pool->ltp_starting++;
322                 pool->ltp_open_count++;
323                 SET_VARY_OPEN_COUNT(pool);
324
325                 if (0 != ldap_pvt_thread_create(
326                         &thr, 1, ldap_int_thread_pool_wrapper, pool))
327                 {
328                         /* couldn't create thread.  back out of
329                          * ltp_open_count and check for even worse things.
330                          */
331                         pool->ltp_starting--;
332                         pool->ltp_open_count--;
333                         SET_VARY_OPEN_COUNT(pool);
334
335                         if (pool->ltp_open_count == 0) {
336                                 /* no open threads at all?!?
337                                  */
338                                 ldap_int_thread_task_t *ptr;
339
340                                 /* let pool_destroy know there are no more threads */
341                                 ldap_pvt_thread_cond_signal(&pool->ltp_cond);
342
343                                 LDAP_STAILQ_FOREACH(ptr, &pool->ltp_pending_list, ltt_next.q)
344                                         if (ptr == task) break;
345                                 if (ptr == task) {
346                                         /* no open threads, task not handled, so
347                                          * back out of ltp_pending_count, free the task,
348                                          * report the error.
349                                          */
350                                         pool->ltp_pending_count--;
351                                         LDAP_STAILQ_REMOVE(&pool->ltp_pending_list, task,
352                                                 ldap_int_thread_task_s, ltt_next.q);
353                                         LDAP_SLIST_INSERT_HEAD(&pool->ltp_free_list, task,
354                                                 ltt_next.l);
355                                         goto failed;
356                                 }
357                         }
358                         /* there is another open thread, so this
359                          * task will be handled eventually.
360                          * continue on, we have signalled that
361                          * the task is waiting.
362                          */
363                 }
364         }
365
366         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
367         return(0);
368
369  failed:
370         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
371         return(-1);
372 }
373
374 /* Set max #threads.  value <= 0 means max supported #threads (LDAP_MAXTHR) */
375 int
376 ldap_pvt_thread_pool_maxthreads(
377         ldap_pvt_thread_pool_t *tpool,
378         int max_threads )
379 {
380         struct ldap_int_thread_pool_s *pool;
381
382         if (! (0 <= max_threads && max_threads <= LDAP_MAXTHR))
383                 max_threads = 0;
384
385         if (tpool == NULL)
386                 return(-1);
387
388         pool = *tpool;
389
390         if (pool == NULL)
391                 return(-1);
392
393         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
394
395         pool->ltp_max_count = max_threads;
396         SET_VARY_OPEN_COUNT(pool);
397
398         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
399         return(0);
400 }
401
402 /* Inspect the pool */
403 int
404 ldap_pvt_thread_pool_query(
405         ldap_pvt_thread_pool_t *tpool,
406         ldap_pvt_thread_pool_param_t param,
407         void *value )
408 {
409         struct ldap_int_thread_pool_s   *pool;
410         int                             count = -1;
411
412         if ( tpool == NULL || value == NULL ) {
413                 return -1;
414         }
415
416         pool = *tpool;
417
418         if ( pool == NULL ) {
419                 return 0;
420         }
421
422         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
423         switch ( param ) {
424         case LDAP_PVT_THREAD_POOL_PARAM_MAX:
425                 count = pool->ltp_max_count;
426                 break;
427
428         case LDAP_PVT_THREAD_POOL_PARAM_MAX_PENDING:
429                 count = pool->ltp_max_pending;
430                 if (count < 0)
431                         count = -count;
432                 if (count == MAX_PENDING)
433                         count = 0;
434                 break;
435
436         case LDAP_PVT_THREAD_POOL_PARAM_OPEN:
437                 count = pool->ltp_open_count;
438                 break;
439
440         case LDAP_PVT_THREAD_POOL_PARAM_STARTING:
441                 count = pool->ltp_starting;
442                 break;
443
444         case LDAP_PVT_THREAD_POOL_PARAM_ACTIVE:
445                 count = pool->ltp_active_count;
446                 break;
447
448         case LDAP_PVT_THREAD_POOL_PARAM_PAUSING:
449                 count = pool->ltp_pause;
450                 break;
451
452         case LDAP_PVT_THREAD_POOL_PARAM_PENDING:
453                 count = pool->ltp_pending_count;
454                 break;
455
456         case LDAP_PVT_THREAD_POOL_PARAM_BACKLOAD:
457                 count = pool->ltp_pending_count + pool->ltp_active_count;
458                 break;
459
460         case LDAP_PVT_THREAD_POOL_PARAM_ACTIVE_MAX:
461                 break;
462
463         case LDAP_PVT_THREAD_POOL_PARAM_PENDING_MAX:
464                 break;
465
466         case LDAP_PVT_THREAD_POOL_PARAM_BACKLOAD_MAX:
467                 break;
468
469         case LDAP_PVT_THREAD_POOL_PARAM_STATE:
470                 *((char **)value) =
471                         pool->ltp_pause ? "pausing" :
472                         !pool->ltp_finishing ? "running" :
473                         pool->ltp_pending_count ? "finishing" : "stopping";
474                 break;
475
476         case LDAP_PVT_THREAD_POOL_PARAM_UNKNOWN:
477                 break;
478         }
479         ldap_pvt_thread_mutex_unlock( &pool->ltp_mutex );
480
481         if ( count > -1 ) {
482                 *((int *)value) = count;
483         }
484
485         return ( count == -1 ? -1 : 0 );
486 }
487
488 /*
489  * true if pool is pausing; does not lock any mutex to check.
490  * 0 if not pause, 1 if pause, -1 if error or no pool.
491  */
492 int
493 ldap_pvt_thread_pool_pausing( ldap_pvt_thread_pool_t *tpool )
494 {
495         int rc = -1;
496         struct ldap_int_thread_pool_s *pool;
497
498         if ( tpool != NULL && (pool = *tpool) != NULL ) {
499                 rc = pool->ltp_pause;
500         }
501
502         return rc;
503 }
504
505 /*
506  * wrapper for ldap_pvt_thread_pool_query(), left around
507  * for backwards compatibility
508  */
509 int
510 ldap_pvt_thread_pool_backload ( ldap_pvt_thread_pool_t *tpool )
511 {
512         int     rc, count;
513
514         rc = ldap_pvt_thread_pool_query( tpool,
515                 LDAP_PVT_THREAD_POOL_PARAM_BACKLOAD, (void *)&count );
516
517         if ( rc == 0 ) {
518                 return count;
519         }
520
521         return rc;
522 }
523
524 static void
525 flush_pending_list( struct ldap_int_thread_pool_s *pool )
526 {
527         ldap_int_thread_task_t *task;
528
529         while ((task = LDAP_STAILQ_FIRST(&pool->ltp_pending_list)) != NULL) {
530                 LDAP_STAILQ_REMOVE_HEAD(&pool->ltp_pending_list, ltt_next.q);
531                 LDAP_FREE(task);
532         }
533 }
534
535 /* Destroy the pool after making its threads finish */
536 int
537 ldap_pvt_thread_pool_destroy ( ldap_pvt_thread_pool_t *tpool, int run_pending )
538 {
539         struct ldap_int_thread_pool_s *pool, *pptr;
540         ldap_int_thread_task_t *task;
541
542         if (tpool == NULL)
543                 return(-1);
544
545         pool = *tpool;
546
547         if (pool == NULL) return(-1);
548
549         ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
550         LDAP_STAILQ_FOREACH(pptr, &ldap_int_thread_pool_list, ltp_next)
551                 if (pptr == pool) break;
552         if (pptr == pool)
553                 LDAP_STAILQ_REMOVE(&ldap_int_thread_pool_list, pool,
554                         ldap_int_thread_pool_s, ltp_next);
555         ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
556
557         if (pool != pptr) return(-1);
558
559         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
560
561         pool->ltp_finishing = 1;
562         SET_VARY_OPEN_COUNT(pool);
563         if (pool->ltp_max_pending > 0)
564                 pool->ltp_max_pending = -pool->ltp_max_pending;
565         if (!run_pending)
566                 flush_pending_list(pool);
567
568         while (pool->ltp_open_count) {
569                 if (!pool->ltp_pause)
570                         ldap_pvt_thread_cond_broadcast(&pool->ltp_cond);
571                 ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
572         }
573         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
574
575         flush_pending_list(pool);
576
577         while ((task = LDAP_SLIST_FIRST(&pool->ltp_free_list)) != NULL)
578         {
579                 LDAP_SLIST_REMOVE_HEAD(&pool->ltp_free_list, ltt_next.l);
580                 LDAP_FREE(task);
581         }
582
583         ldap_pvt_thread_cond_destroy(&pool->ltp_pcond);
584         ldap_pvt_thread_cond_destroy(&pool->ltp_cond);
585         ldap_pvt_thread_mutex_destroy(&pool->ltp_mutex);
586         LDAP_FREE(pool);
587         *tpool = NULL;
588         ldap_int_has_thread_pool = 0;
589         return(0);
590 }
591
592 /* Thread loop.  Accept and handle submitted tasks. */
593 static void *
594 ldap_int_thread_pool_wrapper ( 
595         void *xpool )
596 {
597         struct ldap_int_thread_pool_s *pool = xpool;
598         ldap_int_thread_task_t *task;
599         ldap_int_thread_userctx_t ctx, *kctx;
600         unsigned i, keyslot, hash;
601
602         assert(pool != NULL);
603
604         for ( i=0; i<MAXKEYS; i++ ) {
605                 ctx.ltu_key[i].ltk_key = NULL;
606         }
607
608         ctx.ltu_id = ldap_pvt_thread_self();
609         TID_HASH(ctx.ltu_id, hash);
610
611         ldap_pvt_thread_key_setdata( ldap_tpool_key, &ctx );
612
613         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
614
615         /* thread_keys[] is read-only when paused */
616         while (pool->ltp_pause)
617                 ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
618
619         /* find a key slot to give this thread ID and store a
620          * pointer to our keys there; start at the thread ID
621          * itself (mod LDAP_MAXTHR) and look for an empty slot.
622          */
623         ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
624         for (keyslot = hash & (LDAP_MAXTHR-1);
625                 (kctx = thread_keys[keyslot].ctx) && kctx != DELETED_THREAD_CTX;
626                 keyslot = (keyslot+1) & (LDAP_MAXTHR-1));
627         thread_keys[keyslot].ctx = &ctx;
628         ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
629
630         pool->ltp_starting--;
631
632         for (;;) {
633                 task = LDAP_STAILQ_FIRST(pool->ltp_work_list);
634                 if (task == NULL) {     /* paused or no pending tasks */
635                         if (pool->ltp_vary_open_count < 0) {
636                                 /* not paused, and either finishing or too many
637                                  * threads running (can happen if ltp_max_count
638                                  * was reduced) so let this thread die.
639                                  */
640                                 break;
641                         }
642
643                         /* we could check an idle timer here, and let the
644                          * thread die if it has been inactive for a while.
645                          * only die if there are other open threads (i.e.,
646                          * always have at least one thread open).  the check
647                          * should be like this:
648                          *   if (pool->ltp_open_count > 1 && pool->ltp_starting == 0)
649                          *       check timer, wait if ltp_pause, leave thread (break;)
650                          *
651                          * Just use pthread_cond_timedwait if we want to
652                          * check idle time.
653                          */
654
655                         ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
656                         continue;
657                 }
658
659                 LDAP_STAILQ_REMOVE_HEAD(&pool->ltp_pending_list, ltt_next.q);
660                 pool->ltp_pending_count--;
661                 pool->ltp_active_count++;
662                 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
663
664                 task->ltt_start_routine(&ctx, task->ltt_arg);
665
666                 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
667                 LDAP_SLIST_INSERT_HEAD(&pool->ltp_free_list, task, ltt_next.l);
668                 pool->ltp_active_count--;
669                 /* let pool_pause know when it is the sole active thread */
670                 if (pool->ltp_active_count < 2)
671                         ldap_pvt_thread_cond_signal(&pool->ltp_pcond);
672         }
673
674         /* The ltp_mutex lock protects ctx->ltu_key from pool_purgekey()
675          * during this call, since it prevents new pauses. */
676         ldap_pvt_thread_pool_context_reset(&ctx);
677
678         ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
679         thread_keys[keyslot].ctx = DELETED_THREAD_CTX;
680         ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
681
682         pool->ltp_open_count--;
683         SET_VARY_OPEN_COUNT(pool);
684         /* let pool_destroy know we're all done */
685         if (pool->ltp_open_count < 1)
686                 ldap_pvt_thread_cond_signal(&pool->ltp_cond);
687
688         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
689
690         ldap_pvt_thread_exit(NULL);
691         return(NULL);
692 }
693
694 /* Pause the pool.  Return when all other threads are paused. */
695 int
696 ldap_pvt_thread_pool_pause ( 
697         ldap_pvt_thread_pool_t *tpool )
698 {
699         struct ldap_int_thread_pool_s *pool;
700
701         if (tpool == NULL)
702                 return(-1);
703
704         pool = *tpool;
705
706         if (pool == NULL)
707                 return(0);
708
709         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
710
711         /* If someone else has already requested a pause, we have to wait */
712         if (pool->ltp_pause) {
713                 pool->ltp_pending_count++;
714                 pool->ltp_active_count--;
715                 /* let the other pool_pause() know when it can proceed */
716                 if (pool->ltp_active_count < 2)
717                         ldap_pvt_thread_cond_signal(&pool->ltp_pcond);
718                 do {
719                         ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
720                 } while (pool->ltp_pause);
721                 pool->ltp_pending_count--;
722                 pool->ltp_active_count++;
723         }
724
725         /* Wait for everyone else to pause or finish */
726         pool->ltp_pause = 1;
727         SET_VARY_OPEN_COUNT(pool);
728         /* Hide pending tasks from ldap_pvt_thread_pool_wrapper() */
729         pool->ltp_work_list = &empty_pending_list;
730
731         while (pool->ltp_active_count > 1) {
732                 ldap_pvt_thread_cond_wait(&pool->ltp_pcond, &pool->ltp_mutex);
733         }
734
735         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
736         return(0);
737 }
738
739 /* End a pause */
740 int
741 ldap_pvt_thread_pool_resume ( 
742         ldap_pvt_thread_pool_t *tpool )
743 {
744         struct ldap_int_thread_pool_s *pool;
745
746         if (tpool == NULL)
747                 return(-1);
748
749         pool = *tpool;
750
751         if (pool == NULL)
752                 return(0);
753
754         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
755
756         pool->ltp_pause = 0;
757         SET_VARY_OPEN_COUNT(pool);
758         pool->ltp_work_list = &pool->ltp_pending_list;
759
760         if (!pool->ltp_finishing)
761                 ldap_pvt_thread_cond_broadcast(&pool->ltp_cond);
762
763         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
764         return(0);
765 }
766
767 /*
768  * Get the key's data and optionally free function in the given context.
769  */
770 int ldap_pvt_thread_pool_getkey(
771         void *xctx,
772         void *key,
773         void **data,
774         ldap_pvt_thread_pool_keyfree_t **kfree )
775 {
776         ldap_int_thread_userctx_t *ctx = xctx;
777         int i;
778
779         if ( !ctx || !key || !data ) return EINVAL;
780
781         for ( i=0; i<MAXKEYS && ctx->ltu_key[i].ltk_key; i++ ) {
782                 if ( ctx->ltu_key[i].ltk_key == key ) {
783                         *data = ctx->ltu_key[i].ltk_data;
784                         if ( kfree ) *kfree = ctx->ltu_key[i].ltk_free;
785                         return 0;
786                 }
787         }
788         return ENOENT;
789 }
790
791 static void
792 clear_key_idx( ldap_int_thread_userctx_t *ctx, int i )
793 {
794         for ( ; i < MAXKEYS-1 && ctx->ltu_key[i+1].ltk_key; i++ )
795                 ctx->ltu_key[i] = ctx->ltu_key[i+1];
796         ctx->ltu_key[i].ltk_key = NULL;
797 }
798
799 /*
800  * Set or remove data for the key in the given context.
801  * key can be any unique pointer.
802  * kfree() is an optional function to free the data (but not the key):
803  *   pool_context_reset() and pool_purgekey() call kfree(key, data),
804  *   but pool_setkey() does not.  For pool_setkey() it is the caller's
805  *   responsibility to free any existing data with the same key.
806  *   kfree() must not call functions taking a tpool argument.
807  */
808 int ldap_pvt_thread_pool_setkey(
809         void *xctx,
810         void *key,
811         void *data,
812         ldap_pvt_thread_pool_keyfree_t *kfree,
813         void **olddatap,
814         ldap_pvt_thread_pool_keyfree_t **oldkfreep )
815 {
816         ldap_int_thread_userctx_t *ctx = xctx;
817         int i, found;
818
819         if ( !ctx || !key ) return EINVAL;
820
821         for ( i=found=0; i<MAXKEYS; i++ ) {
822                 if ( ctx->ltu_key[i].ltk_key == key ) {
823                         found = 1;
824                         break;
825                 } else if ( !ctx->ltu_key[i].ltk_key ) {
826                         break;
827                 }
828         }
829
830         if ( olddatap ) {
831                 if ( found ) {
832                         *olddatap = ctx->ltu_key[i].ltk_data;
833                 } else {
834                         *olddatap = NULL;
835                 }
836         }
837
838         if ( oldkfreep ) {
839                 if ( found ) {
840                         *oldkfreep = ctx->ltu_key[i].ltk_free;
841                 } else {
842                         *oldkfreep = 0;
843                 }
844         }
845
846         if ( data || kfree ) {
847                 if ( i>=MAXKEYS )
848                         return ENOMEM;
849                 ctx->ltu_key[i].ltk_key = key;
850                 ctx->ltu_key[i].ltk_data = data;
851                 ctx->ltu_key[i].ltk_free = kfree;
852         } else if ( found ) {
853                 clear_key_idx( ctx, i );
854         }
855
856         return 0;
857 }
858
859 /* Free all elements with this key, no matter which thread they're in.
860  * May only be called while the pool is paused.
861  */
862 void ldap_pvt_thread_pool_purgekey( void *key )
863 {
864         int i, j;
865         ldap_int_thread_userctx_t *ctx;
866
867         assert ( key != NULL );
868
869         for ( i=0; i<LDAP_MAXTHR; i++ ) {
870                 ctx = thread_keys[i].ctx;
871                 if ( ctx && ctx != DELETED_THREAD_CTX ) {
872                         for ( j=0; j<MAXKEYS && ctx->ltu_key[j].ltk_key; j++ ) {
873                                 if ( ctx->ltu_key[j].ltk_key == key ) {
874                                         if (ctx->ltu_key[j].ltk_free)
875                                                 ctx->ltu_key[j].ltk_free( ctx->ltu_key[j].ltk_key,
876                                                 ctx->ltu_key[j].ltk_data );
877                                         clear_key_idx( ctx, j );
878                                         break;
879                                 }
880                         }
881                 }
882         }
883 }
884
885 /*
886  * Find the context of the current thread.
887  * This is necessary if the caller does not have access to the
888  * thread context handle (for example, a slapd plugin calling
889  * slapi_search_internal()). No doubt it is more efficient
890  * for the application to keep track of the thread context
891  * handles itself.
892  */
893 void *ldap_pvt_thread_pool_context( )
894 {
895         void *ctx = NULL;
896
897         ldap_pvt_thread_key_getdata( ldap_tpool_key, &ctx );
898         return ctx ? ctx : &ldap_int_main_thrctx;
899 }
900
901 /*
902  * Free the context's keys.
903  * Must not call functions taking a tpool argument (because this
904  * thread already holds ltp_mutex when called from pool_wrapper()).
905  */
906 void ldap_pvt_thread_pool_context_reset( void *vctx )
907 {
908         ldap_int_thread_userctx_t *ctx = vctx;
909         int i;
910
911         for ( i=MAXKEYS-1; i>=0; i--) {
912                 if ( !ctx->ltu_key[i].ltk_key )
913                         continue;
914                 if ( ctx->ltu_key[i].ltk_free )
915                         ctx->ltu_key[i].ltk_free( ctx->ltu_key[i].ltk_key,
916                         ctx->ltu_key[i].ltk_data );
917                 ctx->ltu_key[i].ltk_key = NULL;
918         }
919 }
920
921 ldap_pvt_thread_t ldap_pvt_thread_pool_tid( void *vctx )
922 {
923         ldap_int_thread_userctx_t *ctx = vctx;
924
925         return ctx->ltu_id;
926 }
927 #endif /* LDAP_THREAD_HAVE_TPOOL */