]> git.sur5r.net Git - openldap/blob - libraries/libldap_r/tpool.c
More fixes for prev commit
[openldap] / libraries / libldap_r / tpool.c
1 /* $OpenLDAP$ */
2 /* This work is part of OpenLDAP Software <http://www.openldap.org/>.
3  *
4  * Copyright 1998-2013 The OpenLDAP Foundation.
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted only as authorized by the OpenLDAP
9  * Public License.
10  *
11  * A copy of this license is available in file LICENSE in the
12  * top-level directory of the distribution or, alternatively, at
13  * <http://www.OpenLDAP.org/license.html>.
14  */
15
16 #include "portable.h"
17
18 #include <stdio.h>
19
20 #include <ac/signal.h>
21 #include <ac/stdarg.h>
22 #include <ac/stdlib.h>
23 #include <ac/string.h>
24 #include <ac/time.h>
25 #include <ac/errno.h>
26
27 #include "ldap-int.h"
28 #include "ldap_pvt_thread.h" /* Get the thread interface */
29 #include "ldap_queue.h"
30 #define LDAP_THREAD_POOL_IMPLEMENTATION
31 #include "ldap_thr_debug.h"  /* May rename symbols defined below */
32
33 #ifndef LDAP_THREAD_HAVE_TPOOL
34
/* Thread-specific key with data and optional free function.
 * One entry of a thread context's ltu_key[] array.  A pool worker sets
 * ltk_key to NULL in every entry when it initializes its context.
 */
typedef struct ldap_int_tpool_key_s {
        void *ltk_key;          /* identifies the entry */
        void *ltk_data;         /* data stored under ltk_key */
        ldap_pvt_thread_pool_keyfree_t *ltk_free;       /* optional free function */
} ldap_int_tpool_key_t;
41
/* Max number of thread-specific keys we store per thread.
 * We don't expect to use many...
 */
#define MAXKEYS 32

/* Max number of threads.  Also the size of the thread_keys[] hash table,
 * whose slots are selected by masking with (LDAP_MAXTHR-1).
 */
#define LDAP_MAXTHR     1024    /* must be a power of 2 */

/* (Theoretical) max number of pending requests */
#define MAX_PENDING (INT_MAX/2) /* INT_MAX - (room to avoid overflow) */

/* pool->ltp_pause values */
enum { NOT_PAUSED = 0, WANT_PAUSE = 1, PAUSED = 2 };
55
/* Context: thread ID and thread-specific key/data pairs.
 * A pool worker keeps one of these on its own stack and passes its
 * address as the first argument of every task it runs.
 */
typedef struct ldap_int_thread_userctx_s {
        struct ldap_int_thread_poolq_s *ltu_pq; /* work queue the thread serves */
        ldap_pvt_thread_t ltu_id;               /* ID of the owning thread */
        ldap_int_tpool_key_t ltu_key[MAXKEYS];  /* thread-specific key/data pairs */
} ldap_int_thread_userctx_t;
62
63
/* Simple {thread ID -> context} hash table; key=ctx->ltu_id.
 * Protected by ldap_pvt_thread_pool_mutex except during pauses,
 * when it is read-only (used by pool_purgekey and pool_context).
 *
 * Collisions are resolved by linear probing: a worker starts at its
 * hashed slot and takes the first slot that is NULL or DELETED_THREAD_CTX.
 */
static struct {
        ldap_int_thread_userctx_t *ctx;
        /* ctx is valid when not NULL or DELETED_THREAD_CTX */
#       define DELETED_THREAD_CTX (&ldap_int_main_thrctx + 1) /* dummy addr */
} thread_keys[LDAP_MAXTHR];
74
/* Fold the raw bytes of thread ID `tid` into `hash`.
 * `tid` must be an lvalue (its address is taken) and `hash` an assignable
 * integer lvalue; both arguments are evaluated more than once.
 * Callers mask the result with (LDAP_MAXTHR-1) to get a thread_keys[] slot.
 */
#define TID_HASH(tid, hash) do { \
        unsigned const char *ptr_ = (unsigned const char *)&(tid); \
        unsigned i_; \
        for (i_ = 0, (hash) = ptr_[0]; ++i_ < sizeof(tid);) \
                (hash) += ((hash) << 5) ^ ptr_[i_]; \
} while(0)
81
82
/* Task for a thread to perform.
 * A task object is linked either into a queue's pending list (via
 * ltt_next.q) or into its free list (via ltt_next.l), never both at
 * once, which is why the two link fields share a union.
 */
typedef struct ldap_int_thread_task_s {
        union {
                LDAP_STAILQ_ENTRY(ldap_int_thread_task_s) q;    /* pending-list link */
                LDAP_SLIST_ENTRY(ldap_int_thread_task_s) l;     /* free-list link */
        } ltt_next;
        ldap_pvt_thread_start_t *ltt_start_routine;     /* function to run */
        void *ltt_arg;                                  /* its second argument */
} ldap_int_thread_task_t;

/* Head type of a pending-task list */
typedef LDAP_STAILQ_HEAD(tcq, ldap_int_thread_task_s) ldap_int_tpool_plist_t;
94
/* One work queue of a pool, with its own lock, condition variable,
 * task lists and thread counters.  A pool owns ltp_numqs of these.
 */
struct ldap_int_thread_poolq_s {
        /* back-pointer to the owning pool */
        struct ldap_int_thread_pool_s *ltp_pool;

        /* protect members below, and protect thread_keys[] during pauses */
        ldap_pvt_thread_mutex_t ltp_mutex;

        /* not paused and something to do for pool_<wrapper/pause/destroy>() */
        ldap_pvt_thread_cond_t ltp_cond;

        /* ltp_pause == 0 ? &ltp_pending_list : &empty_pending_list,
         * maintained to reduce work for pool_wrapper()
         */
        ldap_int_tpool_plist_t *ltp_work_list;

        /* pending tasks, and unused task objects */
        ldap_int_tpool_plist_t ltp_pending_list;
        LDAP_SLIST_HEAD(tcl, ldap_int_thread_task_s) ltp_free_list;

        /* Max number of threads for this queue: its share of the pool-wide
         * limit, as distributed by pool_init_q() and pool_maxthreads()
         */
        int ltp_max_count;

        /* Max pending + paused + idle tasks, negated when ltp_finishing */
        int ltp_max_pending;

        int ltp_pending_count;          /* Pending + paused + idle tasks */
        int ltp_active_count;           /* Active, not paused/idle tasks */
        int ltp_open_count;                     /* Number of threads, negated when ltp_pause */
        int ltp_starting;                       /* Currently starting threads */
};
124
/* A thread pool: pool-wide state plus an array of work queues.
 * pool_init_q() allocates the queue array in the same memory block,
 * directly after this struct, and points ltp_wqs at it.
 */
struct ldap_int_thread_pool_s {
        /* link in ldap_int_thread_pool_list */
        LDAP_STAILQ_ENTRY(ldap_int_thread_pool_s) ltp_next;

        /* array of ltp_numqs work queues */
        struct ldap_int_thread_poolq_s *ltp_wqs;

        /* number of poolqs */
        int ltp_numqs;

        /* protect members below, and protect thread_keys[] during pauses */
        ldap_pvt_thread_mutex_t ltp_mutex;

        /* not paused and something to do for pool_<wrapper/pause/destroy>() */
        ldap_pvt_thread_cond_t ltp_cond;

        /* ltp_active_count <= 1 && ltp_pause */
        ldap_pvt_thread_cond_t ltp_pcond;

        /* number of active queues */
        int ltp_active_queues;

        /* The pool is finishing, waiting for its threads to close.
         * They close when ltp_pending_list is done.  pool_submit()
         * rejects new tasks.  ltp_max_pending = -(its old value).
         */
        int ltp_finishing;

        /* Some active task needs to be the sole active task.
         * Atomic variable so ldap_pvt_thread_pool_pausing() can read it.
         */
        volatile sig_atomic_t ltp_pause;

        /* Max number of threads in pool */
        int ltp_max_count;

        /* Configured max number of threads in pool, 0 for default (LDAP_MAXTHR) */
        int ltp_conf_max_count;

        /* Max pending + paused + idle tasks, negated when ltp_finishing */
        int ltp_max_pending;
};
165
/* Permanently empty task list; a queue's ltp_work_list is meant to point
 * here instead of at its ltp_pending_list while the pool is paused.
 */
static ldap_int_tpool_plist_t empty_pending_list =
        LDAP_STAILQ_HEAD_INITIALIZER(empty_pending_list);

/* Set to 1 by pool_init_q() and back to 0 by pool_destroy();
 * pool_init_q() asserts that it is 0 on entry.
 */
static int ldap_int_has_thread_pool = 0;

/* All existing pools; modified under ldap_pvt_thread_pool_mutex */
static LDAP_STAILQ_HEAD(tpq, ldap_int_thread_pool_s)
        ldap_int_thread_pool_list =
        LDAP_STAILQ_HEAD_INITIALIZER(ldap_int_thread_pool_list);

/* Guards ldap_int_thread_pool_list and thread_keys[] */
static ldap_pvt_thread_mutex_t ldap_pvt_thread_pool_mutex;

/* Worker thread entry point; its argument is a work queue */
static void *ldap_int_thread_pool_wrapper( void *pool );

/* Thread-specific key under which each worker stores its context pointer */
static ldap_pvt_thread_key_t    ldap_tpool_key;

/* Context of the main thread */
static ldap_int_thread_userctx_t ldap_int_main_thrctx;
182
183 int
184 ldap_int_thread_pool_startup ( void )
185 {
186         ldap_int_main_thrctx.ltu_id = ldap_pvt_thread_self();
187         ldap_pvt_thread_key_create( &ldap_tpool_key );
188         return ldap_pvt_thread_mutex_init(&ldap_pvt_thread_pool_mutex);
189 }
190
191 int
192 ldap_int_thread_pool_shutdown ( void )
193 {
194         struct ldap_int_thread_pool_s *pool;
195
196         while ((pool = LDAP_STAILQ_FIRST(&ldap_int_thread_pool_list)) != NULL) {
197                 (ldap_pvt_thread_pool_destroy)(&pool, 0); /* ignore thr_debug macro */
198         }
199         ldap_pvt_thread_mutex_destroy(&ldap_pvt_thread_pool_mutex);
200         ldap_pvt_thread_key_destroy( ldap_tpool_key );
201         return(0);
202 }
203
204
205 /* Create a thread pool */
206 int
207 ldap_pvt_thread_pool_init_q (
208         ldap_pvt_thread_pool_t *tpool,
209         int max_threads,
210         int max_pending,
211         int numqs )
212 {
213         ldap_pvt_thread_pool_t pool;
214         struct ldap_int_thread_poolq_s *pq;
215         int i, rc, rem_thr, rem_pend;
216
217         /* multiple pools are currently not supported (ITS#4943) */
218         assert(!ldap_int_has_thread_pool);
219
220         if (! (0 <= max_threads && max_threads <= LDAP_MAXTHR))
221                 max_threads = 0;
222         if (! (1 <= max_pending && max_pending <= MAX_PENDING))
223                 max_pending = MAX_PENDING;
224
225         *tpool = NULL;
226         pool = (ldap_pvt_thread_pool_t) LDAP_CALLOC(1,
227                 sizeof(struct ldap_int_thread_pool_s) +
228                 numqs * sizeof(struct ldap_int_thread_poolq_s));
229
230         if (pool == NULL) return(-1);
231
232         pool->ltp_wqs = (struct ldap_int_thread_poolq_s *)(pool+1);
233         pool->ltp_numqs = numqs;
234         pool->ltp_conf_max_count = max_threads;
235         if ( !max_threads )
236                 max_threads = LDAP_MAXTHR;
237
238         rc = ldap_pvt_thread_mutex_init(&pool->ltp_mutex);
239         if (rc != 0)
240                 return(rc);
241
242         rc = ldap_pvt_thread_cond_init(&pool->ltp_cond);
243         if (rc != 0)
244                 return(rc);
245
246         rc = ldap_pvt_thread_cond_init(&pool->ltp_pcond);
247         if (rc != 0)
248                 return(rc);
249
250         rem_thr = max_threads % numqs;
251         rem_pend = max_pending % numqs;
252         for ( i=0; i<numqs; i++ ) {
253                 pq = &pool->ltp_wqs[i];
254                 pq->ltp_pool = pool;
255                 rc = ldap_pvt_thread_mutex_init(&pq->ltp_mutex);
256                 if (rc != 0)
257                         return(rc);
258                 rc = ldap_pvt_thread_cond_init(&pq->ltp_cond);
259                 if (rc != 0)
260                         return(rc);
261                 LDAP_STAILQ_INIT(&pq->ltp_pending_list);
262                 pq->ltp_work_list = &pq->ltp_pending_list;
263                 LDAP_SLIST_INIT(&pq->ltp_free_list);
264
265                 pq->ltp_max_count = max_threads / numqs;
266                 if ( rem_thr ) {
267                         pq->ltp_max_count++;
268                         rem_thr--;
269                 }
270                 pq->ltp_max_pending = max_pending / numqs;
271                 if ( rem_pend ) {
272                         pq->ltp_max_pending++;
273                         rem_pend--;
274                 }
275         }
276
277         ldap_int_has_thread_pool = 1;
278
279         pool->ltp_max_count = max_threads;
280         pool->ltp_max_pending = max_pending;
281
282         ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
283         LDAP_STAILQ_INSERT_TAIL(&ldap_int_thread_pool_list, pool, ltp_next);
284         ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
285
286         /* Start no threads just yet.  That can break if the process forks
287          * later, as slapd does in order to daemonize.  On at least POSIX,
288          * only the forking thread would survive in the child.  Yet fork()
289          * can't unlock/clean up other threads' locks and data structures,
290          * unless pthread_atfork() handlers have been set up to do so.
291          */
292
293         *tpool = pool;
294         return(0);
295 }
296
297 int
298 ldap_pvt_thread_pool_init (
299         ldap_pvt_thread_pool_t *tpool,
300         int max_threads,
301         int max_pending )
302 {
303         return ldap_pvt_thread_pool_init_q( tpool, max_threads, max_pending, 1 );
304 }
305
306 static int
307 ldap_int_poolq_hash(
308         struct ldap_int_thread_pool_s *pool,
309         void *arg )
310 {
311         int i = 0, j;
312         unsigned char *ptr = (unsigned char *)&arg;
313         /* dumb hash of arg to choose a queue */
314         for (j=0; j<sizeof(arg); j++)
315                 i += *ptr++;
316         i %= pool->ltp_numqs;
317         return i;
318 }
319
/* Submit a task to be performed by the thread pool.
 *
 * The task is queued on the work queue selected by hashing `arg`; if that
 * queue is at its pending limit the following queues are tried in turn.
 * A new worker thread is started when the queue has fewer threads than
 * tasks and is below its thread limit.
 *
 * Returns 0 when the task was queued, -1 on invalid parameters, when all
 * queues are full (or the pool is finishing, since the limits are then
 * negative), when no task object can be allocated, or when the queue's
 * first thread could not be created.
 */
int
ldap_pvt_thread_pool_submit (
        ldap_pvt_thread_pool_t *tpool,
        ldap_pvt_thread_start_t *start_routine, void *arg )
{
        struct ldap_int_thread_pool_s *pool;
        struct ldap_int_thread_poolq_s *pq;
        ldap_int_thread_task_t *task;
        ldap_pvt_thread_t thr;
        int i, j;

        if (tpool == NULL)
                return(-1);

        pool = *tpool;

        if (pool == NULL)
                return(-1);

        if ( pool->ltp_numqs > 1 )
                i = ldap_int_poolq_hash( pool, arg );
        else
                i = 0;

        /* Probe queues starting at i; j remembers the start so that a full
         * circle is detected.  The loop exits with queue i's mutex held.
         */
        j = i;
        while(1) {
                ldap_pvt_thread_mutex_lock(&pool->ltp_wqs[i].ltp_mutex);
                if (pool->ltp_wqs[i].ltp_pending_count < pool->ltp_wqs[i].ltp_max_pending) {
                        break;
                }
                ldap_pvt_thread_mutex_unlock(&pool->ltp_wqs[i].ltp_mutex);
                i++;
                i %= pool->ltp_numqs;
                if ( i == j )
                        return -1;
        }

        /* reuse a task object from the free list when there is one */
        pq = &pool->ltp_wqs[i];
        task = LDAP_SLIST_FIRST(&pq->ltp_free_list);
        if (task) {
                LDAP_SLIST_REMOVE_HEAD(&pq->ltp_free_list, ltt_next.l);
        } else {
                task = (ldap_int_thread_task_t *) LDAP_MALLOC(sizeof(*task));
                if (task == NULL)
                        goto failed;
        }

        task->ltt_start_routine = start_routine;
        task->ltt_arg = arg;

        pq->ltp_pending_count++;
        LDAP_STAILQ_INSERT_TAIL(&pq->ltp_pending_list, task, ltt_next.q);

        /* while pausing, just leave the task queued: no new thread, no wakeup */
        if (pool->ltp_pause)
                goto done;

        /* should we open (create) a thread? */
        if (pq->ltp_open_count < pq->ltp_active_count+pq->ltp_pending_count &&
                pq->ltp_open_count < pq->ltp_max_count)
        {
                /* count the thread before creating it; the new thread
                 * decrements ltp_starting itself once it runs
                 */
                pq->ltp_starting++;
                pq->ltp_open_count++;

                if (0 != ldap_pvt_thread_create(
                        &thr, 1, ldap_int_thread_pool_wrapper, pq))
                {
                        /* couldn't create thread.  back out of
                         * ltp_open_count and check for even worse things.
                         */
                        pq->ltp_starting--;
                        pq->ltp_open_count--;

                        if (pq->ltp_open_count == 0) {
                                /* no open threads at all?!?
                                 */
                                ldap_int_thread_task_t *ptr;

                                /* let pool_destroy know there are no more threads */
                                ldap_pvt_thread_cond_signal(&pq->ltp_cond);

                                /* check that our task is still on the pending list */
                                LDAP_STAILQ_FOREACH(ptr, &pq->ltp_pending_list, ltt_next.q)
                                        if (ptr == task) break;
                                if (ptr == task) {
                                        /* no open threads, task not handled, so
                                         * back out of ltp_pending_count, free the task,
                                         * report the error.
                                         */
                                        pq->ltp_pending_count--;
                                        LDAP_STAILQ_REMOVE(&pq->ltp_pending_list, task,
                                                ldap_int_thread_task_s, ltt_next.q);
                                        LDAP_SLIST_INSERT_HEAD(&pq->ltp_free_list, task,
                                                ltt_next.l);
                                        goto failed;
                                }
                        }
                        /* there is another open thread, so this
                         * task will be handled eventually.
                         */
                }
        }
        /* wake one thread waiting on this queue */
        ldap_pvt_thread_cond_signal(&pq->ltp_cond);

 done:
        ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
        return(0);

 failed:
        ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
        return(-1);
}
431
/* Do-nothing task routine.  pool_retract() substitutes it for the
 * routine of a cancelled task.  Both arguments are ignored; always
 * returns NULL.
 */
static void *
no_task( void *ctx, void *arg )
{
        (void)ctx;
        (void)arg;

        return NULL;
}
437
438 /* Cancel a pending task that was previously submitted.
439  * Return 1 if the task was successfully cancelled, 0 if
440  * not found, -1 for invalid parameters
441  */
442 int
443 ldap_pvt_thread_pool_retract (
444         ldap_pvt_thread_pool_t *tpool,
445         ldap_pvt_thread_start_t *start_routine, void *arg )
446 {
447         struct ldap_int_thread_pool_s *pool;
448         struct ldap_int_thread_poolq_s *pq;
449         ldap_int_thread_task_t *task;
450         int i;
451
452         if (tpool == NULL)
453                 return(-1);
454
455         pool = *tpool;
456
457         if (pool == NULL)
458                 return(-1);
459
460         i = ldap_int_poolq_hash( pool, arg );
461         pq = &pool->ltp_wqs[i];
462
463         ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
464         LDAP_STAILQ_FOREACH(task, &pq->ltp_pending_list, ltt_next.q)
465                 if (task->ltt_start_routine == start_routine &&
466                         task->ltt_arg == arg) {
467                         /* Could LDAP_STAILQ_REMOVE the task, but that
468                          * walks ltp_pending_list again to find it.
469                          */
470                         task->ltt_start_routine = no_task;
471                         task->ltt_arg = NULL;
472                         break;
473                 }
474         ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
475         return task != NULL;
476 }
477
478 /* Set max #threads.  value <= 0 means max supported #threads (LDAP_MAXTHR) */
479 int
480 ldap_pvt_thread_pool_maxthreads(
481         ldap_pvt_thread_pool_t *tpool,
482         int max_threads )
483 {
484         struct ldap_int_thread_pool_s *pool;
485         struct ldap_int_thread_poolq_s *pq;
486         int remthr, i;
487
488         if (! (0 <= max_threads && max_threads <= LDAP_MAXTHR))
489                 max_threads = 0;
490
491         if (tpool == NULL)
492                 return(-1);
493
494         pool = *tpool;
495
496         if (pool == NULL)
497                 return(-1);
498
499         pool->ltp_conf_max_count = max_threads;
500         if ( !max_threads )
501                 max_threads = LDAP_MAXTHR;
502         pool->ltp_max_count = max_threads;
503
504         remthr = max_threads % pool->ltp_numqs;
505         max_threads /= pool->ltp_numqs;
506
507         for (i=0; i<pool->ltp_numqs; i++) {
508                 pq = &pool->ltp_wqs[i];
509                 ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
510                 pq->ltp_max_count = max_threads;
511                 if (remthr) {
512                         pq->ltp_max_count++;
513                         remthr--;
514                 }
515                 ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
516         }
517         return(0);
518 }
519
520 /* Inspect the pool */
521 int
522 ldap_pvt_thread_pool_query(
523         ldap_pvt_thread_pool_t *tpool,
524         ldap_pvt_thread_pool_param_t param,
525         void *value )
526 {
527         struct ldap_int_thread_pool_s   *pool;
528         int                             count = -1;
529
530         if ( tpool == NULL || value == NULL ) {
531                 return -1;
532         }
533
534         pool = *tpool;
535
536         if ( pool == NULL ) {
537                 return 0;
538         }
539
540         switch ( param ) {
541         case LDAP_PVT_THREAD_POOL_PARAM_MAX:
542                 count = pool->ltp_conf_max_count;
543                 break;
544
545         case LDAP_PVT_THREAD_POOL_PARAM_MAX_PENDING:
546                 count = pool->ltp_max_pending;
547                 if (count < 0)
548                         count = -count;
549                 if (count == MAX_PENDING)
550                         count = 0;
551                 break;
552
553         case LDAP_PVT_THREAD_POOL_PARAM_PAUSING:
554                 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
555                 count = (pool->ltp_pause != 0);
556                 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
557                 break;
558
559         case LDAP_PVT_THREAD_POOL_PARAM_OPEN:
560         case LDAP_PVT_THREAD_POOL_PARAM_STARTING:
561         case LDAP_PVT_THREAD_POOL_PARAM_ACTIVE:
562         case LDAP_PVT_THREAD_POOL_PARAM_PENDING:
563         case LDAP_PVT_THREAD_POOL_PARAM_BACKLOAD:
564                 {
565                         int i;
566                         count = 0;
567                         for (i=0; i<pool->ltp_numqs; i++) {
568                                 struct ldap_int_thread_poolq_s *pq = &pool->ltp_wqs[i];
569                                 ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
570                                 switch(param) {
571                                         case LDAP_PVT_THREAD_POOL_PARAM_OPEN:
572                                                 count += pq->ltp_open_count;
573                                                 break;
574                                         case LDAP_PVT_THREAD_POOL_PARAM_STARTING:
575                                                 count += pq->ltp_starting;
576                                                 break;
577                                         case LDAP_PVT_THREAD_POOL_PARAM_ACTIVE:
578                                                 count += pq->ltp_active_count;
579                                                 break;
580                                         case LDAP_PVT_THREAD_POOL_PARAM_PENDING:
581                                                 count += pq->ltp_pending_count;
582                                                 break;
583                                         case LDAP_PVT_THREAD_POOL_PARAM_BACKLOAD:
584                                                 count += pq->ltp_pending_count + pq->ltp_active_count;
585                                                 break;
586                                 }
587                                 ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
588                         }
589                         if (count < 0)
590                                 count = -count;
591                 }
592                 break;
593
594         case LDAP_PVT_THREAD_POOL_PARAM_ACTIVE_MAX:
595                 break;
596
597         case LDAP_PVT_THREAD_POOL_PARAM_PENDING_MAX:
598                 break;
599
600         case LDAP_PVT_THREAD_POOL_PARAM_BACKLOAD_MAX:
601                 break;
602
603         case LDAP_PVT_THREAD_POOL_PARAM_STATE:
604                 if (pool->ltp_pause)
605                         *((char **)value) = "pausing";
606                 else if (!pool->ltp_finishing)
607                         *((char **)value) = "running";
608                 else {
609                         int i;
610                         for (i=0; i<pool->ltp_numqs; i++)
611                                 if (pool->ltp_wqs[i].ltp_pending_count) break;
612                         if (i<pool->ltp_numqs)
613                                 *((char **)value) = "finishing";
614                         else
615                                 *((char **)value) = "stopping";
616                 }
617                 break;
618
619         case LDAP_PVT_THREAD_POOL_PARAM_UNKNOWN:
620                 break;
621         }
622
623         if ( count > -1 ) {
624                 *((int *)value) = count;
625         }
626
627         return ( count == -1 ? -1 : 0 );
628 }
629
630 /*
631  * true if pool is pausing; does not lock any mutex to check.
632  * 0 if not pause, 1 if pause, -1 if error or no pool.
633  */
634 int
635 ldap_pvt_thread_pool_pausing( ldap_pvt_thread_pool_t *tpool )
636 {
637         int rc = -1;
638         struct ldap_int_thread_pool_s *pool;
639
640         if ( tpool != NULL && (pool = *tpool) != NULL ) {
641                 rc = (pool->ltp_pause != 0);
642         }
643
644         return rc;
645 }
646
647 /*
648  * wrapper for ldap_pvt_thread_pool_query(), left around
649  * for backwards compatibility
650  */
651 int
652 ldap_pvt_thread_pool_backload ( ldap_pvt_thread_pool_t *tpool )
653 {
654         int     rc, count;
655
656         rc = ldap_pvt_thread_pool_query( tpool,
657                 LDAP_PVT_THREAD_POOL_PARAM_BACKLOAD, (void *)&count );
658
659         if ( rc == 0 ) {
660                 return count;
661         }
662
663         return rc;
664 }
665
/* Destroy the pool after making its threads finish.
 *
 * tpool        the pool to destroy; *tpool is set to NULL on success.
 * run_pending  if zero, tasks still pending are discarded without being
 *              run; otherwise they are left on the queues for the
 *              worker threads.
 *
 * Blocks until every queue's ltp_open_count has dropped to zero.
 * Returns 0 on success, -1 if tpool or *tpool is NULL or the pool is not
 * on the list of known pools.
 */
int
ldap_pvt_thread_pool_destroy ( ldap_pvt_thread_pool_t *tpool, int run_pending )
{
        struct ldap_int_thread_pool_s *pool, *pptr;
        struct ldap_int_thread_poolq_s *pq;
        ldap_int_thread_task_t *task;
        int i;

        if (tpool == NULL)
                return(-1);

        pool = *tpool;

        if (pool == NULL) return(-1);

        /* unlink the pool from the global list, if it is there */
        ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
        LDAP_STAILQ_FOREACH(pptr, &ldap_int_thread_pool_list, ltp_next)
                if (pptr == pool) break;
        if (pptr == pool)
                LDAP_STAILQ_REMOVE(&ldap_int_thread_pool_list, pool,
                        ldap_int_thread_pool_s, ltp_next);
        ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);

        /* pptr is NULL here when the pool was not found */
        if (pool != pptr) return(-1);

        ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);

        /* mark the pool as finishing; a negative limit makes
         * pool_submit() reject new tasks
         */
        pool->ltp_finishing = 1;
        if (pool->ltp_max_pending > 0)
                pool->ltp_max_pending = -pool->ltp_max_pending;

        /* wake threads waiting on the pool-wide condition */
        ldap_pvt_thread_cond_broadcast(&pool->ltp_cond);
        ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);

        /* shut the queues down one at a time */
        for (i=0; i<pool->ltp_numqs; i++) {
                pq = &pool->ltp_wqs[i];
                ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
                if (pq->ltp_max_pending > 0)
                        pq->ltp_max_pending = -pq->ltp_max_pending;
                if (!run_pending) {
                        /* discard tasks that have not been started */
                        while ((task = LDAP_STAILQ_FIRST(&pq->ltp_pending_list)) != NULL) {
                                LDAP_STAILQ_REMOVE_HEAD(&pq->ltp_pending_list, ltt_next.q);
                                LDAP_FREE(task);
                        }
                        pq->ltp_pending_count = 0;
                }

                /* wake the queue's threads and wait until all have closed;
                 * the wait releases ltp_mutex so they can make progress
                 */
                while (pq->ltp_open_count) {
                        ldap_pvt_thread_cond_broadcast(&pq->ltp_cond);
                        ldap_pvt_thread_cond_wait(&pq->ltp_cond, &pq->ltp_mutex);
                }

                /* release the recycled task objects */
                while ((task = LDAP_SLIST_FIRST(&pq->ltp_free_list)) != NULL)
                {
                        LDAP_SLIST_REMOVE_HEAD(&pq->ltp_free_list, ltt_next.l);
                        LDAP_FREE(task);
                }
                ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
                ldap_pvt_thread_cond_destroy(&pq->ltp_cond);
                ldap_pvt_thread_mutex_destroy(&pq->ltp_mutex);
        }

        ldap_pvt_thread_cond_destroy(&pool->ltp_pcond);
        ldap_pvt_thread_cond_destroy(&pool->ltp_cond);
        ldap_pvt_thread_mutex_destroy(&pool->ltp_mutex);
        /* the queue array is part of the same allocation as the pool */
        LDAP_FREE(pool);
        *tpool = NULL;
        ldap_int_has_thread_pool = 0;
        return(0);
}
737
738 /* Thread loop.  Accept and handle submitted tasks. */
739 static void *
740 ldap_int_thread_pool_wrapper ( 
741         void *xpool )
742 {
743         struct ldap_int_thread_poolq_s *pq = xpool;
744         struct ldap_int_thread_pool_s *pool = pq->ltp_pool;
745         ldap_int_thread_task_t *task;
746         ldap_int_tpool_plist_t *work_list;
747         ldap_int_thread_userctx_t ctx, *kctx;
748         unsigned i, keyslot, hash;
749         int global_lock = 0;
750
751         assert(pool != NULL);
752
753         for ( i=0; i<MAXKEYS; i++ ) {
754                 ctx.ltu_key[i].ltk_key = NULL;
755         }
756
757         ctx.ltu_pq = pq;
758         ctx.ltu_id = ldap_pvt_thread_self();
759         TID_HASH(ctx.ltu_id, hash);
760
761         ldap_pvt_thread_key_setdata( ldap_tpool_key, &ctx );
762
763         if (pool->ltp_pause) {
764                 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
765                 /* thread_keys[] is read-only when paused */
766                 while (pool->ltp_pause)
767                         ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
768                 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
769         }
770
771         /* find a key slot to give this thread ID and store a
772          * pointer to our keys there; start at the thread ID
773          * itself (mod LDAP_MAXTHR) and look for an empty slot.
774          */
775         ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
776         for (keyslot = hash & (LDAP_MAXTHR-1);
777                 (kctx = thread_keys[keyslot].ctx) && kctx != DELETED_THREAD_CTX;
778                 keyslot = (keyslot+1) & (LDAP_MAXTHR-1));
779         thread_keys[keyslot].ctx = &ctx;
780         ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
781
782         ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
783         pq->ltp_starting--;
784         pq->ltp_active_count++;
785
786         for (;;) {
787                 work_list = pq->ltp_work_list; /* help the compiler a bit */
788                 task = LDAP_STAILQ_FIRST(work_list);
789                 if (task == NULL) {     /* paused or no pending tasks */
790                         if (--(pq->ltp_active_count) < 1) {
791                                 if (pool->ltp_pause) {
792                                         ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
793                                         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
794                                         global_lock = 1;
795                                         if (--(pool->ltp_active_queues) < 1) {
796                                                 /* Notify pool_pause it is the sole active thread. */
797                                                 ldap_pvt_thread_cond_signal(&pool->ltp_pcond);
798                                         }
799                                 }
800                         }
801
802                         do {
803                                 if (pool->ltp_finishing || pq->ltp_open_count > pq->ltp_max_count) {
804                                         /* Not paused, and either finishing or too many
805                                          * threads running (can happen if ltp_max_count
806                                          * was reduced).  Let this thread die.
807                                          */
808                                         goto done;
809                                 }
810
811                                 /* We could check an idle timer here, and let the
812                                  * thread die if it has been inactive for a while.
813                                  * Only die if there are other open threads (i.e.,
814                                  * always have at least one thread open).
815                                  * The check should be like this:
816                                  *   if (pool->ltp_open_count>1 && pool->ltp_starting==0)
817                                  *       check timer, wait if ltp_pause, leave thread;
818                                  *
819                                  * Just use pthread_cond_timedwait() if we want to
820                                  * check idle time.
821                                  */
822                                 if (global_lock) {
823                                         ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
824                                         if (!pool->ltp_pause) {
825                                                 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
826                                                 ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
827                                                 global_lock = 0;
828                                         }
829                                 } else
830                                         ldap_pvt_thread_cond_wait(&pq->ltp_cond, &pq->ltp_mutex);
831
832                                 work_list = pq->ltp_work_list;
833                                 task = LDAP_STAILQ_FIRST(work_list);
834                         } while (task == NULL);
835
836                         if (global_lock) {
837                                 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
838                                 ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
839                                 global_lock = 0;
840                         }
841                         pq->ltp_active_count++;
842                 }
843
844                 LDAP_STAILQ_REMOVE_HEAD(work_list, ltt_next.q);
845                 pq->ltp_pending_count--;
846                 ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
847
848                 task->ltt_start_routine(&ctx, task->ltt_arg);
849
850                 ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
851                 LDAP_SLIST_INSERT_HEAD(&pq->ltp_free_list, task, ltt_next.l);
852         }
853  done:
854
855         ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
856
857         /* The pool_mutex lock protects ctx->ltu_key from pool_purgekey()
858          * during this call, since it prevents new pauses. */
859         ldap_pvt_thread_pool_context_reset(&ctx);
860
861         thread_keys[keyslot].ctx = DELETED_THREAD_CTX;
862         ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
863
864         pq->ltp_open_count--;
865         /* let pool_destroy know we're all done */
866         if (pq->ltp_open_count == 0)
867                 ldap_pvt_thread_cond_signal(&pq->ltp_cond);
868
869         if (global_lock)
870                 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
871         else
872                 ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
873
874         ldap_pvt_thread_exit(NULL);
875         return(NULL);
876 }
877
/* Request codes for handle_pause(), always passed as PAUSE_ARG(code).
 * handle_pause() subtracts the current ltp_pause (0..2) from the
 * encoded argument, so every encoded value is larger than ltp_pause:
 *  - GO_IDLE/GO_UNIDLE get the low bits (GO_IDLE-1) filled in; the
 *    subtraction only consumes those filler bits and the GO_* bit stays.
 *  - CHECK_PAUSE/DO_PAUSE carry the CHECK_PAUSE bit; subtracting a
 *    nonzero ltp_pause borrows from it, which switches on both GO_IDLE
 *    and GO_UNIDLE while DO_PAUSE is left as it was.
 */
#define GO_IDLE		0x08
#define GO_UNIDLE	0x10
#define CHECK_PAUSE	0x20	/* if ltp_pause: GO_IDLE; wait; GO_UNIDLE */
#define DO_PAUSE	0x40	/* CHECK_PAUSE; pause the pool */
#define PAUSE_ARG(a) \
		(((a) & (GO_IDLE|GO_UNIDLE)) \
			? ((a) | (GO_IDLE-1)) \
			: ((a) | CHECK_PAUSE))
887
888 static int
889 handle_pause( ldap_pvt_thread_pool_t *tpool, int pause_type )
890 {
891         struct ldap_int_thread_pool_s *pool;
892         struct ldap_int_thread_poolq_s *pq;
893         int ret = 0, pause, max_ltp_pause;
894
895         if (tpool == NULL)
896                 return(-1);
897
898         pool = *tpool;
899
900         if (pool == NULL)
901                 return(0);
902
903         if (pause_type == CHECK_PAUSE && !pool->ltp_pause)
904                 return(0);
905
906         {
907                 ldap_int_thread_userctx_t *ctx = ldap_pvt_thread_pool_context();
908                 pq = ctx->ltu_pq;
909         }
910
911         /* Let pool_unidle() ignore requests for new pauses */
912         max_ltp_pause = pause_type==PAUSE_ARG(GO_UNIDLE) ? WANT_PAUSE : NOT_PAUSED;
913
914         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
915
916         pause = pool->ltp_pause;        /* NOT_PAUSED, WANT_PAUSE or PAUSED */
917
918         /* If ltp_pause and not GO_IDLE|GO_UNIDLE: Set GO_IDLE,GO_UNIDLE */
919         pause_type -= pause;
920
921         if (pause_type & GO_IDLE) {
922                 int do_pool = 0;
923                 ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
924                 pq->ltp_pending_count++;
925                 pq->ltp_active_count--;
926                 if (pause && pq->ltp_active_count < 1) {
927                         do_pool = 1;
928                 }
929                 ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
930                 if (do_pool) {
931                         pool->ltp_active_queues--;
932                         if (pool->ltp_active_queues < 1)
933                         /* Tell the task waiting to DO_PAUSE it can proceed */
934                                 ldap_pvt_thread_cond_signal(&pool->ltp_pcond);
935                 }
936         }
937
938         if (pause_type & GO_UNIDLE) {
939                 /* Wait out pause if any, then cancel GO_IDLE */
940                 if (pause > max_ltp_pause) {
941                         ret = 1;
942                         do {
943                                 ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
944                         } while (pool->ltp_pause > max_ltp_pause);
945                 }
946                 ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
947                 pq->ltp_pending_count--;
948                 pq->ltp_active_count++;
949                 ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
950         }
951
952         if (pause_type & DO_PAUSE) {
953                 int i, j;
954                 /* Tell everyone else to pause or finish, then await that */
955                 ret = 0;
956                 assert(!pool->ltp_pause);
957                 pool->ltp_pause = WANT_PAUSE;
958                 pool->ltp_active_queues = 0;
959
960                 for (i=0; i<pool->ltp_numqs; i++)
961                         if (&pool->ltp_wqs[i] == pq) break;
962
963                 ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
964                 /* temporarily remove ourself from active count */
965                 pq->ltp_active_count--;
966
967                 j=i;
968                 do {
969                         pq = &pool->ltp_wqs[j];
970                         if (j != i)
971                                 ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
972
973                         /* Let ldap_pvt_thread_pool_submit() through to its ltp_pause test,
974                          * and do not finish threads in ldap_pvt_thread_pool_wrapper() */
975                         pq->ltp_open_count = -pq->ltp_open_count;
976                         /* Hide pending tasks from ldap_pvt_thread_pool_wrapper() */
977                         pq->ltp_work_list = &empty_pending_list;
978
979                         if (pq->ltp_active_count > 0)
980                                 pool->ltp_active_queues++;
981
982                         ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
983                         if (pool->ltp_numqs > 1) {
984                                 j++;
985                                 j %= pool->ltp_numqs;
986                         }
987                 } while (j != i);
988
989                 /* Wait for this task to become the sole active task */
990                 while (pool->ltp_active_queues > 0)
991                         ldap_pvt_thread_cond_wait(&pool->ltp_pcond, &pool->ltp_mutex);
992
993                 /* restore us to active count */
994                 pool->ltp_wqs[i].ltp_active_count++;
995
996                 assert(pool->ltp_pause == WANT_PAUSE);
997                 pool->ltp_pause = PAUSED;
998         }
999         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
1000
1001         return(ret);
1002 }
1003
1004 /* Consider this task idle: It will not block pool_pause() in other tasks. */
1005 void
1006 ldap_pvt_thread_pool_idle( ldap_pvt_thread_pool_t *tpool )
1007 {
1008         handle_pause(tpool, PAUSE_ARG(GO_IDLE));
1009 }
1010
1011 /* Cancel pool_idle(). If the pool is paused, wait it out first. */
1012 void
1013 ldap_pvt_thread_pool_unidle( ldap_pvt_thread_pool_t *tpool )
1014 {
1015         handle_pause(tpool, PAUSE_ARG(GO_UNIDLE));
1016 }
1017
1018 /*
1019  * If a pause was requested, wait for it.  If several threads
1020  * are waiting to pause, let through one or more pauses.
1021  * The calling task must be active, not idle.
1022  * Return 1 if we waited, 0 if not, -1 at parameter error.
1023  */
1024 int
1025 ldap_pvt_thread_pool_pausecheck( ldap_pvt_thread_pool_t *tpool )
1026 {
1027         return handle_pause(tpool, PAUSE_ARG(CHECK_PAUSE));
1028 }
1029
1030 /*
1031  * Pause the pool.  The calling task must be active, not idle.
1032  * Return when all other tasks are paused or idle.
1033  */
1034 int
1035 ldap_pvt_thread_pool_pause( ldap_pvt_thread_pool_t *tpool )
1036 {
1037         return handle_pause(tpool, PAUSE_ARG(DO_PAUSE));
1038 }
1039
1040 /* End a pause */
1041 int
1042 ldap_pvt_thread_pool_resume ( 
1043         ldap_pvt_thread_pool_t *tpool )
1044 {
1045         struct ldap_int_thread_pool_s *pool;
1046         struct ldap_int_thread_poolq_s *pq;
1047         int i;
1048
1049         if (tpool == NULL)
1050                 return(-1);
1051
1052         pool = *tpool;
1053
1054         if (pool == NULL)
1055                 return(0);
1056
1057         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
1058         assert(pool->ltp_pause == PAUSED);
1059         pool->ltp_pause = 0;
1060         for (i=0; i<pool->ltp_numqs; i++) {
1061                 pq = &pool->ltp_wqs[i];
1062                 if (pq->ltp_open_count <= 0) /* true when paused, but be paranoid */
1063                         pq->ltp_open_count = -pq->ltp_open_count;
1064                 pq->ltp_work_list = &pq->ltp_pending_list;
1065                 ldap_pvt_thread_cond_broadcast(&pq->ltp_cond);
1066         }
1067         ldap_pvt_thread_cond_broadcast(&pool->ltp_cond);
1068         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
1069         return(0);
1070 }
1071
1072 /*
1073  * Get the key's data and optionally free function in the given context.
1074  */
1075 int ldap_pvt_thread_pool_getkey(
1076         void *xctx,
1077         void *key,
1078         void **data,
1079         ldap_pvt_thread_pool_keyfree_t **kfree )
1080 {
1081         ldap_int_thread_userctx_t *ctx = xctx;
1082         int i;
1083
1084         if ( !ctx || !key || !data ) return EINVAL;
1085
1086         for ( i=0; i<MAXKEYS && ctx->ltu_key[i].ltk_key; i++ ) {
1087                 if ( ctx->ltu_key[i].ltk_key == key ) {
1088                         *data = ctx->ltu_key[i].ltk_data;
1089                         if ( kfree ) *kfree = ctx->ltu_key[i].ltk_free;
1090                         return 0;
1091                 }
1092         }
1093         return ENOENT;
1094 }
1095
1096 static void
1097 clear_key_idx( ldap_int_thread_userctx_t *ctx, int i )
1098 {
1099         for ( ; i < MAXKEYS-1 && ctx->ltu_key[i+1].ltk_key; i++ )
1100                 ctx->ltu_key[i] = ctx->ltu_key[i+1];
1101         ctx->ltu_key[i].ltk_key = NULL;
1102 }
1103
1104 /*
1105  * Set or remove data for the key in the given context.
1106  * key can be any unique pointer.
1107  * kfree() is an optional function to free the data (but not the key):
1108  *   pool_context_reset() and pool_purgekey() call kfree(key, data),
1109  *   but pool_setkey() does not.  For pool_setkey() it is the caller's
1110  *   responsibility to free any existing data with the same key.
1111  *   kfree() must not call functions taking a tpool argument.
1112  */
1113 int ldap_pvt_thread_pool_setkey(
1114         void *xctx,
1115         void *key,
1116         void *data,
1117         ldap_pvt_thread_pool_keyfree_t *kfree,
1118         void **olddatap,
1119         ldap_pvt_thread_pool_keyfree_t **oldkfreep )
1120 {
1121         ldap_int_thread_userctx_t *ctx = xctx;
1122         int i, found;
1123
1124         if ( !ctx || !key ) return EINVAL;
1125
1126         for ( i=found=0; i<MAXKEYS; i++ ) {
1127                 if ( ctx->ltu_key[i].ltk_key == key ) {
1128                         found = 1;
1129                         break;
1130                 } else if ( !ctx->ltu_key[i].ltk_key ) {
1131                         break;
1132                 }
1133         }
1134
1135         if ( olddatap ) {
1136                 if ( found ) {
1137                         *olddatap = ctx->ltu_key[i].ltk_data;
1138                 } else {
1139                         *olddatap = NULL;
1140                 }
1141         }
1142
1143         if ( oldkfreep ) {
1144                 if ( found ) {
1145                         *oldkfreep = ctx->ltu_key[i].ltk_free;
1146                 } else {
1147                         *oldkfreep = 0;
1148                 }
1149         }
1150
1151         if ( data || kfree ) {
1152                 if ( i>=MAXKEYS )
1153                         return ENOMEM;
1154                 ctx->ltu_key[i].ltk_key = key;
1155                 ctx->ltu_key[i].ltk_data = data;
1156                 ctx->ltu_key[i].ltk_free = kfree;
1157         } else if ( found ) {
1158                 clear_key_idx( ctx, i );
1159         }
1160
1161         return 0;
1162 }
1163
1164 /* Free all elements with this key, no matter which thread they're in.
1165  * May only be called while the pool is paused.
1166  */
1167 void ldap_pvt_thread_pool_purgekey( void *key )
1168 {
1169         int i, j;
1170         ldap_int_thread_userctx_t *ctx;
1171
1172         assert ( key != NULL );
1173
1174         ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
1175         for ( i=0; i<LDAP_MAXTHR; i++ ) {
1176                 ctx = thread_keys[i].ctx;
1177                 if ( ctx && ctx != DELETED_THREAD_CTX ) {
1178                         for ( j=0; j<MAXKEYS && ctx->ltu_key[j].ltk_key; j++ ) {
1179                                 if ( ctx->ltu_key[j].ltk_key == key ) {
1180                                         if (ctx->ltu_key[j].ltk_free)
1181                                                 ctx->ltu_key[j].ltk_free( ctx->ltu_key[j].ltk_key,
1182                                                 ctx->ltu_key[j].ltk_data );
1183                                         clear_key_idx( ctx, j );
1184                                         break;
1185                                 }
1186                         }
1187                 }
1188         }
1189         ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
1190 }
1191
1192 /*
1193  * Find the context of the current thread.
1194  * This is necessary if the caller does not have access to the
1195  * thread context handle (for example, a slapd plugin calling
1196  * slapi_search_internal()). No doubt it is more efficient
1197  * for the application to keep track of the thread context
1198  * handles itself.
1199  */
1200 void *ldap_pvt_thread_pool_context( )
1201 {
1202         void *ctx = NULL;
1203
1204         ldap_pvt_thread_key_getdata( ldap_tpool_key, &ctx );
1205         return ctx ? ctx : (void *) &ldap_int_main_thrctx;
1206 }
1207
1208 /*
1209  * Free the context's keys.
1210  * Must not call functions taking a tpool argument (because this
1211  * thread already holds ltp_mutex when called from pool_wrapper()).
1212  */
1213 void ldap_pvt_thread_pool_context_reset( void *vctx )
1214 {
1215         ldap_int_thread_userctx_t *ctx = vctx;
1216         int i;
1217
1218         for ( i=MAXKEYS-1; i>=0; i--) {
1219                 if ( !ctx->ltu_key[i].ltk_key )
1220                         continue;
1221                 if ( ctx->ltu_key[i].ltk_free )
1222                         ctx->ltu_key[i].ltk_free( ctx->ltu_key[i].ltk_key,
1223                         ctx->ltu_key[i].ltk_data );
1224                 ctx->ltu_key[i].ltk_key = NULL;
1225         }
1226 }
1227
1228 ldap_pvt_thread_t ldap_pvt_thread_pool_tid( void *vctx )
1229 {
1230         ldap_int_thread_userctx_t *ctx = vctx;
1231
1232         return ctx->ltu_id;
1233 }
1234 #endif /* LDAP_THREAD_HAVE_TPOOL */