]> git.sur5r.net Git - openldap/blob - libraries/libldap_r/tpool.c
Merge remote branch 'origin/mdb.master'
[openldap] / libraries / libldap_r / tpool.c
1 /* $OpenLDAP$ */
2 /* This work is part of OpenLDAP Software <http://www.openldap.org/>.
3  *
4  * Copyright 1998-2012 The OpenLDAP Foundation.
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted only as authorized by the OpenLDAP
9  * Public License.
10  *
11  * A copy of this license is available in file LICENSE in the
12  * top-level directory of the distribution or, alternatively, at
13  * <http://www.OpenLDAP.org/license.html>.
14  */
15
16 #include "portable.h"
17
18 #include <stdio.h>
19
20 #include <ac/signal.h>
21 #include <ac/stdarg.h>
22 #include <ac/stdlib.h>
23 #include <ac/string.h>
24 #include <ac/time.h>
25 #include <ac/errno.h>
26
27 #include "ldap-int.h"
28 #include "ldap_pvt_thread.h" /* Get the thread interface */
29 #include "ldap_queue.h"
30 #define LDAP_THREAD_POOL_IMPLEMENTATION
31 #include "ldap_thr_debug.h"  /* May rename symbols defined below */
32
33 #ifndef LDAP_THREAD_HAVE_TPOOL
34
35 /* Thread-specific key with data and optional free function */
36 typedef struct ldap_int_tpool_key_s {
37         void *ltk_key;
38         void *ltk_data;
39         ldap_pvt_thread_pool_keyfree_t *ltk_free;
40 } ldap_int_tpool_key_t;
41
42 /* Max number of thread-specific keys we store per thread.
43  * We don't expect to use many...
44  */
45 #define MAXKEYS 32
46
47 /* Max number of threads */
48 #define LDAP_MAXTHR     1024    /* must be a power of 2 */
49
50 /* (Theoretical) max number of pending requests */
51 #define MAX_PENDING (INT_MAX/2) /* INT_MAX - (room to avoid overflow) */
52
53 /* pool->ltp_pause values */
54 enum { NOT_PAUSED = 0, WANT_PAUSE = 1, PAUSED = 2 };
55
56 /* Context: thread ID and thread-specific key/data pairs */
57 typedef struct ldap_int_thread_userctx_s {
58         ldap_pvt_thread_t ltu_id;
59         ldap_int_tpool_key_t ltu_key[MAXKEYS];
60 } ldap_int_thread_userctx_t;
61
62
63 /* Simple {thread ID -> context} hash table; key=ctx->ltu_id.
64  * Protected by ldap_pvt_thread_pool_mutex except during pauses,
65  * when it is read-only (used by pool_purgekey and pool_context).
66  * Protected by tpool->ltp_mutex during pauses.
67  */
68 static struct {
69         ldap_int_thread_userctx_t *ctx;
70         /* ctx is valid when not NULL or DELETED_THREAD_CTX */
71 #       define DELETED_THREAD_CTX (&ldap_int_main_thrctx + 1) /* dummy addr */
72 } thread_keys[LDAP_MAXTHR];
73
74 #define TID_HASH(tid, hash) do { \
75         unsigned const char *ptr_ = (unsigned const char *)&(tid); \
76         unsigned i_; \
77         for (i_ = 0, (hash) = ptr_[0]; ++i_ < sizeof(tid);) \
78                 (hash) += ((hash) << 5) ^ ptr_[i_]; \
79 } while(0)
80
81
82 /* Task for a thread to perform */
83 typedef struct ldap_int_thread_task_s {
84         union {
85                 LDAP_STAILQ_ENTRY(ldap_int_thread_task_s) q;
86                 LDAP_SLIST_ENTRY(ldap_int_thread_task_s) l;
87         } ltt_next;
88         ldap_pvt_thread_start_t *ltt_start_routine;
89         void *ltt_arg;
90 } ldap_int_thread_task_t;
91
92 typedef LDAP_STAILQ_HEAD(tcq, ldap_int_thread_task_s) ldap_int_tpool_plist_t;
93
94 struct ldap_int_thread_pool_s {
95         LDAP_STAILQ_ENTRY(ldap_int_thread_pool_s) ltp_next;
96
97         /* protect members below, and protect thread_keys[] during pauses */
98         ldap_pvt_thread_mutex_t ltp_mutex;
99
100         /* not paused and something to do for pool_<wrapper/pause/destroy>() */
101         ldap_pvt_thread_cond_t ltp_cond;
102
103         /* ltp_active_count <= 1 && ltp_pause */
104         ldap_pvt_thread_cond_t ltp_pcond;
105
106         /* ltp_pause == 0 ? &ltp_pending_list : &empty_pending_list,
107          * maintaned to reduce work for pool_wrapper()
108          */
109         ldap_int_tpool_plist_t *ltp_work_list;
110
111         /* pending tasks, and unused task objects */
112         ldap_int_tpool_plist_t ltp_pending_list;
113         LDAP_SLIST_HEAD(tcl, ldap_int_thread_task_s) ltp_free_list;
114
115         /* The pool is finishing, waiting for its threads to close.
116          * They close when ltp_pending_list is done.  pool_submit()
117          * rejects new tasks.  ltp_max_pending = -(its old value).
118          */
119         int ltp_finishing;
120
121         /* Some active task needs to be the sole active task.
122          * Atomic variable so ldap_pvt_thread_pool_pausing() can read it.
123          * Note: Pauses adjust ltp_<open_count/vary_open_count/work_list>,
124          * so pool_<submit/wrapper>() mostly can avoid testing ltp_pause.
125          */
126         volatile sig_atomic_t ltp_pause;
127
128         /* Max number of threads in pool, or 0 for default (LDAP_MAXTHR) */
129         int ltp_max_count;
130
131         /* Max pending + paused + idle tasks, negated when ltp_finishing */
132         int ltp_max_pending;
133
134         int ltp_pending_count;          /* Pending + paused + idle tasks */
135         int ltp_active_count;           /* Active, not paused/idle tasks */
136         int ltp_open_count;                     /* Number of threads, negated when ltp_pause */
137         int ltp_starting;                       /* Currenlty starting threads */
138
139         /* >0 if paused or we may open a thread, <0 if we should close a thread.
140          * Updated when ltp_<finishing/pause/max_count/open_count> change.
141          * Maintained to reduce the time ltp_mutex must be locked in
142          * ldap_pvt_thread_pool_<submit/wrapper>().
143          */
144         int ltp_vary_open_count;
145 #       define SET_VARY_OPEN_COUNT(pool)        \
146                 ((pool)->ltp_vary_open_count =  \
147                  (pool)->ltp_pause      ?  1 :  \
148                  (pool)->ltp_finishing  ? -1 :  \
149                  ((pool)->ltp_max_count ? (pool)->ltp_max_count : LDAP_MAXTHR) \
150                  - (pool)->ltp_open_count)
151 };
152
153 static ldap_int_tpool_plist_t empty_pending_list =
154         LDAP_STAILQ_HEAD_INITIALIZER(empty_pending_list);
155
156 static int ldap_int_has_thread_pool = 0;
157 static LDAP_STAILQ_HEAD(tpq, ldap_int_thread_pool_s)
158         ldap_int_thread_pool_list =
159         LDAP_STAILQ_HEAD_INITIALIZER(ldap_int_thread_pool_list);
160
161 static ldap_pvt_thread_mutex_t ldap_pvt_thread_pool_mutex;
162
163 static void *ldap_int_thread_pool_wrapper( void *pool );
164
165 static ldap_pvt_thread_key_t    ldap_tpool_key;
166
167 /* Context of the main thread */
168 static ldap_int_thread_userctx_t ldap_int_main_thrctx;
169
170 int
171 ldap_int_thread_pool_startup ( void )
172 {
173         ldap_int_main_thrctx.ltu_id = ldap_pvt_thread_self();
174         ldap_pvt_thread_key_create( &ldap_tpool_key );
175         return ldap_pvt_thread_mutex_init(&ldap_pvt_thread_pool_mutex);
176 }
177
178 int
179 ldap_int_thread_pool_shutdown ( void )
180 {
181         struct ldap_int_thread_pool_s *pool;
182
183         while ((pool = LDAP_STAILQ_FIRST(&ldap_int_thread_pool_list)) != NULL) {
184                 (ldap_pvt_thread_pool_destroy)(&pool, 0); /* ignore thr_debug macro */
185         }
186         ldap_pvt_thread_mutex_destroy(&ldap_pvt_thread_pool_mutex);
187         ldap_pvt_thread_key_destroy( ldap_tpool_key );
188         return(0);
189 }
190
191
192 /* Create a thread pool */
193 int
194 ldap_pvt_thread_pool_init (
195         ldap_pvt_thread_pool_t *tpool,
196         int max_threads,
197         int max_pending )
198 {
199         ldap_pvt_thread_pool_t pool;
200         int rc;
201
202         /* multiple pools are currently not supported (ITS#4943) */
203         assert(!ldap_int_has_thread_pool);
204
205         if (! (0 <= max_threads && max_threads <= LDAP_MAXTHR))
206                 max_threads = 0;
207         if (! (1 <= max_pending && max_pending <= MAX_PENDING))
208                 max_pending = MAX_PENDING;
209
210         *tpool = NULL;
211         pool = (ldap_pvt_thread_pool_t) LDAP_CALLOC(1,
212                 sizeof(struct ldap_int_thread_pool_s));
213
214         if (pool == NULL) return(-1);
215
216         rc = ldap_pvt_thread_mutex_init(&pool->ltp_mutex);
217         if (rc != 0)
218                 return(rc);
219         rc = ldap_pvt_thread_cond_init(&pool->ltp_cond);
220         if (rc != 0)
221                 return(rc);
222         rc = ldap_pvt_thread_cond_init(&pool->ltp_pcond);
223         if (rc != 0)
224                 return(rc);
225
226         ldap_int_has_thread_pool = 1;
227
228         pool->ltp_max_count = max_threads;
229         SET_VARY_OPEN_COUNT(pool);
230         pool->ltp_max_pending = max_pending;
231
232         LDAP_STAILQ_INIT(&pool->ltp_pending_list);
233         pool->ltp_work_list = &pool->ltp_pending_list;
234         LDAP_SLIST_INIT(&pool->ltp_free_list);
235
236         ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
237         LDAP_STAILQ_INSERT_TAIL(&ldap_int_thread_pool_list, pool, ltp_next);
238         ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
239
240         /* Start no threads just yet.  That can break if the process forks
241          * later, as slapd does in order to daemonize.  On at least POSIX,
242          * only the forking thread would survive in the child.  Yet fork()
243          * can't unlock/clean up other threads' locks and data structures,
244          * unless pthread_atfork() handlers have been set up to do so.
245          */
246
247         *tpool = pool;
248         return(0);
249 }
250
251
252 /* Submit a task to be performed by the thread pool */
253 int
254 ldap_pvt_thread_pool_submit (
255         ldap_pvt_thread_pool_t *tpool,
256         ldap_pvt_thread_start_t *start_routine, void *arg )
257 {
258         struct ldap_int_thread_pool_s *pool;
259         ldap_int_thread_task_t *task;
260         ldap_pvt_thread_t thr;
261
262         if (tpool == NULL)
263                 return(-1);
264
265         pool = *tpool;
266
267         if (pool == NULL)
268                 return(-1);
269
270         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
271
272         if (pool->ltp_pending_count >= pool->ltp_max_pending)
273                 goto failed;
274
275         task = LDAP_SLIST_FIRST(&pool->ltp_free_list);
276         if (task) {
277                 LDAP_SLIST_REMOVE_HEAD(&pool->ltp_free_list, ltt_next.l);
278         } else {
279                 task = (ldap_int_thread_task_t *) LDAP_MALLOC(sizeof(*task));
280                 if (task == NULL)
281                         goto failed;
282         }
283
284         task->ltt_start_routine = start_routine;
285         task->ltt_arg = arg;
286
287         pool->ltp_pending_count++;
288         LDAP_STAILQ_INSERT_TAIL(&pool->ltp_pending_list, task, ltt_next.q);
289
290         /* true if ltp_pause != 0 or we should open (create) a thread */
291         if (pool->ltp_vary_open_count > 0 &&
292                 pool->ltp_open_count < pool->ltp_active_count+pool->ltp_pending_count)
293         {
294                 if (pool->ltp_pause)
295                         goto done;
296
297                 pool->ltp_starting++;
298                 pool->ltp_open_count++;
299                 SET_VARY_OPEN_COUNT(pool);
300
301                 if (0 != ldap_pvt_thread_create(
302                         &thr, 1, ldap_int_thread_pool_wrapper, pool))
303                 {
304                         /* couldn't create thread.  back out of
305                          * ltp_open_count and check for even worse things.
306                          */
307                         pool->ltp_starting--;
308                         pool->ltp_open_count--;
309                         SET_VARY_OPEN_COUNT(pool);
310
311                         if (pool->ltp_open_count == 0) {
312                                 /* no open threads at all?!?
313                                  */
314                                 ldap_int_thread_task_t *ptr;
315
316                                 /* let pool_destroy know there are no more threads */
317                                 ldap_pvt_thread_cond_signal(&pool->ltp_cond);
318
319                                 LDAP_STAILQ_FOREACH(ptr, &pool->ltp_pending_list, ltt_next.q)
320                                         if (ptr == task) break;
321                                 if (ptr == task) {
322                                         /* no open threads, task not handled, so
323                                          * back out of ltp_pending_count, free the task,
324                                          * report the error.
325                                          */
326                                         pool->ltp_pending_count--;
327                                         LDAP_STAILQ_REMOVE(&pool->ltp_pending_list, task,
328                                                 ldap_int_thread_task_s, ltt_next.q);
329                                         LDAP_SLIST_INSERT_HEAD(&pool->ltp_free_list, task,
330                                                 ltt_next.l);
331                                         goto failed;
332                                 }
333                         }
334                         /* there is another open thread, so this
335                          * task will be handled eventually.
336                          */
337                 }
338         }
339         ldap_pvt_thread_cond_signal(&pool->ltp_cond);
340
341  done:
342         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
343         return(0);
344
345  failed:
346         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
347         return(-1);
348 }
349
350 static void *
351 no_task( void *ctx, void *arg )
352 {
353         return NULL;
354 }
355
356 /* Cancel a pending task that was previously submitted.
357  * Return 1 if the task was successfully cancelled, 0 if
358  * not found, -1 for invalid parameters
359  */
360 int
361 ldap_pvt_thread_pool_retract (
362         ldap_pvt_thread_pool_t *tpool,
363         ldap_pvt_thread_start_t *start_routine, void *arg )
364 {
365         struct ldap_int_thread_pool_s *pool;
366         ldap_int_thread_task_t *task;
367
368         if (tpool == NULL)
369                 return(-1);
370
371         pool = *tpool;
372
373         if (pool == NULL)
374                 return(-1);
375
376         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
377         LDAP_STAILQ_FOREACH(task, &pool->ltp_pending_list, ltt_next.q)
378                 if (task->ltt_start_routine == start_routine &&
379                         task->ltt_arg == arg) {
380                         /* Could LDAP_STAILQ_REMOVE the task, but that
381                          * walks ltp_pending_list again to find it.
382                          */
383                         task->ltt_start_routine = no_task;
384                         task->ltt_arg = NULL;
385                         break;
386                 }
387         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
388         return task != NULL;
389 }
390
391 /* Set max #threads.  value <= 0 means max supported #threads (LDAP_MAXTHR) */
392 int
393 ldap_pvt_thread_pool_maxthreads(
394         ldap_pvt_thread_pool_t *tpool,
395         int max_threads )
396 {
397         struct ldap_int_thread_pool_s *pool;
398
399         if (! (0 <= max_threads && max_threads <= LDAP_MAXTHR))
400                 max_threads = 0;
401
402         if (tpool == NULL)
403                 return(-1);
404
405         pool = *tpool;
406
407         if (pool == NULL)
408                 return(-1);
409
410         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
411
412         pool->ltp_max_count = max_threads;
413         SET_VARY_OPEN_COUNT(pool);
414
415         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
416         return(0);
417 }
418
419 /* Inspect the pool */
420 int
421 ldap_pvt_thread_pool_query(
422         ldap_pvt_thread_pool_t *tpool,
423         ldap_pvt_thread_pool_param_t param,
424         void *value )
425 {
426         struct ldap_int_thread_pool_s   *pool;
427         int                             count = -1;
428
429         if ( tpool == NULL || value == NULL ) {
430                 return -1;
431         }
432
433         pool = *tpool;
434
435         if ( pool == NULL ) {
436                 return 0;
437         }
438
439         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
440         switch ( param ) {
441         case LDAP_PVT_THREAD_POOL_PARAM_MAX:
442                 count = pool->ltp_max_count;
443                 break;
444
445         case LDAP_PVT_THREAD_POOL_PARAM_MAX_PENDING:
446                 count = pool->ltp_max_pending;
447                 if (count < 0)
448                         count = -count;
449                 if (count == MAX_PENDING)
450                         count = 0;
451                 break;
452
453         case LDAP_PVT_THREAD_POOL_PARAM_OPEN:
454                 count = pool->ltp_open_count;
455                 if (count < 0)
456                         count = -count;
457                 break;
458
459         case LDAP_PVT_THREAD_POOL_PARAM_STARTING:
460                 count = pool->ltp_starting;
461                 break;
462
463         case LDAP_PVT_THREAD_POOL_PARAM_ACTIVE:
464                 count = pool->ltp_active_count;
465                 break;
466
467         case LDAP_PVT_THREAD_POOL_PARAM_PAUSING:
468                 count = (pool->ltp_pause != 0);
469                 break;
470
471         case LDAP_PVT_THREAD_POOL_PARAM_PENDING:
472                 count = pool->ltp_pending_count;
473                 break;
474
475         case LDAP_PVT_THREAD_POOL_PARAM_BACKLOAD:
476                 count = pool->ltp_pending_count + pool->ltp_active_count;
477                 break;
478
479         case LDAP_PVT_THREAD_POOL_PARAM_ACTIVE_MAX:
480                 break;
481
482         case LDAP_PVT_THREAD_POOL_PARAM_PENDING_MAX:
483                 break;
484
485         case LDAP_PVT_THREAD_POOL_PARAM_BACKLOAD_MAX:
486                 break;
487
488         case LDAP_PVT_THREAD_POOL_PARAM_STATE:
489                 *((char **)value) =
490                         pool->ltp_pause ? "pausing" :
491                         !pool->ltp_finishing ? "running" :
492                         pool->ltp_pending_count ? "finishing" : "stopping";
493                 break;
494
495         case LDAP_PVT_THREAD_POOL_PARAM_UNKNOWN:
496                 break;
497         }
498         ldap_pvt_thread_mutex_unlock( &pool->ltp_mutex );
499
500         if ( count > -1 ) {
501                 *((int *)value) = count;
502         }
503
504         return ( count == -1 ? -1 : 0 );
505 }
506
507 /*
508  * true if pool is pausing; does not lock any mutex to check.
509  * 0 if not pause, 1 if pause, -1 if error or no pool.
510  */
511 int
512 ldap_pvt_thread_pool_pausing( ldap_pvt_thread_pool_t *tpool )
513 {
514         int rc = -1;
515         struct ldap_int_thread_pool_s *pool;
516
517         if ( tpool != NULL && (pool = *tpool) != NULL ) {
518                 rc = (pool->ltp_pause != 0);
519         }
520
521         return rc;
522 }
523
524 /*
525  * wrapper for ldap_pvt_thread_pool_query(), left around
526  * for backwards compatibility
527  */
528 int
529 ldap_pvt_thread_pool_backload ( ldap_pvt_thread_pool_t *tpool )
530 {
531         int     rc, count;
532
533         rc = ldap_pvt_thread_pool_query( tpool,
534                 LDAP_PVT_THREAD_POOL_PARAM_BACKLOAD, (void *)&count );
535
536         if ( rc == 0 ) {
537                 return count;
538         }
539
540         return rc;
541 }
542
543 /* Destroy the pool after making its threads finish */
544 int
545 ldap_pvt_thread_pool_destroy ( ldap_pvt_thread_pool_t *tpool, int run_pending )
546 {
547         struct ldap_int_thread_pool_s *pool, *pptr;
548         ldap_int_thread_task_t *task;
549
550         if (tpool == NULL)
551                 return(-1);
552
553         pool = *tpool;
554
555         if (pool == NULL) return(-1);
556
557         ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
558         LDAP_STAILQ_FOREACH(pptr, &ldap_int_thread_pool_list, ltp_next)
559                 if (pptr == pool) break;
560         if (pptr == pool)
561                 LDAP_STAILQ_REMOVE(&ldap_int_thread_pool_list, pool,
562                         ldap_int_thread_pool_s, ltp_next);
563         ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
564
565         if (pool != pptr) return(-1);
566
567         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
568
569         pool->ltp_finishing = 1;
570         SET_VARY_OPEN_COUNT(pool);
571         if (pool->ltp_max_pending > 0)
572                 pool->ltp_max_pending = -pool->ltp_max_pending;
573
574         if (!run_pending) {
575                 while ((task = LDAP_STAILQ_FIRST(&pool->ltp_pending_list)) != NULL) {
576                         LDAP_STAILQ_REMOVE_HEAD(&pool->ltp_pending_list, ltt_next.q);
577                         LDAP_FREE(task);
578                 }
579                 pool->ltp_pending_count = 0;
580         }
581
582         while (pool->ltp_open_count) {
583                 if (!pool->ltp_pause)
584                         ldap_pvt_thread_cond_broadcast(&pool->ltp_cond);
585                 ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
586         }
587
588         while ((task = LDAP_SLIST_FIRST(&pool->ltp_free_list)) != NULL)
589         {
590                 LDAP_SLIST_REMOVE_HEAD(&pool->ltp_free_list, ltt_next.l);
591                 LDAP_FREE(task);
592         }
593
594         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
595         ldap_pvt_thread_cond_destroy(&pool->ltp_pcond);
596         ldap_pvt_thread_cond_destroy(&pool->ltp_cond);
597         ldap_pvt_thread_mutex_destroy(&pool->ltp_mutex);
598         LDAP_FREE(pool);
599         *tpool = NULL;
600         ldap_int_has_thread_pool = 0;
601         return(0);
602 }
603
604 /* Thread loop.  Accept and handle submitted tasks. */
605 static void *
606 ldap_int_thread_pool_wrapper ( 
607         void *xpool )
608 {
609         struct ldap_int_thread_pool_s *pool = xpool;
610         ldap_int_thread_task_t *task;
611         ldap_int_tpool_plist_t *work_list;
612         ldap_int_thread_userctx_t ctx, *kctx;
613         unsigned i, keyslot, hash;
614
615         assert(pool != NULL);
616
617         for ( i=0; i<MAXKEYS; i++ ) {
618                 ctx.ltu_key[i].ltk_key = NULL;
619         }
620
621         ctx.ltu_id = ldap_pvt_thread_self();
622         TID_HASH(ctx.ltu_id, hash);
623
624         ldap_pvt_thread_key_setdata( ldap_tpool_key, &ctx );
625
626         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
627
628         /* thread_keys[] is read-only when paused */
629         while (pool->ltp_pause)
630                 ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
631
632         /* find a key slot to give this thread ID and store a
633          * pointer to our keys there; start at the thread ID
634          * itself (mod LDAP_MAXTHR) and look for an empty slot.
635          */
636         ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
637         for (keyslot = hash & (LDAP_MAXTHR-1);
638                 (kctx = thread_keys[keyslot].ctx) && kctx != DELETED_THREAD_CTX;
639                 keyslot = (keyslot+1) & (LDAP_MAXTHR-1));
640         thread_keys[keyslot].ctx = &ctx;
641         ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
642
643         pool->ltp_starting--;
644         pool->ltp_active_count++;
645
646         for (;;) {
647                 work_list = pool->ltp_work_list; /* help the compiler a bit */
648                 task = LDAP_STAILQ_FIRST(work_list);
649                 if (task == NULL) {     /* paused or no pending tasks */
650                         if (--(pool->ltp_active_count) < 2) {
651                                 /* Notify pool_pause it is the sole active thread. */
652                                 ldap_pvt_thread_cond_signal(&pool->ltp_pcond);
653                         }
654
655                         do {
656                                 if (pool->ltp_vary_open_count < 0) {
657                                         /* Not paused, and either finishing or too many
658                                          * threads running (can happen if ltp_max_count
659                                          * was reduced).  Let this thread die.
660                                          */
661                                         goto done;
662                                 }
663
664                                 /* We could check an idle timer here, and let the
665                                  * thread die if it has been inactive for a while.
666                                  * Only die if there are other open threads (i.e.,
667                                  * always have at least one thread open).
668                                  * The check should be like this:
669                                  *   if (pool->ltp_open_count>1 && pool->ltp_starting==0)
670                                  *       check timer, wait if ltp_pause, leave thread;
671                                  *
672                                  * Just use pthread_cond_timedwait() if we want to
673                                  * check idle time.
674                                  */
675                                 ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
676
677                                 work_list = pool->ltp_work_list;
678                                 task = LDAP_STAILQ_FIRST(work_list);
679                         } while (task == NULL);
680
681                         pool->ltp_active_count++;
682                 }
683
684                 LDAP_STAILQ_REMOVE_HEAD(work_list, ltt_next.q);
685                 pool->ltp_pending_count--;
686                 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
687
688                 task->ltt_start_routine(&ctx, task->ltt_arg);
689
690                 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
691                 LDAP_SLIST_INSERT_HEAD(&pool->ltp_free_list, task, ltt_next.l);
692         }
693  done:
694
695         assert(!pool->ltp_pause); /* thread_keys writable, ltp_open_count >= 0 */
696
697         /* The ltp_mutex lock protects ctx->ltu_key from pool_purgekey()
698          * during this call, since it prevents new pauses. */
699         ldap_pvt_thread_pool_context_reset(&ctx);
700
701         ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
702         thread_keys[keyslot].ctx = DELETED_THREAD_CTX;
703         ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
704
705         pool->ltp_open_count--;
706         SET_VARY_OPEN_COUNT(pool);
707         /* let pool_destroy know we're all done */
708         if (pool->ltp_open_count == 0)
709                 ldap_pvt_thread_cond_signal(&pool->ltp_cond);
710
711         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
712
713         ldap_pvt_thread_exit(NULL);
714         return(NULL);
715 }
716
717 /* Arguments > ltp_pause to handle_pause(,PAUSE_ARG()).  arg=PAUSE_ARG
718  * ensures (arg-ltp_pause) sets GO_* at need and keeps DO_PAUSE/GO_*.
719  */
720 #define GO_IDLE         8
721 #define GO_UNIDLE       16
722 #define CHECK_PAUSE     32      /* if ltp_pause: GO_IDLE; wait; GO_UNIDLE */
723 #define DO_PAUSE        64      /* CHECK_PAUSE; pause the pool */
724 #define PAUSE_ARG(a) \
725                 ((a) | ((a) & (GO_IDLE|GO_UNIDLE) ? GO_IDLE-1 : CHECK_PAUSE))
726
727 static int
728 handle_pause( ldap_pvt_thread_pool_t *tpool, int pause_type )
729 {
730         struct ldap_int_thread_pool_s *pool;
731         int ret = 0, pause, max_ltp_pause;
732
733         if (tpool == NULL)
734                 return(-1);
735
736         pool = *tpool;
737
738         if (pool == NULL)
739                 return(0);
740
741         if (pause_type == CHECK_PAUSE && !pool->ltp_pause)
742                 return(0);
743
744         /* Let pool_unidle() ignore requests for new pauses */
745         max_ltp_pause = pause_type==PAUSE_ARG(GO_UNIDLE) ? WANT_PAUSE : NOT_PAUSED;
746
747         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
748
749         pause = pool->ltp_pause;        /* NOT_PAUSED, WANT_PAUSE or PAUSED */
750
751         /* If ltp_pause and not GO_IDLE|GO_UNIDLE: Set GO_IDLE,GO_UNIDLE */
752         pause_type -= pause;
753
754         if (pause_type & GO_IDLE) {
755                 pool->ltp_pending_count++;
756                 pool->ltp_active_count--;
757                 if (pause && pool->ltp_active_count < 2) {
758                         /* Tell the task waiting to DO_PAUSE it can proceed */
759                         ldap_pvt_thread_cond_signal(&pool->ltp_pcond);
760                 }
761         }
762
763         if (pause_type & GO_UNIDLE) {
764                 /* Wait out pause if any, then cancel GO_IDLE */
765                 if (pause > max_ltp_pause) {
766                         ret = 1;
767                         do {
768                                 ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
769                         } while (pool->ltp_pause > max_ltp_pause);
770                 }
771                 pool->ltp_pending_count--;
772                 pool->ltp_active_count++;
773         }
774
775         if (pause_type & DO_PAUSE) {
776                 /* Tell everyone else to pause or finish, then await that */
777                 ret = 0;
778                 assert(!pool->ltp_pause);
779                 pool->ltp_pause = WANT_PAUSE;
780                 /* Let ldap_pvt_thread_pool_submit() through to its ltp_pause test,
781                  * and do not finish threads in ldap_pvt_thread_pool_wrapper() */
782                 pool->ltp_open_count = -pool->ltp_open_count;
783                 SET_VARY_OPEN_COUNT(pool);
784                 /* Hide pending tasks from ldap_pvt_thread_pool_wrapper() */
785                 pool->ltp_work_list = &empty_pending_list;
786                 /* Wait for this task to become the sole active task */
787                 while (pool->ltp_active_count > 1) {
788                         ldap_pvt_thread_cond_wait(&pool->ltp_pcond, &pool->ltp_mutex);
789                 }
790                 assert(pool->ltp_pause == WANT_PAUSE);
791                 pool->ltp_pause = PAUSED;
792         }
793
794         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
795         return(ret);
796 }
797
798 /* Consider this task idle: It will not block pool_pause() in other tasks. */
799 void
800 ldap_pvt_thread_pool_idle( ldap_pvt_thread_pool_t *tpool )
801 {
802         handle_pause(tpool, PAUSE_ARG(GO_IDLE));
803 }
804
805 /* Cancel pool_idle(). If the pool is paused, wait it out first. */
806 void
807 ldap_pvt_thread_pool_unidle( ldap_pvt_thread_pool_t *tpool )
808 {
809         handle_pause(tpool, PAUSE_ARG(GO_UNIDLE));
810 }
811
812 /*
813  * If a pause was requested, wait for it.  If several threads
814  * are waiting to pause, let through one or more pauses.
815  * The calling task must be active, not idle.
816  * Return 1 if we waited, 0 if not, -1 at parameter error.
817  */
818 int
819 ldap_pvt_thread_pool_pausecheck( ldap_pvt_thread_pool_t *tpool )
820 {
821         return handle_pause(tpool, PAUSE_ARG(CHECK_PAUSE));
822 }
823
824 /*
825  * Pause the pool.  The calling task must be active, not idle.
826  * Return when all other tasks are paused or idle.
827  */
828 int
829 ldap_pvt_thread_pool_pause( ldap_pvt_thread_pool_t *tpool )
830 {
831         return handle_pause(tpool, PAUSE_ARG(DO_PAUSE));
832 }
833
834 /* End a pause */
835 int
836 ldap_pvt_thread_pool_resume ( 
837         ldap_pvt_thread_pool_t *tpool )
838 {
839         struct ldap_int_thread_pool_s *pool;
840
841         if (tpool == NULL)
842                 return(-1);
843
844         pool = *tpool;
845
846         if (pool == NULL)
847                 return(0);
848
849         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
850
851         assert(pool->ltp_pause == PAUSED);
852         pool->ltp_pause = 0;
853         if (pool->ltp_open_count <= 0) /* true when paused, but be paranoid */
854                 pool->ltp_open_count = -pool->ltp_open_count;
855         SET_VARY_OPEN_COUNT(pool);
856         pool->ltp_work_list = &pool->ltp_pending_list;
857
858         ldap_pvt_thread_cond_broadcast(&pool->ltp_cond);
859
860         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
861         return(0);
862 }
863
864 /*
865  * Get the key's data and optionally free function in the given context.
866  */
867 int ldap_pvt_thread_pool_getkey(
868         void *xctx,
869         void *key,
870         void **data,
871         ldap_pvt_thread_pool_keyfree_t **kfree )
872 {
873         ldap_int_thread_userctx_t *ctx = xctx;
874         int i;
875
876         if ( !ctx || !key || !data ) return EINVAL;
877
878         for ( i=0; i<MAXKEYS && ctx->ltu_key[i].ltk_key; i++ ) {
879                 if ( ctx->ltu_key[i].ltk_key == key ) {
880                         *data = ctx->ltu_key[i].ltk_data;
881                         if ( kfree ) *kfree = ctx->ltu_key[i].ltk_free;
882                         return 0;
883                 }
884         }
885         return ENOENT;
886 }
887
888 static void
889 clear_key_idx( ldap_int_thread_userctx_t *ctx, int i )
890 {
891         for ( ; i < MAXKEYS-1 && ctx->ltu_key[i+1].ltk_key; i++ )
892                 ctx->ltu_key[i] = ctx->ltu_key[i+1];
893         ctx->ltu_key[i].ltk_key = NULL;
894 }
895
896 /*
897  * Set or remove data for the key in the given context.
898  * key can be any unique pointer.
899  * kfree() is an optional function to free the data (but not the key):
900  *   pool_context_reset() and pool_purgekey() call kfree(key, data),
901  *   but pool_setkey() does not.  For pool_setkey() it is the caller's
902  *   responsibility to free any existing data with the same key.
903  *   kfree() must not call functions taking a tpool argument.
904  */
905 int ldap_pvt_thread_pool_setkey(
906         void *xctx,
907         void *key,
908         void *data,
909         ldap_pvt_thread_pool_keyfree_t *kfree,
910         void **olddatap,
911         ldap_pvt_thread_pool_keyfree_t **oldkfreep )
912 {
913         ldap_int_thread_userctx_t *ctx = xctx;
914         int i, found;
915
916         if ( !ctx || !key ) return EINVAL;
917
918         for ( i=found=0; i<MAXKEYS; i++ ) {
919                 if ( ctx->ltu_key[i].ltk_key == key ) {
920                         found = 1;
921                         break;
922                 } else if ( !ctx->ltu_key[i].ltk_key ) {
923                         break;
924                 }
925         }
926
927         if ( olddatap ) {
928                 if ( found ) {
929                         *olddatap = ctx->ltu_key[i].ltk_data;
930                 } else {
931                         *olddatap = NULL;
932                 }
933         }
934
935         if ( oldkfreep ) {
936                 if ( found ) {
937                         *oldkfreep = ctx->ltu_key[i].ltk_free;
938                 } else {
939                         *oldkfreep = 0;
940                 }
941         }
942
943         if ( data || kfree ) {
944                 if ( i>=MAXKEYS )
945                         return ENOMEM;
946                 ctx->ltu_key[i].ltk_key = key;
947                 ctx->ltu_key[i].ltk_data = data;
948                 ctx->ltu_key[i].ltk_free = kfree;
949         } else if ( found ) {
950                 clear_key_idx( ctx, i );
951         }
952
953         return 0;
954 }
955
956 /* Free all elements with this key, no matter which thread they're in.
957  * May only be called while the pool is paused.
958  */
959 void ldap_pvt_thread_pool_purgekey( void *key )
960 {
961         int i, j;
962         ldap_int_thread_userctx_t *ctx;
963
964         assert ( key != NULL );
965
966         for ( i=0; i<LDAP_MAXTHR; i++ ) {
967                 ctx = thread_keys[i].ctx;
968                 if ( ctx && ctx != DELETED_THREAD_CTX ) {
969                         for ( j=0; j<MAXKEYS && ctx->ltu_key[j].ltk_key; j++ ) {
970                                 if ( ctx->ltu_key[j].ltk_key == key ) {
971                                         if (ctx->ltu_key[j].ltk_free)
972                                                 ctx->ltu_key[j].ltk_free( ctx->ltu_key[j].ltk_key,
973                                                 ctx->ltu_key[j].ltk_data );
974                                         clear_key_idx( ctx, j );
975                                         break;
976                                 }
977                         }
978                 }
979         }
980 }
981
982 /*
983  * Find the context of the current thread.
984  * This is necessary if the caller does not have access to the
985  * thread context handle (for example, a slapd plugin calling
986  * slapi_search_internal()). No doubt it is more efficient
987  * for the application to keep track of the thread context
988  * handles itself.
989  */
990 void *ldap_pvt_thread_pool_context( )
991 {
992         void *ctx = NULL;
993
994         ldap_pvt_thread_key_getdata( ldap_tpool_key, &ctx );
995         return ctx ? ctx : (void *) &ldap_int_main_thrctx;
996 }
997
998 /*
999  * Free the context's keys.
1000  * Must not call functions taking a tpool argument (because this
1001  * thread already holds ltp_mutex when called from pool_wrapper()).
1002  */
1003 void ldap_pvt_thread_pool_context_reset( void *vctx )
1004 {
1005         ldap_int_thread_userctx_t *ctx = vctx;
1006         int i;
1007
1008         for ( i=MAXKEYS-1; i>=0; i--) {
1009                 if ( !ctx->ltu_key[i].ltk_key )
1010                         continue;
1011                 if ( ctx->ltu_key[i].ltk_free )
1012                         ctx->ltu_key[i].ltk_free( ctx->ltu_key[i].ltk_key,
1013                         ctx->ltu_key[i].ltk_data );
1014                 ctx->ltu_key[i].ltk_key = NULL;
1015         }
1016 }
1017
1018 ldap_pvt_thread_t ldap_pvt_thread_pool_tid( void *vctx )
1019 {
1020         ldap_int_thread_userctx_t *ctx = vctx;
1021
1022         return ctx->ltu_id;
1023 }
1024 #endif /* LDAP_THREAD_HAVE_TPOOL */